1#[cfg(test)]
16mod tests {
17 use risingwave_meta_model::table::HandleConflictBehavior;
18 use risingwave_pb::catalog::StreamSourceInfo;
19 use risingwave_pb::catalog::subscription::SubscriptionState;
20 use tokio::sync::oneshot;
21
22 use crate::controller::catalog::*;
23
/// Database id assigned to the catalog objects created by the tests in this module.
// NOTE(review): no test here creates this database; presumably `MetaSrvEnv::for_test()`
// seeds it — confirm.
24 const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
/// Schema id assigned to the views, functions, sources, tables and subscriptions created by
/// the tests in this module.
// NOTE(review): likewise presumably pre-seeded by the test environment — confirm.
25 const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
/// User id used as the owner of every object created by the tests in this module.
26 const TEST_OWNER_ID: UserId = UserId::new(1);
27
28 async fn insert_test_table(
29 txn: &DatabaseTransaction,
30 table_id: TableId,
31 name: &str,
32 table_type: TableType,
33 belongs_to_job_id: Option<JobId>,
34 definition: &str,
35 ) -> MetaResult<()> {
36 table::ActiveModel {
37 table_id: Set(table_id),
38 name: Set(name.to_owned()),
39 optional_associated_source_id: Set(None),
40 table_type: Set(table_type),
41 belongs_to_job_id: Set(belongs_to_job_id),
42 columns: Set(vec![].into()),
43 pk: Set(vec![].into()),
44 distribution_key: Set(Vec::<i32>::new().into()),
45 stream_key: Set(Vec::<i32>::new().into()),
46 append_only: Set(false),
47 fragment_id: Set(None),
48 vnode_col_index: Set(None),
49 row_id_index: Set(None),
50 value_indices: Set(Vec::<i32>::new().into()),
51 definition: Set(definition.to_owned()),
52 handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
53 version_column_indices: Set(None),
54 read_prefix_len_hint: Set(0),
55 watermark_indices: Set(Vec::<i32>::new().into()),
56 dist_key_in_pk: Set(Vec::<i32>::new().into()),
57 dml_fragment_id: Set(None),
58 cardinality: Set(None),
59 cleaned_by_watermark: Set(false),
60 description: Set(None),
61 version: Set(None),
62 retention_seconds: Set(None),
63 cdc_table_id: Set(None),
64 vnode_count: Set(1),
65 webhook_info: Set(None),
66 engine: Set(None),
67 clean_watermark_index_in_pk: Set(None),
68 clean_watermark_indices: Set(None),
69 refreshable: Set(false),
70 vector_index_info: Set(None),
71 cdc_table_type: Set(None),
72 }
73 .insert(txn)
74 .await?;
75 Ok(())
76 }
77
78 #[tokio::test]
79 async fn test_database_func() -> MetaResult<()> {
80 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
81 let pb_database = PbDatabase {
82 name: "db1".to_owned(),
83 owner: TEST_OWNER_ID as _,
84 ..Default::default()
85 };
86 mgr.create_database(pb_database).await?;
87
88 let database_id: DatabaseId = Database::find()
89 .select_only()
90 .column(database::Column::DatabaseId)
91 .filter(database::Column::Name.eq("db1"))
92 .into_tuple()
93 .one(&mgr.inner.read().await.db)
94 .await?
95 .unwrap();
96
97 mgr.alter_name(ObjectType::Database, database_id, "db2")
98 .await?;
99 let database = Database::find_by_id(database_id)
100 .one(&mgr.inner.read().await.db)
101 .await?
102 .unwrap();
103 assert_eq!(database.name, "db2");
104
105 mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
106 .await?;
107
108 Ok(())
109 }
110
111 #[tokio::test]
112 async fn test_schema_func() -> MetaResult<()> {
113 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
114 let pb_schema = PbSchema {
115 database_id: TEST_DATABASE_ID,
116 name: "schema1".to_owned(),
117 owner: TEST_OWNER_ID as _,
118 ..Default::default()
119 };
120 mgr.create_schema(pb_schema.clone()).await?;
121 assert!(mgr.create_schema(pb_schema).await.is_err());
122
123 let schema_id: SchemaId = Schema::find()
124 .select_only()
125 .column(schema::Column::SchemaId)
126 .filter(schema::Column::Name.eq("schema1"))
127 .into_tuple()
128 .one(&mgr.inner.read().await.db)
129 .await?
130 .unwrap();
131
132 mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
133 .await?;
134 let schema = Schema::find_by_id(schema_id)
135 .one(&mgr.inner.read().await.db)
136 .await?
137 .unwrap();
138 assert_eq!(schema.name, "schema2");
139 mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
140 .await?;
141
142 Ok(())
143 }
144
145 #[tokio::test]
146 async fn test_create_view() -> MetaResult<()> {
147 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
148 let pb_view = PbView {
149 schema_id: TEST_SCHEMA_ID,
150 database_id: TEST_DATABASE_ID,
151 name: "view".to_owned(),
152 owner: TEST_OWNER_ID as _,
153 sql: "CREATE VIEW view AS SELECT 1".to_owned(),
154 ..Default::default()
155 };
156 mgr.create_view(pb_view.clone(), HashSet::new()).await?;
157 assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
158
159 let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
160 mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
161 .await?;
162 assert!(
163 View::find_by_id(view.view_id)
164 .one(&mgr.inner.read().await.db)
165 .await?
166 .is_none()
167 );
168
169 Ok(())
170 }
171
172 #[tokio::test]
173 async fn test_create_function() -> MetaResult<()> {
174 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
175 let test_data_type = risingwave_pb::data::DataType {
176 type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
177 ..Default::default()
178 };
179 let arg_types = vec![test_data_type.clone()];
180 let pb_function = PbFunction {
181 schema_id: TEST_SCHEMA_ID,
182 database_id: TEST_DATABASE_ID,
183 name: "test_function".to_owned(),
184 owner: TEST_OWNER_ID as _,
185 arg_types,
186 return_type: Some(test_data_type.clone()),
187 language: "python".to_owned(),
188 kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
189 Default::default(),
190 )),
191 ..Default::default()
192 };
193 mgr.create_function(pb_function.clone()).await?;
194 assert!(mgr.create_function(pb_function).await.is_err());
195
196 let function = Function::find()
197 .inner_join(Object)
198 .filter(
199 object::Column::DatabaseId
200 .eq(TEST_DATABASE_ID)
201 .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
202 .add(function::Column::Name.eq("test_function")),
203 )
204 .one(&mgr.inner.read().await.db)
205 .await?
206 .unwrap();
207 assert_eq!(function.return_type.to_protobuf(), test_data_type);
208 assert_eq!(function.arg_types.to_protobuf().len(), 1);
209 assert_eq!(function.language, "python");
210
211 mgr.drop_object(
212 ObjectType::Function,
213 function.function_id,
214 DropMode::Restrict,
215 )
216 .await?;
217 assert!(
218 Object::find_by_id(function.function_id)
219 .one(&mgr.inner.read().await.db)
220 .await?
221 .is_none()
222 );
223
224 Ok(())
225 }
226
227 #[tokio::test]
228 async fn test_alter_relation_rename() -> MetaResult<()> {
229 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
230 let pb_source = PbSource {
231 schema_id: TEST_SCHEMA_ID,
232 database_id: TEST_DATABASE_ID,
233 name: "s1".to_owned(),
234 owner: TEST_OWNER_ID as _,
235 definition: r#"CREATE SOURCE s1 (v1 int) with (
236 connector = 'kafka',
237 topic = 'kafka_alter',
238 properties.bootstrap.server = 'message_queue:29092',
239 scan.startup.mode = 'earliest'
240) FORMAT PLAIN ENCODE JSON"#
241 .to_owned(),
242 info: Some(StreamSourceInfo {
243 ..Default::default()
244 }),
245 ..Default::default()
246 };
247 mgr.create_source(pb_source).await?;
248 let source_id: SourceId = Source::find()
249 .select_only()
250 .column(source::Column::SourceId)
251 .filter(source::Column::Name.eq("s1"))
252 .into_tuple()
253 .one(&mgr.inner.read().await.db)
254 .await?
255 .unwrap();
256
257 let pb_view = PbView {
258 schema_id: TEST_SCHEMA_ID,
259 database_id: TEST_DATABASE_ID,
260 name: "view_1".to_owned(),
261 owner: TEST_OWNER_ID as _,
262 sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
263 ..Default::default()
264 };
265 mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
266 .await?;
267 let view_id: ViewId = View::find()
268 .select_only()
269 .column(view::Column::ViewId)
270 .filter(view::Column::Name.eq("view_1"))
271 .into_tuple()
272 .one(&mgr.inner.read().await.db)
273 .await?
274 .unwrap();
275
276 mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
277 let source = Source::find_by_id(source_id)
278 .one(&mgr.inner.read().await.db)
279 .await?
280 .unwrap();
281 assert_eq!(source.name, "s2");
282 assert_eq!(
283 source.definition,
284 "CREATE SOURCE s2 (v1 INT) WITH (\
285 connector = 'kafka', \
286 topic = 'kafka_alter', \
287 properties.bootstrap.server = 'message_queue:29092', \
288 scan.startup.mode = 'earliest'\
289) FORMAT PLAIN ENCODE JSON"
290 );
291
292 let view = View::find_by_id(view_id)
293 .one(&mgr.inner.read().await.db)
294 .await?
295 .unwrap();
296 assert_eq!(
297 view.definition,
298 "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
299 );
300
301 mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
302 .await?;
303 assert!(
304 View::find_by_id(view_id)
305 .one(&mgr.inner.read().await.db)
306 .await?
307 .is_none()
308 );
309
310 Ok(())
311 }
312
313 #[tokio::test]
314 async fn test_abort_initial_materialized_view_reports_cancellation() -> MetaResult<()> {
315 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
316
317 let mut inner = mgr.inner.write().await;
318 let txn = inner.db.begin().await?;
319 let obj = CatalogController::create_object(
320 &txn,
321 ObjectType::Table,
322 TEST_OWNER_ID,
323 Some(TEST_DATABASE_ID),
324 Some(TEST_SCHEMA_ID),
325 )
326 .await?;
327 let job_id = obj.oid.as_job_id();
328
329 table::ActiveModel {
330 table_id: Set(obj.oid.as_table_id()),
331 name: Set("mv_abort_initial".to_owned()),
332 optional_associated_source_id: Set(None),
333 table_type: Set(TableType::MaterializedView),
334 belongs_to_job_id: Set(None),
335 columns: Set(vec![].into()),
336 pk: Set(vec![].into()),
337 distribution_key: Set(Vec::<i32>::new().into()),
338 stream_key: Set(Vec::<i32>::new().into()),
339 append_only: Set(false),
340 fragment_id: Set(None),
341 vnode_col_index: Set(None),
342 row_id_index: Set(None),
343 value_indices: Set(Vec::<i32>::new().into()),
344 definition: Set("CREATE MATERIALIZED VIEW mv_abort_initial AS SELECT 1".to_owned()),
345 handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
346 version_column_indices: Set(None),
347 read_prefix_len_hint: Set(0),
348 watermark_indices: Set(Vec::<i32>::new().into()),
349 dist_key_in_pk: Set(Vec::<i32>::new().into()),
350 dml_fragment_id: Set(None),
351 cardinality: Set(None),
352 cleaned_by_watermark: Set(false),
353 description: Set(None),
354 version: Set(None),
355 retention_seconds: Set(None),
356 cdc_table_id: Set(None),
357 vnode_count: Set(1),
358 webhook_info: Set(None),
359 engine: Set(None),
360 clean_watermark_index_in_pk: Set(None),
361 clean_watermark_indices: Set(None),
362 refreshable: Set(false),
363 vector_index_info: Set(None),
364 cdc_table_type: Set(None),
365 }
366 .insert(&txn)
367 .await?;
368
369 streaming_job::ActiveModel {
370 job_id: Set(job_id),
371 job_status: Set(JobStatus::Initial),
372 create_type: Set(CreateType::Foreground),
373 timezone: Set(None),
374 config_override: Set(None),
375 adaptive_parallelism_strategy: Set(None),
376 parallelism: Set(StreamingParallelism::Adaptive),
377 backfill_parallelism: Set(None),
378 backfill_adaptive_parallelism_strategy: Set(None),
379 backfill_orders: Set(None),
380 max_parallelism: Set(1),
381 specific_resource_group: Set(None),
382 is_serverless_backfill: Set(false),
383 refresh_interval_sec: Set(None),
384 }
385 .insert(&txn)
386 .await?;
387
388 let (tx, rx) = oneshot::channel();
389 inner.register_finish_notifier(TEST_DATABASE_ID, job_id, tx);
390 txn.commit().await?;
391 drop(inner);
392
393 let (aborted, database_id) = mgr.try_abort_creating_streaming_job(job_id, true).await?;
394 assert!(aborted);
395 assert_eq!(database_id, Some(TEST_DATABASE_ID));
396
397 let err = rx
398 .await
399 .expect("finish notifier should be notified")
400 .expect_err("initial job drop should cancel the create wait");
401 assert!(err.contains("cancelled"));
402
403 let db = &mgr.inner.read().await.db;
404 assert!(Object::find_by_id(job_id).one(db).await?.is_none());
405 assert!(StreamingJob::find_by_id(job_id).one(db).await?.is_none());
406 assert!(
407 Table::find_by_id(job_id.as_mv_table_id())
408 .one(db)
409 .await?
410 .is_none()
411 );
412
413 Ok(())
414 }
415
416 #[tokio::test]
417 async fn test_clean_dirty_creating_jobs_records_dropped_tables_for_per_db_recovery()
418 -> MetaResult<()> {
419 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
420
421 let inner = mgr.inner.write().await;
422 let txn = inner.db.begin().await?;
423 let mv_obj = CatalogController::create_object(
424 &txn,
425 ObjectType::Table,
426 TEST_OWNER_ID,
427 Some(TEST_DATABASE_ID),
428 Some(TEST_SCHEMA_ID),
429 )
430 .await?;
431 let job_id = mv_obj.oid.as_job_id();
432 let mv_table_id = job_id.as_mv_table_id();
433 insert_test_table(
434 &txn,
435 mv_table_id,
436 "mv_dirty",
437 TableType::MaterializedView,
438 None,
439 "CREATE MATERIALIZED VIEW mv_dirty AS SELECT 1",
440 )
441 .await?;
442
443 let internal_obj = CatalogController::create_object(
444 &txn,
445 ObjectType::Table,
446 TEST_OWNER_ID,
447 Some(TEST_DATABASE_ID),
448 Some(TEST_SCHEMA_ID),
449 )
450 .await?;
451 let internal_table_id = internal_obj.oid.as_table_id();
452 insert_test_table(
453 &txn,
454 internal_table_id,
455 "__internal_mv_dirty",
456 TableType::Internal,
457 Some(job_id),
458 "",
459 )
460 .await?;
461
462 streaming_job::ActiveModel {
463 job_id: Set(job_id),
464 job_status: Set(JobStatus::Creating),
465 create_type: Set(CreateType::Foreground),
466 timezone: Set(None),
467 config_override: Set(None),
468 adaptive_parallelism_strategy: Set(None),
469 parallelism: Set(StreamingParallelism::Adaptive),
470 backfill_parallelism: Set(None),
471 backfill_adaptive_parallelism_strategy: Set(None),
472 backfill_orders: Set(None),
473 max_parallelism: Set(1),
474 specific_resource_group: Set(None),
475 is_serverless_backfill: Set(false),
476 refresh_interval_sec: Set(None),
477 }
478 .insert(&txn)
479 .await?;
480 txn.commit().await?;
481 drop(inner);
482
483 let cleaned = mgr
484 .clean_dirty_creating_jobs(Some(TEST_DATABASE_ID))
485 .await?;
486 assert_eq!(cleaned.streaming_job_ids, vec![job_id]);
487 assert!(cleaned.source_ids.is_empty());
488 let mut dropped_table_ids = cleaned.dropped_table_ids;
489 dropped_table_ids.sort_unstable();
490 assert_eq!(dropped_table_ids, vec![mv_table_id, internal_table_id]);
491
492 let inner = mgr.inner.read().await;
493 assert!(inner.dropped_tables.contains_key(&mv_table_id));
494 assert!(inner.dropped_tables.contains_key(&internal_table_id));
495 assert!(Object::find_by_id(job_id).one(&inner.db).await?.is_none());
496 assert!(
497 StreamingJob::find_by_id(job_id)
498 .one(&inner.db)
499 .await?
500 .is_none()
501 );
502 assert!(
503 Table::find_by_id(mv_table_id)
504 .one(&inner.db)
505 .await?
506 .is_none()
507 );
508 assert!(
509 Table::find_by_id(internal_table_id)
510 .one(&inner.db)
511 .await?
512 .is_none()
513 );
514
515 Ok(())
516 }
517
518 #[tokio::test]
519 async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
520 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
521 let pb_view = PbView {
522 schema_id: TEST_SCHEMA_ID,
523 database_id: TEST_DATABASE_ID,
524 name: "subscription_dep_view".to_owned(),
525 owner: TEST_OWNER_ID as _,
526 sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
527 ..Default::default()
528 };
529 mgr.create_view(pb_view, HashSet::new()).await?;
530
531 let view_id: ViewId = View::find()
532 .select_only()
533 .column(view::Column::ViewId)
534 .filter(view::Column::Name.eq("subscription_dep_view"))
535 .into_tuple()
536 .one(&mgr.inner.read().await.db)
537 .await?
538 .unwrap();
539
540 let mut pb_subscription = PbSubscription {
541 name: "subscription_to_abort".to_owned(),
542 definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
543 .to_owned(),
544 retention_seconds: 86400,
545 database_id: TEST_DATABASE_ID,
546 schema_id: TEST_SCHEMA_ID,
547 dependent_table_id: view_id.as_object_id().as_table_id(),
548 owner: TEST_OWNER_ID as _,
549 subscription_state: SubscriptionState::Init as _,
550 ..Default::default()
551 };
552 mgr.create_subscription_catalog(&mut pb_subscription)
553 .await?;
554
555 mgr.try_abort_creating_subscription(pb_subscription.id)
556 .await?;
557
558 assert!(
559 Subscription::find_by_id(pb_subscription.id)
560 .one(&mgr.inner.read().await.db)
561 .await?
562 .is_none()
563 );
564
565 Ok(())
566 }
567}