risingwave_meta/controller/catalog/
test.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use risingwave_meta_model::table::HandleConflictBehavior;
18    use risingwave_pb::catalog::StreamSourceInfo;
19    use risingwave_pb::catalog::subscription::SubscriptionState;
20    use tokio::sync::oneshot;
21
22    use crate::controller::catalog::*;
23
24    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
25    const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
26    const TEST_OWNER_ID: UserId = UserId::new(1);
27
28    #[tokio::test]
29    async fn test_database_func() -> MetaResult<()> {
30        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
31        let pb_database = PbDatabase {
32            name: "db1".to_owned(),
33            owner: TEST_OWNER_ID as _,
34            ..Default::default()
35        };
36        mgr.create_database(pb_database).await?;
37
38        let database_id: DatabaseId = Database::find()
39            .select_only()
40            .column(database::Column::DatabaseId)
41            .filter(database::Column::Name.eq("db1"))
42            .into_tuple()
43            .one(&mgr.inner.read().await.db)
44            .await?
45            .unwrap();
46
47        mgr.alter_name(ObjectType::Database, database_id, "db2")
48            .await?;
49        let database = Database::find_by_id(database_id)
50            .one(&mgr.inner.read().await.db)
51            .await?
52            .unwrap();
53        assert_eq!(database.name, "db2");
54
55        mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
56            .await?;
57
58        Ok(())
59    }
60
61    #[tokio::test]
62    async fn test_schema_func() -> MetaResult<()> {
63        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
64        let pb_schema = PbSchema {
65            database_id: TEST_DATABASE_ID,
66            name: "schema1".to_owned(),
67            owner: TEST_OWNER_ID as _,
68            ..Default::default()
69        };
70        mgr.create_schema(pb_schema.clone()).await?;
71        assert!(mgr.create_schema(pb_schema).await.is_err());
72
73        let schema_id: SchemaId = Schema::find()
74            .select_only()
75            .column(schema::Column::SchemaId)
76            .filter(schema::Column::Name.eq("schema1"))
77            .into_tuple()
78            .one(&mgr.inner.read().await.db)
79            .await?
80            .unwrap();
81
82        mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
83            .await?;
84        let schema = Schema::find_by_id(schema_id)
85            .one(&mgr.inner.read().await.db)
86            .await?
87            .unwrap();
88        assert_eq!(schema.name, "schema2");
89        mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
90            .await?;
91
92        Ok(())
93    }
94
95    #[tokio::test]
96    async fn test_create_view() -> MetaResult<()> {
97        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
98        let pb_view = PbView {
99            schema_id: TEST_SCHEMA_ID,
100            database_id: TEST_DATABASE_ID,
101            name: "view".to_owned(),
102            owner: TEST_OWNER_ID as _,
103            sql: "CREATE VIEW view AS SELECT 1".to_owned(),
104            ..Default::default()
105        };
106        mgr.create_view(pb_view.clone(), HashSet::new()).await?;
107        assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
108
109        let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
110        mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
111            .await?;
112        assert!(
113            View::find_by_id(view.view_id)
114                .one(&mgr.inner.read().await.db)
115                .await?
116                .is_none()
117        );
118
119        Ok(())
120    }
121
122    #[tokio::test]
123    async fn test_create_function() -> MetaResult<()> {
124        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
125        let test_data_type = risingwave_pb::data::DataType {
126            type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
127            ..Default::default()
128        };
129        let arg_types = vec![test_data_type.clone()];
130        let pb_function = PbFunction {
131            schema_id: TEST_SCHEMA_ID,
132            database_id: TEST_DATABASE_ID,
133            name: "test_function".to_owned(),
134            owner: TEST_OWNER_ID as _,
135            arg_types,
136            return_type: Some(test_data_type.clone()),
137            language: "python".to_owned(),
138            kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
139                Default::default(),
140            )),
141            ..Default::default()
142        };
143        mgr.create_function(pb_function.clone()).await?;
144        assert!(mgr.create_function(pb_function).await.is_err());
145
146        let function = Function::find()
147            .inner_join(Object)
148            .filter(
149                object::Column::DatabaseId
150                    .eq(TEST_DATABASE_ID)
151                    .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
152                    .add(function::Column::Name.eq("test_function")),
153            )
154            .one(&mgr.inner.read().await.db)
155            .await?
156            .unwrap();
157        assert_eq!(function.return_type.to_protobuf(), test_data_type);
158        assert_eq!(function.arg_types.to_protobuf().len(), 1);
159        assert_eq!(function.language, "python");
160
161        mgr.drop_object(
162            ObjectType::Function,
163            function.function_id,
164            DropMode::Restrict,
165        )
166        .await?;
167        assert!(
168            Object::find_by_id(function.function_id)
169                .one(&mgr.inner.read().await.db)
170                .await?
171                .is_none()
172        );
173
174        Ok(())
175    }
176
177    #[tokio::test]
178    async fn test_alter_relation_rename() -> MetaResult<()> {
179        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
180        let pb_source = PbSource {
181            schema_id: TEST_SCHEMA_ID,
182            database_id: TEST_DATABASE_ID,
183            name: "s1".to_owned(),
184            owner: TEST_OWNER_ID as _,
185            definition: r#"CREATE SOURCE s1 (v1 int) with (
186  connector = 'kafka',
187  topic = 'kafka_alter',
188  properties.bootstrap.server = 'message_queue:29092',
189  scan.startup.mode = 'earliest'
190) FORMAT PLAIN ENCODE JSON"#
191                .to_owned(),
192            info: Some(StreamSourceInfo {
193                ..Default::default()
194            }),
195            ..Default::default()
196        };
197        mgr.create_source(pb_source).await?;
198        let source_id: SourceId = Source::find()
199            .select_only()
200            .column(source::Column::SourceId)
201            .filter(source::Column::Name.eq("s1"))
202            .into_tuple()
203            .one(&mgr.inner.read().await.db)
204            .await?
205            .unwrap();
206
207        let pb_view = PbView {
208            schema_id: TEST_SCHEMA_ID,
209            database_id: TEST_DATABASE_ID,
210            name: "view_1".to_owned(),
211            owner: TEST_OWNER_ID as _,
212            sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
213            ..Default::default()
214        };
215        mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
216            .await?;
217        let view_id: ViewId = View::find()
218            .select_only()
219            .column(view::Column::ViewId)
220            .filter(view::Column::Name.eq("view_1"))
221            .into_tuple()
222            .one(&mgr.inner.read().await.db)
223            .await?
224            .unwrap();
225
226        mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
227        let source = Source::find_by_id(source_id)
228            .one(&mgr.inner.read().await.db)
229            .await?
230            .unwrap();
231        assert_eq!(source.name, "s2");
232        assert_eq!(
233            source.definition,
234            "CREATE SOURCE s2 (v1 INT) WITH (\
235  connector = 'kafka', \
236  topic = 'kafka_alter', \
237  properties.bootstrap.server = 'message_queue:29092', \
238  scan.startup.mode = 'earliest'\
239) FORMAT PLAIN ENCODE JSON"
240        );
241
242        let view = View::find_by_id(view_id)
243            .one(&mgr.inner.read().await.db)
244            .await?
245            .unwrap();
246        assert_eq!(
247            view.definition,
248            "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
249        );
250
251        mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
252            .await?;
253        assert!(
254            View::find_by_id(view_id)
255                .one(&mgr.inner.read().await.db)
256                .await?
257                .is_none()
258        );
259
260        Ok(())
261    }
262
263    #[tokio::test]
264    async fn test_abort_initial_materialized_view_reports_cancellation() -> MetaResult<()> {
265        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
266
267        let mut inner = mgr.inner.write().await;
268        let txn = inner.db.begin().await?;
269        let obj = CatalogController::create_object(
270            &txn,
271            ObjectType::Table,
272            TEST_OWNER_ID,
273            Some(TEST_DATABASE_ID),
274            Some(TEST_SCHEMA_ID),
275        )
276        .await?;
277        let job_id = obj.oid.as_job_id();
278
279        table::ActiveModel {
280            table_id: Set(obj.oid.as_table_id()),
281            name: Set("mv_abort_initial".to_owned()),
282            optional_associated_source_id: Set(None),
283            table_type: Set(TableType::MaterializedView),
284            belongs_to_job_id: Set(None),
285            columns: Set(vec![].into()),
286            pk: Set(vec![].into()),
287            distribution_key: Set(Vec::<i32>::new().into()),
288            stream_key: Set(Vec::<i32>::new().into()),
289            append_only: Set(false),
290            fragment_id: Set(None),
291            vnode_col_index: Set(None),
292            row_id_index: Set(None),
293            value_indices: Set(Vec::<i32>::new().into()),
294            definition: Set("CREATE MATERIALIZED VIEW mv_abort_initial AS SELECT 1".to_owned()),
295            handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
296            version_column_indices: Set(None),
297            read_prefix_len_hint: Set(0),
298            watermark_indices: Set(Vec::<i32>::new().into()),
299            dist_key_in_pk: Set(Vec::<i32>::new().into()),
300            dml_fragment_id: Set(None),
301            cardinality: Set(None),
302            cleaned_by_watermark: Set(false),
303            description: Set(None),
304            version: Set(None),
305            retention_seconds: Set(None),
306            cdc_table_id: Set(None),
307            vnode_count: Set(1),
308            webhook_info: Set(None),
309            engine: Set(None),
310            clean_watermark_index_in_pk: Set(None),
311            clean_watermark_indices: Set(None),
312            refreshable: Set(false),
313            vector_index_info: Set(None),
314            cdc_table_type: Set(None),
315        }
316        .insert(&txn)
317        .await?;
318
319        streaming_job::ActiveModel {
320            job_id: Set(job_id),
321            job_status: Set(JobStatus::Initial),
322            create_type: Set(CreateType::Foreground),
323            timezone: Set(None),
324            config_override: Set(None),
325            adaptive_parallelism_strategy: Set(None),
326            parallelism: Set(StreamingParallelism::Adaptive),
327            backfill_parallelism: Set(None),
328            backfill_orders: Set(None),
329            max_parallelism: Set(1),
330            specific_resource_group: Set(None),
331            is_serverless_backfill: Set(false),
332        }
333        .insert(&txn)
334        .await?;
335
336        let (tx, rx) = oneshot::channel();
337        inner.register_finish_notifier(TEST_DATABASE_ID, job_id, tx);
338        txn.commit().await?;
339        drop(inner);
340
341        let (aborted, database_id) = mgr.try_abort_creating_streaming_job(job_id, true).await?;
342        assert!(aborted);
343        assert_eq!(database_id, Some(TEST_DATABASE_ID));
344
345        let err = rx
346            .await
347            .expect("finish notifier should be notified")
348            .expect_err("initial job drop should cancel the create wait");
349        assert!(err.contains("cancelled"));
350
351        let db = &mgr.inner.read().await.db;
352        assert!(Object::find_by_id(job_id).one(db).await?.is_none());
353        assert!(StreamingJob::find_by_id(job_id).one(db).await?.is_none());
354        assert!(
355            Table::find_by_id(job_id.as_mv_table_id())
356                .one(db)
357                .await?
358                .is_none()
359        );
360
361        Ok(())
362    }
363
364    #[tokio::test]
365    async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
366        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
367        let pb_view = PbView {
368            schema_id: TEST_SCHEMA_ID,
369            database_id: TEST_DATABASE_ID,
370            name: "subscription_dep_view".to_owned(),
371            owner: TEST_OWNER_ID as _,
372            sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
373            ..Default::default()
374        };
375        mgr.create_view(pb_view, HashSet::new()).await?;
376
377        let view_id: ViewId = View::find()
378            .select_only()
379            .column(view::Column::ViewId)
380            .filter(view::Column::Name.eq("subscription_dep_view"))
381            .into_tuple()
382            .one(&mgr.inner.read().await.db)
383            .await?
384            .unwrap();
385
386        let mut pb_subscription = PbSubscription {
387            name: "subscription_to_abort".to_owned(),
388            definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
389                .to_owned(),
390            retention_seconds: 86400,
391            database_id: TEST_DATABASE_ID,
392            schema_id: TEST_SCHEMA_ID,
393            dependent_table_id: view_id.as_object_id().as_table_id(),
394            owner: TEST_OWNER_ID as _,
395            subscription_state: SubscriptionState::Init as _,
396            ..Default::default()
397        };
398        mgr.create_subscription_catalog(&mut pb_subscription)
399            .await?;
400
401        mgr.try_abort_creating_subscription(pb_subscription.id)
402            .await?;
403
404        assert!(
405            Subscription::find_by_id(pb_subscription.id)
406                .one(&mgr.inner.read().await.db)
407                .await?
408                .is_none()
409        );
410
411        Ok(())
412    }
413}