1#[cfg(test)]
16mod tests {
17 use risingwave_meta_model::table::HandleConflictBehavior;
18 use risingwave_pb::catalog::StreamSourceInfo;
19 use risingwave_pb::catalog::subscription::SubscriptionState;
20 use tokio::sync::oneshot;
21
22 use crate::controller::catalog::*;
23
/// Database id under which the fixtures in this module are created.
// NOTE(review): tests create schemas and views under this id without creating the
// database first, so it is presumably pre-seeded by `MetaSrvEnv::for_test()` — confirm.
const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
/// Schema id under which the fixtures in this module are created.
// NOTE(review): presumably also pre-seeded by the test environment — confirm.
const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
/// User id recorded as the owner of every object created by these tests.
const TEST_OWNER_ID: UserId = UserId::new(1);
27
28 async fn insert_test_table(
29 txn: &DatabaseTransaction,
30 table_id: TableId,
31 name: &str,
32 table_type: TableType,
33 belongs_to_job_id: Option<JobId>,
34 definition: &str,
35 ) -> MetaResult<()> {
36 table::ActiveModel {
37 table_id: Set(table_id),
38 name: Set(name.to_owned()),
39 optional_associated_source_id: Set(None),
40 table_type: Set(table_type),
41 belongs_to_job_id: Set(belongs_to_job_id),
42 columns: Set(vec![].into()),
43 pk: Set(vec![].into()),
44 distribution_key: Set(Vec::<i32>::new().into()),
45 stream_key: Set(Vec::<i32>::new().into()),
46 append_only: Set(false),
47 fragment_id: Set(None),
48 vnode_col_index: Set(None),
49 row_id_index: Set(None),
50 value_indices: Set(Vec::<i32>::new().into()),
51 definition: Set(definition.to_owned()),
52 handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
53 version_column_indices: Set(None),
54 read_prefix_len_hint: Set(0),
55 watermark_indices: Set(Vec::<i32>::new().into()),
56 dist_key_in_pk: Set(Vec::<i32>::new().into()),
57 dml_fragment_id: Set(None),
58 cardinality: Set(None),
59 cleaned_by_watermark: Set(false),
60 description: Set(None),
61 version: Set(None),
62 retention_seconds: Set(None),
63 cdc_table_id: Set(None),
64 vnode_count: Set(1),
65 webhook_info: Set(None),
66 engine: Set(None),
67 clean_watermark_index_in_pk: Set(None),
68 clean_watermark_indices: Set(None),
69 refreshable: Set(false),
70 vector_index_info: Set(None),
71 cdc_table_type: Set(None),
72 }
73 .insert(txn)
74 .await?;
75 Ok(())
76 }
77
78 #[tokio::test]
79 async fn test_database_func() -> MetaResult<()> {
80 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
81 let pb_database = PbDatabase {
82 name: "db1".to_owned(),
83 owner: TEST_OWNER_ID as _,
84 ..Default::default()
85 };
86 mgr.create_database(pb_database).await?;
87
88 let database_id: DatabaseId = Database::find()
89 .select_only()
90 .column(database::Column::DatabaseId)
91 .filter(database::Column::Name.eq("db1"))
92 .into_tuple()
93 .one(&mgr.inner.read().await.db)
94 .await?
95 .unwrap();
96
97 mgr.alter_name(ObjectType::Database, database_id, "db2")
98 .await?;
99 let database = Database::find_by_id(database_id)
100 .one(&mgr.inner.read().await.db)
101 .await?
102 .unwrap();
103 assert_eq!(database.name, "db2");
104
105 mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
106 .await?;
107
108 Ok(())
109 }
110
111 #[tokio::test]
112 async fn test_schema_func() -> MetaResult<()> {
113 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
114 let pb_schema = PbSchema {
115 database_id: TEST_DATABASE_ID,
116 name: "schema1".to_owned(),
117 owner: TEST_OWNER_ID as _,
118 ..Default::default()
119 };
120 mgr.create_schema(pb_schema.clone()).await?;
121 assert!(mgr.create_schema(pb_schema).await.is_err());
122
123 let schema_id: SchemaId = Schema::find()
124 .select_only()
125 .column(schema::Column::SchemaId)
126 .filter(schema::Column::Name.eq("schema1"))
127 .into_tuple()
128 .one(&mgr.inner.read().await.db)
129 .await?
130 .unwrap();
131
132 mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
133 .await?;
134 let schema = Schema::find_by_id(schema_id)
135 .one(&mgr.inner.read().await.db)
136 .await?
137 .unwrap();
138 assert_eq!(schema.name, "schema2");
139 mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
140 .await?;
141
142 Ok(())
143 }
144
145 #[tokio::test]
146 async fn test_create_view() -> MetaResult<()> {
147 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
148 let pb_view = PbView {
149 schema_id: TEST_SCHEMA_ID,
150 database_id: TEST_DATABASE_ID,
151 name: "view".to_owned(),
152 owner: TEST_OWNER_ID as _,
153 sql: "CREATE VIEW view AS SELECT 1".to_owned(),
154 ..Default::default()
155 };
156 mgr.create_view(pb_view.clone(), HashSet::new()).await?;
157 assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
158
159 let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
160 mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
161 .await?;
162 assert!(
163 View::find_by_id(view.view_id)
164 .one(&mgr.inner.read().await.db)
165 .await?
166 .is_none()
167 );
168
169 Ok(())
170 }
171
172 #[tokio::test]
173 async fn test_create_function() -> MetaResult<()> {
174 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
175 let test_data_type = risingwave_pb::data::DataType {
176 type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
177 ..Default::default()
178 };
179 let arg_types = vec![test_data_type.clone()];
180 let pb_function = PbFunction {
181 schema_id: TEST_SCHEMA_ID,
182 database_id: TEST_DATABASE_ID,
183 name: "test_function".to_owned(),
184 owner: TEST_OWNER_ID as _,
185 arg_types,
186 return_type: Some(test_data_type.clone()),
187 language: "python".to_owned(),
188 kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
189 Default::default(),
190 )),
191 ..Default::default()
192 };
193 mgr.create_function(pb_function.clone()).await?;
194 assert!(mgr.create_function(pb_function).await.is_err());
195
196 let function = Function::find()
197 .inner_join(Object)
198 .filter(
199 object::Column::DatabaseId
200 .eq(TEST_DATABASE_ID)
201 .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
202 .add(function::Column::Name.eq("test_function")),
203 )
204 .one(&mgr.inner.read().await.db)
205 .await?
206 .unwrap();
207 assert_eq!(function.return_type.to_protobuf(), test_data_type);
208 assert_eq!(function.arg_types.to_protobuf().len(), 1);
209 assert_eq!(function.language, "python");
210
211 mgr.drop_object(
212 ObjectType::Function,
213 function.function_id,
214 DropMode::Restrict,
215 )
216 .await?;
217 assert!(
218 Object::find_by_id(function.function_id)
219 .one(&mgr.inner.read().await.db)
220 .await?
221 .is_none()
222 );
223
224 Ok(())
225 }
226
227 #[tokio::test]
228 async fn test_alter_relation_rename() -> MetaResult<()> {
229 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
230 let pb_source = PbSource {
231 schema_id: TEST_SCHEMA_ID,
232 database_id: TEST_DATABASE_ID,
233 name: "s1".to_owned(),
234 owner: TEST_OWNER_ID as _,
235 definition: r#"CREATE SOURCE s1 (v1 int) with (
236 connector = 'kafka',
237 topic = 'kafka_alter',
238 properties.bootstrap.server = 'message_queue:29092',
239 scan.startup.mode = 'earliest'
240) FORMAT PLAIN ENCODE JSON"#
241 .to_owned(),
242 info: Some(StreamSourceInfo {
243 ..Default::default()
244 }),
245 ..Default::default()
246 };
247 mgr.create_source(pb_source).await?;
248 let source_id: SourceId = Source::find()
249 .select_only()
250 .column(source::Column::SourceId)
251 .filter(source::Column::Name.eq("s1"))
252 .into_tuple()
253 .one(&mgr.inner.read().await.db)
254 .await?
255 .unwrap();
256
257 let pb_view = PbView {
258 schema_id: TEST_SCHEMA_ID,
259 database_id: TEST_DATABASE_ID,
260 name: "view_1".to_owned(),
261 owner: TEST_OWNER_ID as _,
262 sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
263 ..Default::default()
264 };
265 mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
266 .await?;
267 let view_id: ViewId = View::find()
268 .select_only()
269 .column(view::Column::ViewId)
270 .filter(view::Column::Name.eq("view_1"))
271 .into_tuple()
272 .one(&mgr.inner.read().await.db)
273 .await?
274 .unwrap();
275
276 mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
277 let source = Source::find_by_id(source_id)
278 .one(&mgr.inner.read().await.db)
279 .await?
280 .unwrap();
281 assert_eq!(source.name, "s2");
282 assert_eq!(
283 source.definition,
284 "CREATE SOURCE s2 (v1 INT) WITH (\
285 connector = 'kafka', \
286 topic = 'kafka_alter', \
287 properties.bootstrap.server = 'message_queue:29092', \
288 scan.startup.mode = 'earliest'\
289) FORMAT PLAIN ENCODE JSON"
290 );
291
292 let view = View::find_by_id(view_id)
293 .one(&mgr.inner.read().await.db)
294 .await?
295 .unwrap();
296 assert_eq!(
297 view.definition,
298 "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
299 );
300
301 mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
302 .await?;
303 assert!(
304 View::find_by_id(view_id)
305 .one(&mgr.inner.read().await.db)
306 .await?
307 .is_none()
308 );
309
310 Ok(())
311 }
312
313 #[tokio::test]
314 async fn test_abort_initial_materialized_view_reports_cancellation() -> MetaResult<()> {
315 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
316
317 let mut inner = mgr.inner.write().await;
318 let txn = inner.db.begin().await?;
319 let obj = CatalogController::create_object(
320 &txn,
321 ObjectType::Table,
322 TEST_OWNER_ID,
323 Some(TEST_DATABASE_ID),
324 Some(TEST_SCHEMA_ID),
325 )
326 .await?;
327 let job_id = obj.oid.as_job_id();
328
329 table::ActiveModel {
330 table_id: Set(obj.oid.as_table_id()),
331 name: Set("mv_abort_initial".to_owned()),
332 optional_associated_source_id: Set(None),
333 table_type: Set(TableType::MaterializedView),
334 belongs_to_job_id: Set(None),
335 columns: Set(vec![].into()),
336 pk: Set(vec![].into()),
337 distribution_key: Set(Vec::<i32>::new().into()),
338 stream_key: Set(Vec::<i32>::new().into()),
339 append_only: Set(false),
340 fragment_id: Set(None),
341 vnode_col_index: Set(None),
342 row_id_index: Set(None),
343 value_indices: Set(Vec::<i32>::new().into()),
344 definition: Set("CREATE MATERIALIZED VIEW mv_abort_initial AS SELECT 1".to_owned()),
345 handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
346 version_column_indices: Set(None),
347 read_prefix_len_hint: Set(0),
348 watermark_indices: Set(Vec::<i32>::new().into()),
349 dist_key_in_pk: Set(Vec::<i32>::new().into()),
350 dml_fragment_id: Set(None),
351 cardinality: Set(None),
352 cleaned_by_watermark: Set(false),
353 description: Set(None),
354 version: Set(None),
355 retention_seconds: Set(None),
356 cdc_table_id: Set(None),
357 vnode_count: Set(1),
358 webhook_info: Set(None),
359 engine: Set(None),
360 clean_watermark_index_in_pk: Set(None),
361 clean_watermark_indices: Set(None),
362 refreshable: Set(false),
363 vector_index_info: Set(None),
364 cdc_table_type: Set(None),
365 }
366 .insert(&txn)
367 .await?;
368
369 streaming_job::ActiveModel {
370 job_id: Set(job_id),
371 job_status: Set(JobStatus::Initial),
372 create_type: Set(CreateType::Foreground),
373 timezone: Set(None),
374 config_override: Set(None),
375 adaptive_parallelism_strategy: Set(None),
376 parallelism: Set(StreamingParallelism::Adaptive),
377 backfill_parallelism: Set(None),
378 backfill_adaptive_parallelism_strategy: Set(None),
379 backfill_orders: Set(None),
380 max_parallelism: Set(1),
381 specific_resource_group: Set(None),
382 is_serverless_backfill: Set(false),
383 }
384 .insert(&txn)
385 .await?;
386
387 let (tx, rx) = oneshot::channel();
388 inner.register_finish_notifier(TEST_DATABASE_ID, job_id, tx);
389 txn.commit().await?;
390 drop(inner);
391
392 let (aborted, database_id) = mgr.try_abort_creating_streaming_job(job_id, true).await?;
393 assert!(aborted);
394 assert_eq!(database_id, Some(TEST_DATABASE_ID));
395
396 let err = rx
397 .await
398 .expect("finish notifier should be notified")
399 .expect_err("initial job drop should cancel the create wait");
400 assert!(err.contains("cancelled"));
401
402 let db = &mgr.inner.read().await.db;
403 assert!(Object::find_by_id(job_id).one(db).await?.is_none());
404 assert!(StreamingJob::find_by_id(job_id).one(db).await?.is_none());
405 assert!(
406 Table::find_by_id(job_id.as_mv_table_id())
407 .one(db)
408 .await?
409 .is_none()
410 );
411
412 Ok(())
413 }
414
415 #[tokio::test]
416 async fn test_clean_dirty_creating_jobs_records_dropped_tables_for_per_db_recovery()
417 -> MetaResult<()> {
418 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
419
420 let inner = mgr.inner.write().await;
421 let txn = inner.db.begin().await?;
422 let mv_obj = CatalogController::create_object(
423 &txn,
424 ObjectType::Table,
425 TEST_OWNER_ID,
426 Some(TEST_DATABASE_ID),
427 Some(TEST_SCHEMA_ID),
428 )
429 .await?;
430 let job_id = mv_obj.oid.as_job_id();
431 let mv_table_id = job_id.as_mv_table_id();
432 insert_test_table(
433 &txn,
434 mv_table_id,
435 "mv_dirty",
436 TableType::MaterializedView,
437 None,
438 "CREATE MATERIALIZED VIEW mv_dirty AS SELECT 1",
439 )
440 .await?;
441
442 let internal_obj = CatalogController::create_object(
443 &txn,
444 ObjectType::Table,
445 TEST_OWNER_ID,
446 Some(TEST_DATABASE_ID),
447 Some(TEST_SCHEMA_ID),
448 )
449 .await?;
450 let internal_table_id = internal_obj.oid.as_table_id();
451 insert_test_table(
452 &txn,
453 internal_table_id,
454 "__internal_mv_dirty",
455 TableType::Internal,
456 Some(job_id),
457 "",
458 )
459 .await?;
460
461 streaming_job::ActiveModel {
462 job_id: Set(job_id),
463 job_status: Set(JobStatus::Creating),
464 create_type: Set(CreateType::Foreground),
465 timezone: Set(None),
466 config_override: Set(None),
467 adaptive_parallelism_strategy: Set(None),
468 parallelism: Set(StreamingParallelism::Adaptive),
469 backfill_parallelism: Set(None),
470 backfill_adaptive_parallelism_strategy: Set(None),
471 backfill_orders: Set(None),
472 max_parallelism: Set(1),
473 specific_resource_group: Set(None),
474 is_serverless_backfill: Set(false),
475 }
476 .insert(&txn)
477 .await?;
478 txn.commit().await?;
479 drop(inner);
480
481 let cleaned = mgr
482 .clean_dirty_creating_jobs(Some(TEST_DATABASE_ID))
483 .await?;
484 assert_eq!(cleaned.streaming_job_ids, vec![job_id]);
485 assert!(cleaned.source_ids.is_empty());
486 let mut dropped_table_ids = cleaned.dropped_table_ids;
487 dropped_table_ids.sort_unstable();
488 assert_eq!(dropped_table_ids, vec![mv_table_id, internal_table_id]);
489
490 let inner = mgr.inner.read().await;
491 assert!(inner.dropped_tables.contains_key(&mv_table_id));
492 assert!(inner.dropped_tables.contains_key(&internal_table_id));
493 assert!(Object::find_by_id(job_id).one(&inner.db).await?.is_none());
494 assert!(
495 StreamingJob::find_by_id(job_id)
496 .one(&inner.db)
497 .await?
498 .is_none()
499 );
500 assert!(
501 Table::find_by_id(mv_table_id)
502 .one(&inner.db)
503 .await?
504 .is_none()
505 );
506 assert!(
507 Table::find_by_id(internal_table_id)
508 .one(&inner.db)
509 .await?
510 .is_none()
511 );
512
513 Ok(())
514 }
515
516 #[tokio::test]
517 async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
518 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
519 let pb_view = PbView {
520 schema_id: TEST_SCHEMA_ID,
521 database_id: TEST_DATABASE_ID,
522 name: "subscription_dep_view".to_owned(),
523 owner: TEST_OWNER_ID as _,
524 sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
525 ..Default::default()
526 };
527 mgr.create_view(pb_view, HashSet::new()).await?;
528
529 let view_id: ViewId = View::find()
530 .select_only()
531 .column(view::Column::ViewId)
532 .filter(view::Column::Name.eq("subscription_dep_view"))
533 .into_tuple()
534 .one(&mgr.inner.read().await.db)
535 .await?
536 .unwrap();
537
538 let mut pb_subscription = PbSubscription {
539 name: "subscription_to_abort".to_owned(),
540 definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
541 .to_owned(),
542 retention_seconds: 86400,
543 database_id: TEST_DATABASE_ID,
544 schema_id: TEST_SCHEMA_ID,
545 dependent_table_id: view_id.as_object_id().as_table_id(),
546 owner: TEST_OWNER_ID as _,
547 subscription_state: SubscriptionState::Init as _,
548 ..Default::default()
549 };
550 mgr.create_subscription_catalog(&mut pb_subscription)
551 .await?;
552
553 mgr.try_abort_creating_subscription(pb_subscription.id)
554 .await?;
555
556 assert!(
557 Subscription::find_by_id(pb_subscription.id)
558 .one(&mgr.inner.read().await.db)
559 .await?
560 .is_none()
561 );
562
563 Ok(())
564 }
565}