risingwave_meta/controller/catalog/
test.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use risingwave_meta_model::table::HandleConflictBehavior;
18    use risingwave_pb::catalog::StreamSourceInfo;
19    use risingwave_pb::catalog::subscription::SubscriptionState;
20    use tokio::sync::oneshot;
21
22    use crate::controller::catalog::*;
23
24    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
25    const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
26    const TEST_OWNER_ID: UserId = UserId::new(1);
27
28    async fn insert_test_table(
29        txn: &DatabaseTransaction,
30        table_id: TableId,
31        name: &str,
32        table_type: TableType,
33        belongs_to_job_id: Option<JobId>,
34        definition: &str,
35    ) -> MetaResult<()> {
36        table::ActiveModel {
37            table_id: Set(table_id),
38            name: Set(name.to_owned()),
39            optional_associated_source_id: Set(None),
40            table_type: Set(table_type),
41            belongs_to_job_id: Set(belongs_to_job_id),
42            columns: Set(vec![].into()),
43            pk: Set(vec![].into()),
44            distribution_key: Set(Vec::<i32>::new().into()),
45            stream_key: Set(Vec::<i32>::new().into()),
46            append_only: Set(false),
47            fragment_id: Set(None),
48            vnode_col_index: Set(None),
49            row_id_index: Set(None),
50            value_indices: Set(Vec::<i32>::new().into()),
51            definition: Set(definition.to_owned()),
52            handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
53            version_column_indices: Set(None),
54            read_prefix_len_hint: Set(0),
55            watermark_indices: Set(Vec::<i32>::new().into()),
56            dist_key_in_pk: Set(Vec::<i32>::new().into()),
57            dml_fragment_id: Set(None),
58            cardinality: Set(None),
59            cleaned_by_watermark: Set(false),
60            description: Set(None),
61            version: Set(None),
62            retention_seconds: Set(None),
63            cdc_table_id: Set(None),
64            vnode_count: Set(1),
65            webhook_info: Set(None),
66            engine: Set(None),
67            clean_watermark_index_in_pk: Set(None),
68            clean_watermark_indices: Set(None),
69            refreshable: Set(false),
70            vector_index_info: Set(None),
71            cdc_table_type: Set(None),
72        }
73        .insert(txn)
74        .await?;
75        Ok(())
76    }
77
78    #[tokio::test]
79    async fn test_database_func() -> MetaResult<()> {
80        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
81        let pb_database = PbDatabase {
82            name: "db1".to_owned(),
83            owner: TEST_OWNER_ID as _,
84            ..Default::default()
85        };
86        mgr.create_database(pb_database).await?;
87
88        let database_id: DatabaseId = Database::find()
89            .select_only()
90            .column(database::Column::DatabaseId)
91            .filter(database::Column::Name.eq("db1"))
92            .into_tuple()
93            .one(&mgr.inner.read().await.db)
94            .await?
95            .unwrap();
96
97        mgr.alter_name(ObjectType::Database, database_id, "db2")
98            .await?;
99        let database = Database::find_by_id(database_id)
100            .one(&mgr.inner.read().await.db)
101            .await?
102            .unwrap();
103        assert_eq!(database.name, "db2");
104
105        mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
106            .await?;
107
108        Ok(())
109    }
110
111    #[tokio::test]
112    async fn test_schema_func() -> MetaResult<()> {
113        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
114        let pb_schema = PbSchema {
115            database_id: TEST_DATABASE_ID,
116            name: "schema1".to_owned(),
117            owner: TEST_OWNER_ID as _,
118            ..Default::default()
119        };
120        mgr.create_schema(pb_schema.clone()).await?;
121        assert!(mgr.create_schema(pb_schema).await.is_err());
122
123        let schema_id: SchemaId = Schema::find()
124            .select_only()
125            .column(schema::Column::SchemaId)
126            .filter(schema::Column::Name.eq("schema1"))
127            .into_tuple()
128            .one(&mgr.inner.read().await.db)
129            .await?
130            .unwrap();
131
132        mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
133            .await?;
134        let schema = Schema::find_by_id(schema_id)
135            .one(&mgr.inner.read().await.db)
136            .await?
137            .unwrap();
138        assert_eq!(schema.name, "schema2");
139        mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
140            .await?;
141
142        Ok(())
143    }
144
145    #[tokio::test]
146    async fn test_create_view() -> MetaResult<()> {
147        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
148        let pb_view = PbView {
149            schema_id: TEST_SCHEMA_ID,
150            database_id: TEST_DATABASE_ID,
151            name: "view".to_owned(),
152            owner: TEST_OWNER_ID as _,
153            sql: "CREATE VIEW view AS SELECT 1".to_owned(),
154            ..Default::default()
155        };
156        mgr.create_view(pb_view.clone(), HashSet::new()).await?;
157        assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
158
159        let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
160        mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
161            .await?;
162        assert!(
163            View::find_by_id(view.view_id)
164                .one(&mgr.inner.read().await.db)
165                .await?
166                .is_none()
167        );
168
169        Ok(())
170    }
171
172    #[tokio::test]
173    async fn test_create_function() -> MetaResult<()> {
174        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
175        let test_data_type = risingwave_pb::data::DataType {
176            type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
177            ..Default::default()
178        };
179        let arg_types = vec![test_data_type.clone()];
180        let pb_function = PbFunction {
181            schema_id: TEST_SCHEMA_ID,
182            database_id: TEST_DATABASE_ID,
183            name: "test_function".to_owned(),
184            owner: TEST_OWNER_ID as _,
185            arg_types,
186            return_type: Some(test_data_type.clone()),
187            language: "python".to_owned(),
188            kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
189                Default::default(),
190            )),
191            ..Default::default()
192        };
193        mgr.create_function(pb_function.clone()).await?;
194        assert!(mgr.create_function(pb_function).await.is_err());
195
196        let function = Function::find()
197            .inner_join(Object)
198            .filter(
199                object::Column::DatabaseId
200                    .eq(TEST_DATABASE_ID)
201                    .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
202                    .add(function::Column::Name.eq("test_function")),
203            )
204            .one(&mgr.inner.read().await.db)
205            .await?
206            .unwrap();
207        assert_eq!(function.return_type.to_protobuf(), test_data_type);
208        assert_eq!(function.arg_types.to_protobuf().len(), 1);
209        assert_eq!(function.language, "python");
210
211        mgr.drop_object(
212            ObjectType::Function,
213            function.function_id,
214            DropMode::Restrict,
215        )
216        .await?;
217        assert!(
218            Object::find_by_id(function.function_id)
219                .one(&mgr.inner.read().await.db)
220                .await?
221                .is_none()
222        );
223
224        Ok(())
225    }
226
227    #[tokio::test]
228    async fn test_alter_relation_rename() -> MetaResult<()> {
229        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
230        let pb_source = PbSource {
231            schema_id: TEST_SCHEMA_ID,
232            database_id: TEST_DATABASE_ID,
233            name: "s1".to_owned(),
234            owner: TEST_OWNER_ID as _,
235            definition: r#"CREATE SOURCE s1 (v1 int) with (
236  connector = 'kafka',
237  topic = 'kafka_alter',
238  properties.bootstrap.server = 'message_queue:29092',
239  scan.startup.mode = 'earliest'
240) FORMAT PLAIN ENCODE JSON"#
241                .to_owned(),
242            info: Some(StreamSourceInfo {
243                ..Default::default()
244            }),
245            ..Default::default()
246        };
247        mgr.create_source(pb_source).await?;
248        let source_id: SourceId = Source::find()
249            .select_only()
250            .column(source::Column::SourceId)
251            .filter(source::Column::Name.eq("s1"))
252            .into_tuple()
253            .one(&mgr.inner.read().await.db)
254            .await?
255            .unwrap();
256
257        let pb_view = PbView {
258            schema_id: TEST_SCHEMA_ID,
259            database_id: TEST_DATABASE_ID,
260            name: "view_1".to_owned(),
261            owner: TEST_OWNER_ID as _,
262            sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
263            ..Default::default()
264        };
265        mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
266            .await?;
267        let view_id: ViewId = View::find()
268            .select_only()
269            .column(view::Column::ViewId)
270            .filter(view::Column::Name.eq("view_1"))
271            .into_tuple()
272            .one(&mgr.inner.read().await.db)
273            .await?
274            .unwrap();
275
276        mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
277        let source = Source::find_by_id(source_id)
278            .one(&mgr.inner.read().await.db)
279            .await?
280            .unwrap();
281        assert_eq!(source.name, "s2");
282        assert_eq!(
283            source.definition,
284            "CREATE SOURCE s2 (v1 INT) WITH (\
285  connector = 'kafka', \
286  topic = 'kafka_alter', \
287  properties.bootstrap.server = 'message_queue:29092', \
288  scan.startup.mode = 'earliest'\
289) FORMAT PLAIN ENCODE JSON"
290        );
291
292        let view = View::find_by_id(view_id)
293            .one(&mgr.inner.read().await.db)
294            .await?
295            .unwrap();
296        assert_eq!(
297            view.definition,
298            "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
299        );
300
301        mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
302            .await?;
303        assert!(
304            View::find_by_id(view_id)
305                .one(&mgr.inner.read().await.db)
306                .await?
307                .is_none()
308        );
309
310        Ok(())
311    }
312
313    #[tokio::test]
314    async fn test_abort_initial_materialized_view_reports_cancellation() -> MetaResult<()> {
315        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
316
317        let mut inner = mgr.inner.write().await;
318        let txn = inner.db.begin().await?;
319        let obj = CatalogController::create_object(
320            &txn,
321            ObjectType::Table,
322            TEST_OWNER_ID,
323            Some(TEST_DATABASE_ID),
324            Some(TEST_SCHEMA_ID),
325        )
326        .await?;
327        let job_id = obj.oid.as_job_id();
328
329        table::ActiveModel {
330            table_id: Set(obj.oid.as_table_id()),
331            name: Set("mv_abort_initial".to_owned()),
332            optional_associated_source_id: Set(None),
333            table_type: Set(TableType::MaterializedView),
334            belongs_to_job_id: Set(None),
335            columns: Set(vec![].into()),
336            pk: Set(vec![].into()),
337            distribution_key: Set(Vec::<i32>::new().into()),
338            stream_key: Set(Vec::<i32>::new().into()),
339            append_only: Set(false),
340            fragment_id: Set(None),
341            vnode_col_index: Set(None),
342            row_id_index: Set(None),
343            value_indices: Set(Vec::<i32>::new().into()),
344            definition: Set("CREATE MATERIALIZED VIEW mv_abort_initial AS SELECT 1".to_owned()),
345            handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
346            version_column_indices: Set(None),
347            read_prefix_len_hint: Set(0),
348            watermark_indices: Set(Vec::<i32>::new().into()),
349            dist_key_in_pk: Set(Vec::<i32>::new().into()),
350            dml_fragment_id: Set(None),
351            cardinality: Set(None),
352            cleaned_by_watermark: Set(false),
353            description: Set(None),
354            version: Set(None),
355            retention_seconds: Set(None),
356            cdc_table_id: Set(None),
357            vnode_count: Set(1),
358            webhook_info: Set(None),
359            engine: Set(None),
360            clean_watermark_index_in_pk: Set(None),
361            clean_watermark_indices: Set(None),
362            refreshable: Set(false),
363            vector_index_info: Set(None),
364            cdc_table_type: Set(None),
365        }
366        .insert(&txn)
367        .await?;
368
369        streaming_job::ActiveModel {
370            job_id: Set(job_id),
371            job_status: Set(JobStatus::Initial),
372            create_type: Set(CreateType::Foreground),
373            timezone: Set(None),
374            config_override: Set(None),
375            adaptive_parallelism_strategy: Set(None),
376            parallelism: Set(StreamingParallelism::Adaptive),
377            backfill_parallelism: Set(None),
378            backfill_adaptive_parallelism_strategy: Set(None),
379            backfill_orders: Set(None),
380            max_parallelism: Set(1),
381            specific_resource_group: Set(None),
382            is_serverless_backfill: Set(false),
383        }
384        .insert(&txn)
385        .await?;
386
387        let (tx, rx) = oneshot::channel();
388        inner.register_finish_notifier(TEST_DATABASE_ID, job_id, tx);
389        txn.commit().await?;
390        drop(inner);
391
392        let (aborted, database_id) = mgr.try_abort_creating_streaming_job(job_id, true).await?;
393        assert!(aborted);
394        assert_eq!(database_id, Some(TEST_DATABASE_ID));
395
396        let err = rx
397            .await
398            .expect("finish notifier should be notified")
399            .expect_err("initial job drop should cancel the create wait");
400        assert!(err.contains("cancelled"));
401
402        let db = &mgr.inner.read().await.db;
403        assert!(Object::find_by_id(job_id).one(db).await?.is_none());
404        assert!(StreamingJob::find_by_id(job_id).one(db).await?.is_none());
405        assert!(
406            Table::find_by_id(job_id.as_mv_table_id())
407                .one(db)
408                .await?
409                .is_none()
410        );
411
412        Ok(())
413    }
414
415    #[tokio::test]
416    async fn test_clean_dirty_creating_jobs_records_dropped_tables_for_per_db_recovery()
417    -> MetaResult<()> {
418        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
419
420        let inner = mgr.inner.write().await;
421        let txn = inner.db.begin().await?;
422        let mv_obj = CatalogController::create_object(
423            &txn,
424            ObjectType::Table,
425            TEST_OWNER_ID,
426            Some(TEST_DATABASE_ID),
427            Some(TEST_SCHEMA_ID),
428        )
429        .await?;
430        let job_id = mv_obj.oid.as_job_id();
431        let mv_table_id = job_id.as_mv_table_id();
432        insert_test_table(
433            &txn,
434            mv_table_id,
435            "mv_dirty",
436            TableType::MaterializedView,
437            None,
438            "CREATE MATERIALIZED VIEW mv_dirty AS SELECT 1",
439        )
440        .await?;
441
442        let internal_obj = CatalogController::create_object(
443            &txn,
444            ObjectType::Table,
445            TEST_OWNER_ID,
446            Some(TEST_DATABASE_ID),
447            Some(TEST_SCHEMA_ID),
448        )
449        .await?;
450        let internal_table_id = internal_obj.oid.as_table_id();
451        insert_test_table(
452            &txn,
453            internal_table_id,
454            "__internal_mv_dirty",
455            TableType::Internal,
456            Some(job_id),
457            "",
458        )
459        .await?;
460
461        streaming_job::ActiveModel {
462            job_id: Set(job_id),
463            job_status: Set(JobStatus::Creating),
464            create_type: Set(CreateType::Foreground),
465            timezone: Set(None),
466            config_override: Set(None),
467            adaptive_parallelism_strategy: Set(None),
468            parallelism: Set(StreamingParallelism::Adaptive),
469            backfill_parallelism: Set(None),
470            backfill_adaptive_parallelism_strategy: Set(None),
471            backfill_orders: Set(None),
472            max_parallelism: Set(1),
473            specific_resource_group: Set(None),
474            is_serverless_backfill: Set(false),
475        }
476        .insert(&txn)
477        .await?;
478        txn.commit().await?;
479        drop(inner);
480
481        let cleaned = mgr
482            .clean_dirty_creating_jobs(Some(TEST_DATABASE_ID))
483            .await?;
484        assert_eq!(cleaned.streaming_job_ids, vec![job_id]);
485        assert!(cleaned.source_ids.is_empty());
486        let mut dropped_table_ids = cleaned.dropped_table_ids;
487        dropped_table_ids.sort_unstable();
488        assert_eq!(dropped_table_ids, vec![mv_table_id, internal_table_id]);
489
490        let inner = mgr.inner.read().await;
491        assert!(inner.dropped_tables.contains_key(&mv_table_id));
492        assert!(inner.dropped_tables.contains_key(&internal_table_id));
493        assert!(Object::find_by_id(job_id).one(&inner.db).await?.is_none());
494        assert!(
495            StreamingJob::find_by_id(job_id)
496                .one(&inner.db)
497                .await?
498                .is_none()
499        );
500        assert!(
501            Table::find_by_id(mv_table_id)
502                .one(&inner.db)
503                .await?
504                .is_none()
505        );
506        assert!(
507            Table::find_by_id(internal_table_id)
508                .one(&inner.db)
509                .await?
510                .is_none()
511        );
512
513        Ok(())
514    }
515
516    #[tokio::test]
517    async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
518        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
519        let pb_view = PbView {
520            schema_id: TEST_SCHEMA_ID,
521            database_id: TEST_DATABASE_ID,
522            name: "subscription_dep_view".to_owned(),
523            owner: TEST_OWNER_ID as _,
524            sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
525            ..Default::default()
526        };
527        mgr.create_view(pb_view, HashSet::new()).await?;
528
529        let view_id: ViewId = View::find()
530            .select_only()
531            .column(view::Column::ViewId)
532            .filter(view::Column::Name.eq("subscription_dep_view"))
533            .into_tuple()
534            .one(&mgr.inner.read().await.db)
535            .await?
536            .unwrap();
537
538        let mut pb_subscription = PbSubscription {
539            name: "subscription_to_abort".to_owned(),
540            definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
541                .to_owned(),
542            retention_seconds: 86400,
543            database_id: TEST_DATABASE_ID,
544            schema_id: TEST_SCHEMA_ID,
545            dependent_table_id: view_id.as_object_id().as_table_id(),
546            owner: TEST_OWNER_ID as _,
547            subscription_state: SubscriptionState::Init as _,
548            ..Default::default()
549        };
550        mgr.create_subscription_catalog(&mut pb_subscription)
551            .await?;
552
553        mgr.try_abort_creating_subscription(pb_subscription.id)
554            .await?;
555
556        assert!(
557            Subscription::find_by_id(pb_subscription.id)
558                .one(&mgr.inner.read().await.db)
559                .await?
560                .is_none()
561        );
562
563        Ok(())
564    }
565}