risingwave_meta/controller/catalog/
test.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests {
17    use risingwave_pb::catalog::StreamSourceInfo;
18
19    use crate::controller::catalog::*;
20
21    const TEST_DATABASE_ID: DatabaseId = 1;
22    const TEST_SCHEMA_ID: SchemaId = 2;
23    const TEST_OWNER_ID: UserId = 1;
24
25    #[tokio::test]
26    async fn test_database_func() -> MetaResult<()> {
27        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
28        let pb_database = PbDatabase {
29            name: "db1".to_owned(),
30            owner: TEST_OWNER_ID as _,
31            ..Default::default()
32        };
33        mgr.create_database(pb_database).await?;
34
35        let database_id: DatabaseId = Database::find()
36            .select_only()
37            .column(database::Column::DatabaseId)
38            .filter(database::Column::Name.eq("db1"))
39            .into_tuple()
40            .one(&mgr.inner.read().await.db)
41            .await?
42            .unwrap();
43
44        mgr.alter_name(ObjectType::Database, database_id, "db2")
45            .await?;
46        let database = Database::find_by_id(database_id)
47            .one(&mgr.inner.read().await.db)
48            .await?
49            .unwrap();
50        assert_eq!(database.name, "db2");
51
52        mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
53            .await?;
54
55        Ok(())
56    }
57
58    #[tokio::test]
59    async fn test_schema_func() -> MetaResult<()> {
60        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
61        let pb_schema = PbSchema {
62            database_id: TEST_DATABASE_ID as _,
63            name: "schema1".to_owned(),
64            owner: TEST_OWNER_ID as _,
65            ..Default::default()
66        };
67        mgr.create_schema(pb_schema.clone()).await?;
68        assert!(mgr.create_schema(pb_schema).await.is_err());
69
70        let schema_id: SchemaId = Schema::find()
71            .select_only()
72            .column(schema::Column::SchemaId)
73            .filter(schema::Column::Name.eq("schema1"))
74            .into_tuple()
75            .one(&mgr.inner.read().await.db)
76            .await?
77            .unwrap();
78
79        mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
80            .await?;
81        let schema = Schema::find_by_id(schema_id)
82            .one(&mgr.inner.read().await.db)
83            .await?
84            .unwrap();
85        assert_eq!(schema.name, "schema2");
86        mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
87            .await?;
88
89        Ok(())
90    }
91
92    #[tokio::test]
93    async fn test_create_view() -> MetaResult<()> {
94        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
95        let pb_view = PbView {
96            schema_id: TEST_SCHEMA_ID as _,
97            database_id: TEST_DATABASE_ID as _,
98            name: "view".to_owned(),
99            owner: TEST_OWNER_ID as _,
100            sql: "CREATE VIEW view AS SELECT 1".to_owned(),
101            ..Default::default()
102        };
103        mgr.create_view(pb_view.clone()).await?;
104        assert!(mgr.create_view(pb_view).await.is_err());
105
106        let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
107        mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
108            .await?;
109        assert!(
110            View::find_by_id(view.view_id)
111                .one(&mgr.inner.read().await.db)
112                .await?
113                .is_none()
114        );
115
116        Ok(())
117    }
118
119    #[tokio::test]
120    async fn test_create_function() -> MetaResult<()> {
121        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
122        let test_data_type = risingwave_pb::data::DataType {
123            type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
124            ..Default::default()
125        };
126        let arg_types = vec![test_data_type.clone()];
127        let pb_function = PbFunction {
128            schema_id: TEST_SCHEMA_ID as _,
129            database_id: TEST_DATABASE_ID as _,
130            name: "test_function".to_owned(),
131            owner: TEST_OWNER_ID as _,
132            arg_types,
133            return_type: Some(test_data_type.clone()),
134            language: "python".to_owned(),
135            kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
136                Default::default(),
137            )),
138            ..Default::default()
139        };
140        mgr.create_function(pb_function.clone()).await?;
141        assert!(mgr.create_function(pb_function).await.is_err());
142
143        let function = Function::find()
144            .inner_join(Object)
145            .filter(
146                object::Column::DatabaseId
147                    .eq(TEST_DATABASE_ID)
148                    .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
149                    .add(function::Column::Name.eq("test_function")),
150            )
151            .one(&mgr.inner.read().await.db)
152            .await?
153            .unwrap();
154        assert_eq!(function.return_type.to_protobuf(), test_data_type);
155        assert_eq!(function.arg_types.to_protobuf().len(), 1);
156        assert_eq!(function.language, "python");
157
158        mgr.drop_object(
159            ObjectType::Function,
160            function.function_id,
161            DropMode::Restrict,
162        )
163        .await?;
164        assert!(
165            Object::find_by_id(function.function_id)
166                .one(&mgr.inner.read().await.db)
167                .await?
168                .is_none()
169        );
170
171        Ok(())
172    }
173
174    #[tokio::test]
175    async fn test_alter_relation_rename() -> MetaResult<()> {
176        let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
177        let pb_source = PbSource {
178            schema_id: TEST_SCHEMA_ID as _,
179            database_id: TEST_DATABASE_ID as _,
180            name: "s1".to_owned(),
181            owner: TEST_OWNER_ID as _,
182            definition: r#"CREATE SOURCE s1 (v1 int) with (
183  connector = 'kafka',
184  topic = 'kafka_alter',
185  properties.bootstrap.server = 'message_queue:29092',
186  scan.startup.mode = 'earliest'
187) FORMAT PLAIN ENCODE JSON"#
188                .to_owned(),
189            info: Some(StreamSourceInfo {
190                ..Default::default()
191            }),
192            ..Default::default()
193        };
194        mgr.create_source(pb_source).await?;
195        let source_id: SourceId = Source::find()
196            .select_only()
197            .column(source::Column::SourceId)
198            .filter(source::Column::Name.eq("s1"))
199            .into_tuple()
200            .one(&mgr.inner.read().await.db)
201            .await?
202            .unwrap();
203
204        let pb_view = PbView {
205            schema_id: TEST_SCHEMA_ID as _,
206            database_id: TEST_DATABASE_ID as _,
207            name: "view_1".to_owned(),
208            owner: TEST_OWNER_ID as _,
209            sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
210            dependent_relations: vec![source_id as _],
211            ..Default::default()
212        };
213        mgr.create_view(pb_view).await?;
214        let view_id: ViewId = View::find()
215            .select_only()
216            .column(view::Column::ViewId)
217            .filter(view::Column::Name.eq("view_1"))
218            .into_tuple()
219            .one(&mgr.inner.read().await.db)
220            .await?
221            .unwrap();
222
223        mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
224        let source = Source::find_by_id(source_id)
225            .one(&mgr.inner.read().await.db)
226            .await?
227            .unwrap();
228        assert_eq!(source.name, "s2");
229        assert_eq!(
230            source.definition,
231            "CREATE SOURCE s2 (v1 INT) WITH (\
232  connector = 'kafka', \
233  topic = 'kafka_alter', \
234  properties.bootstrap.server = 'message_queue:29092', \
235  scan.startup.mode = 'earliest'\
236) FORMAT PLAIN ENCODE JSON"
237        );
238
239        let view = View::find_by_id(view_id)
240            .one(&mgr.inner.read().await.db)
241            .await?
242            .unwrap();
243        assert_eq!(
244            view.definition,
245            "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
246        );
247
248        mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
249            .await?;
250        assert!(
251            View::find_by_id(view_id)
252                .one(&mgr.inner.read().await.db)
253                .await?
254                .is_none()
255        );
256
257        Ok(())
258    }
259}