risingwave_meta/controller/catalog/
test.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use risingwave_pb::catalog::StreamSourceInfo;
18    use risingwave_pb::catalog::subscription::SubscriptionState;
19
20    use crate::controller::catalog::*;
21
    // Fixed ids shared by the tests below; none of the tests creates these objects itself.
    // NOTE(review): presumably they refer to objects pre-created by `MetaSrvEnv::for_test()`
    // — confirm.

    /// Database id under which the tests register their views, functions, sources and
    /// subscriptions.
    const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
    /// Schema id under which the tests register their views, functions, sources and
    /// subscriptions.
    const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
    /// User id set as `owner` on every catalog object the tests create.
    const TEST_OWNER_ID: UserId = UserId::new(1);
25
26    #[tokio::test]
27    async fn test_database_func() -> MetaResult<()> {
28        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
29        let pb_database = PbDatabase {
30            name: "db1".to_owned(),
31            owner: TEST_OWNER_ID as _,
32            ..Default::default()
33        };
34        mgr.create_database(pb_database).await?;
35
36        let database_id: DatabaseId = Database::find()
37            .select_only()
38            .column(database::Column::DatabaseId)
39            .filter(database::Column::Name.eq("db1"))
40            .into_tuple()
41            .one(&mgr.inner.read().await.db)
42            .await?
43            .unwrap();
44
45        mgr.alter_name(ObjectType::Database, database_id, "db2")
46            .await?;
47        let database = Database::find_by_id(database_id)
48            .one(&mgr.inner.read().await.db)
49            .await?
50            .unwrap();
51        assert_eq!(database.name, "db2");
52
53        mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
54            .await?;
55
56        Ok(())
57    }
58
59    #[tokio::test]
60    async fn test_schema_func() -> MetaResult<()> {
61        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
62        let pb_schema = PbSchema {
63            database_id: TEST_DATABASE_ID,
64            name: "schema1".to_owned(),
65            owner: TEST_OWNER_ID as _,
66            ..Default::default()
67        };
68        mgr.create_schema(pb_schema.clone()).await?;
69        assert!(mgr.create_schema(pb_schema).await.is_err());
70
71        let schema_id: SchemaId = Schema::find()
72            .select_only()
73            .column(schema::Column::SchemaId)
74            .filter(schema::Column::Name.eq("schema1"))
75            .into_tuple()
76            .one(&mgr.inner.read().await.db)
77            .await?
78            .unwrap();
79
80        mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
81            .await?;
82        let schema = Schema::find_by_id(schema_id)
83            .one(&mgr.inner.read().await.db)
84            .await?
85            .unwrap();
86        assert_eq!(schema.name, "schema2");
87        mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
88            .await?;
89
90        Ok(())
91    }
92
93    #[tokio::test]
94    async fn test_create_view() -> MetaResult<()> {
95        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
96        let pb_view = PbView {
97            schema_id: TEST_SCHEMA_ID,
98            database_id: TEST_DATABASE_ID,
99            name: "view".to_owned(),
100            owner: TEST_OWNER_ID as _,
101            sql: "CREATE VIEW view AS SELECT 1".to_owned(),
102            ..Default::default()
103        };
104        mgr.create_view(pb_view.clone(), HashSet::new()).await?;
105        assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
106
107        let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
108        mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
109            .await?;
110        assert!(
111            View::find_by_id(view.view_id)
112                .one(&mgr.inner.read().await.db)
113                .await?
114                .is_none()
115        );
116
117        Ok(())
118    }
119
120    #[tokio::test]
121    async fn test_create_function() -> MetaResult<()> {
122        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
123        let test_data_type = risingwave_pb::data::DataType {
124            type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
125            ..Default::default()
126        };
127        let arg_types = vec![test_data_type.clone()];
128        let pb_function = PbFunction {
129            schema_id: TEST_SCHEMA_ID,
130            database_id: TEST_DATABASE_ID,
131            name: "test_function".to_owned(),
132            owner: TEST_OWNER_ID as _,
133            arg_types,
134            return_type: Some(test_data_type.clone()),
135            language: "python".to_owned(),
136            kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
137                Default::default(),
138            )),
139            ..Default::default()
140        };
141        mgr.create_function(pb_function.clone()).await?;
142        assert!(mgr.create_function(pb_function).await.is_err());
143
144        let function = Function::find()
145            .inner_join(Object)
146            .filter(
147                object::Column::DatabaseId
148                    .eq(TEST_DATABASE_ID)
149                    .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
150                    .add(function::Column::Name.eq("test_function")),
151            )
152            .one(&mgr.inner.read().await.db)
153            .await?
154            .unwrap();
155        assert_eq!(function.return_type.to_protobuf(), test_data_type);
156        assert_eq!(function.arg_types.to_protobuf().len(), 1);
157        assert_eq!(function.language, "python");
158
159        mgr.drop_object(
160            ObjectType::Function,
161            function.function_id,
162            DropMode::Restrict,
163        )
164        .await?;
165        assert!(
166            Object::find_by_id(function.function_id)
167                .one(&mgr.inner.read().await.db)
168                .await?
169                .is_none()
170        );
171
172        Ok(())
173    }
174
175    #[tokio::test]
176    async fn test_alter_relation_rename() -> MetaResult<()> {
177        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
178        let pb_source = PbSource {
179            schema_id: TEST_SCHEMA_ID,
180            database_id: TEST_DATABASE_ID,
181            name: "s1".to_owned(),
182            owner: TEST_OWNER_ID as _,
183            definition: r#"CREATE SOURCE s1 (v1 int) with (
184  connector = 'kafka',
185  topic = 'kafka_alter',
186  properties.bootstrap.server = 'message_queue:29092',
187  scan.startup.mode = 'earliest'
188) FORMAT PLAIN ENCODE JSON"#
189                .to_owned(),
190            info: Some(StreamSourceInfo {
191                ..Default::default()
192            }),
193            ..Default::default()
194        };
195        mgr.create_source(pb_source).await?;
196        let source_id: SourceId = Source::find()
197            .select_only()
198            .column(source::Column::SourceId)
199            .filter(source::Column::Name.eq("s1"))
200            .into_tuple()
201            .one(&mgr.inner.read().await.db)
202            .await?
203            .unwrap();
204
205        let pb_view = PbView {
206            schema_id: TEST_SCHEMA_ID,
207            database_id: TEST_DATABASE_ID,
208            name: "view_1".to_owned(),
209            owner: TEST_OWNER_ID as _,
210            sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
211            ..Default::default()
212        };
213        mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
214            .await?;
215        let view_id: ViewId = View::find()
216            .select_only()
217            .column(view::Column::ViewId)
218            .filter(view::Column::Name.eq("view_1"))
219            .into_tuple()
220            .one(&mgr.inner.read().await.db)
221            .await?
222            .unwrap();
223
224        mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
225        let source = Source::find_by_id(source_id)
226            .one(&mgr.inner.read().await.db)
227            .await?
228            .unwrap();
229        assert_eq!(source.name, "s2");
230        assert_eq!(
231            source.definition,
232            "CREATE SOURCE s2 (v1 INT) WITH (\
233  connector = 'kafka', \
234  topic = 'kafka_alter', \
235  properties.bootstrap.server = 'message_queue:29092', \
236  scan.startup.mode = 'earliest'\
237) FORMAT PLAIN ENCODE JSON"
238        );
239
240        let view = View::find_by_id(view_id)
241            .one(&mgr.inner.read().await.db)
242            .await?
243            .unwrap();
244        assert_eq!(
245            view.definition,
246            "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
247        );
248
249        mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
250            .await?;
251        assert!(
252            View::find_by_id(view_id)
253                .one(&mgr.inner.read().await.db)
254                .await?
255                .is_none()
256        );
257
258        Ok(())
259    }
260
261    #[tokio::test]
262    async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
263        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
264        let pb_view = PbView {
265            schema_id: TEST_SCHEMA_ID,
266            database_id: TEST_DATABASE_ID,
267            name: "subscription_dep_view".to_owned(),
268            owner: TEST_OWNER_ID as _,
269            sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
270            ..Default::default()
271        };
272        mgr.create_view(pb_view, HashSet::new()).await?;
273
274        let view_id: ViewId = View::find()
275            .select_only()
276            .column(view::Column::ViewId)
277            .filter(view::Column::Name.eq("subscription_dep_view"))
278            .into_tuple()
279            .one(&mgr.inner.read().await.db)
280            .await?
281            .unwrap();
282
283        let mut pb_subscription = PbSubscription {
284            name: "subscription_to_abort".to_owned(),
285            definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
286                .to_owned(),
287            retention_seconds: 86400,
288            database_id: TEST_DATABASE_ID,
289            schema_id: TEST_SCHEMA_ID,
290            dependent_table_id: view_id.as_object_id().as_table_id(),
291            owner: TEST_OWNER_ID as _,
292            subscription_state: SubscriptionState::Init as _,
293            ..Default::default()
294        };
295        mgr.create_subscription_catalog(&mut pb_subscription)
296            .await?;
297
298        mgr.try_abort_creating_subscription(pb_subscription.id)
299            .await?;
300
301        assert!(
302            Subscription::find_by_id(pb_subscription.id)
303                .one(&mgr.inner.read().await.db)
304                .await?
305                .is_none()
306        );
307
308        Ok(())
309    }
310}