// risingwave_meta/controller/catalog/test.rs
#[cfg(test)]
16mod tests {
17 use risingwave_pb::catalog::StreamSourceInfo;
18
19 use crate::controller::catalog::*;
20
/// Database id that the fixtures in this module place their objects in.
/// NOTE(review): presumably the database pre-created by `MetaSrvEnv::for_test` — confirm.
const TEST_DATABASE_ID: DatabaseId = 1;
/// Schema id that the view, function and source fixtures in this module use.
/// NOTE(review): presumably a schema pre-created inside `TEST_DATABASE_ID` — confirm.
const TEST_SCHEMA_ID: SchemaId = 2;
/// User id recorded as the owner of every object created in this module.
/// NOTE(review): presumably the default user of the test environment — confirm.
const TEST_OWNER_ID: UserId = 1;
24
25 #[tokio::test]
26 async fn test_database_func() -> MetaResult<()> {
27 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
28 let pb_database = PbDatabase {
29 name: "db1".to_owned(),
30 owner: TEST_OWNER_ID as _,
31 ..Default::default()
32 };
33 mgr.create_database(pb_database).await?;
34
35 let database_id: DatabaseId = Database::find()
36 .select_only()
37 .column(database::Column::DatabaseId)
38 .filter(database::Column::Name.eq("db1"))
39 .into_tuple()
40 .one(&mgr.inner.read().await.db)
41 .await?
42 .unwrap();
43
44 mgr.alter_name(ObjectType::Database, database_id, "db2")
45 .await?;
46 let database = Database::find_by_id(database_id)
47 .one(&mgr.inner.read().await.db)
48 .await?
49 .unwrap();
50 assert_eq!(database.name, "db2");
51
52 mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
53 .await?;
54
55 Ok(())
56 }
57
58 #[tokio::test]
59 async fn test_schema_func() -> MetaResult<()> {
60 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
61 let pb_schema = PbSchema {
62 database_id: TEST_DATABASE_ID as _,
63 name: "schema1".to_owned(),
64 owner: TEST_OWNER_ID as _,
65 ..Default::default()
66 };
67 mgr.create_schema(pb_schema.clone()).await?;
68 assert!(mgr.create_schema(pb_schema).await.is_err());
69
70 let schema_id: SchemaId = Schema::find()
71 .select_only()
72 .column(schema::Column::SchemaId)
73 .filter(schema::Column::Name.eq("schema1"))
74 .into_tuple()
75 .one(&mgr.inner.read().await.db)
76 .await?
77 .unwrap();
78
79 mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
80 .await?;
81 let schema = Schema::find_by_id(schema_id)
82 .one(&mgr.inner.read().await.db)
83 .await?
84 .unwrap();
85 assert_eq!(schema.name, "schema2");
86 mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
87 .await?;
88
89 Ok(())
90 }
91
92 #[tokio::test]
93 async fn test_create_view() -> MetaResult<()> {
94 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
95 let pb_view = PbView {
96 schema_id: TEST_SCHEMA_ID as _,
97 database_id: TEST_DATABASE_ID as _,
98 name: "view".to_owned(),
99 owner: TEST_OWNER_ID as _,
100 sql: "CREATE VIEW view AS SELECT 1".to_owned(),
101 ..Default::default()
102 };
103 mgr.create_view(pb_view.clone()).await?;
104 assert!(mgr.create_view(pb_view).await.is_err());
105
106 let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
107 mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
108 .await?;
109 assert!(
110 View::find_by_id(view.view_id)
111 .one(&mgr.inner.read().await.db)
112 .await?
113 .is_none()
114 );
115
116 Ok(())
117 }
118
119 #[tokio::test]
120 async fn test_create_function() -> MetaResult<()> {
121 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
122 let test_data_type = risingwave_pb::data::DataType {
123 type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
124 ..Default::default()
125 };
126 let arg_types = vec![test_data_type.clone()];
127 let pb_function = PbFunction {
128 schema_id: TEST_SCHEMA_ID as _,
129 database_id: TEST_DATABASE_ID as _,
130 name: "test_function".to_owned(),
131 owner: TEST_OWNER_ID as _,
132 arg_types,
133 return_type: Some(test_data_type.clone()),
134 language: "python".to_owned(),
135 kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
136 Default::default(),
137 )),
138 ..Default::default()
139 };
140 mgr.create_function(pb_function.clone()).await?;
141 assert!(mgr.create_function(pb_function).await.is_err());
142
143 let function = Function::find()
144 .inner_join(Object)
145 .filter(
146 object::Column::DatabaseId
147 .eq(TEST_DATABASE_ID)
148 .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
149 .add(function::Column::Name.eq("test_function")),
150 )
151 .one(&mgr.inner.read().await.db)
152 .await?
153 .unwrap();
154 assert_eq!(function.return_type.to_protobuf(), test_data_type);
155 assert_eq!(function.arg_types.to_protobuf().len(), 1);
156 assert_eq!(function.language, "python");
157
158 mgr.drop_object(
159 ObjectType::Function,
160 function.function_id,
161 DropMode::Restrict,
162 )
163 .await?;
164 assert!(
165 Object::find_by_id(function.function_id)
166 .one(&mgr.inner.read().await.db)
167 .await?
168 .is_none()
169 );
170
171 Ok(())
172 }
173
174 #[tokio::test]
175 async fn test_alter_relation_rename() -> MetaResult<()> {
176 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
177 let pb_source = PbSource {
178 schema_id: TEST_SCHEMA_ID as _,
179 database_id: TEST_DATABASE_ID as _,
180 name: "s1".to_owned(),
181 owner: TEST_OWNER_ID as _,
182 definition: r#"CREATE SOURCE s1 (v1 int) with (
183 connector = 'kafka',
184 topic = 'kafka_alter',
185 properties.bootstrap.server = 'message_queue:29092',
186 scan.startup.mode = 'earliest'
187) FORMAT PLAIN ENCODE JSON"#
188 .to_owned(),
189 info: Some(StreamSourceInfo {
190 ..Default::default()
191 }),
192 ..Default::default()
193 };
194 mgr.create_source(pb_source).await?;
195 let source_id: SourceId = Source::find()
196 .select_only()
197 .column(source::Column::SourceId)
198 .filter(source::Column::Name.eq("s1"))
199 .into_tuple()
200 .one(&mgr.inner.read().await.db)
201 .await?
202 .unwrap();
203
204 let pb_view = PbView {
205 schema_id: TEST_SCHEMA_ID as _,
206 database_id: TEST_DATABASE_ID as _,
207 name: "view_1".to_owned(),
208 owner: TEST_OWNER_ID as _,
209 sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
210 dependent_relations: vec![source_id as _],
211 ..Default::default()
212 };
213 mgr.create_view(pb_view).await?;
214 let view_id: ViewId = View::find()
215 .select_only()
216 .column(view::Column::ViewId)
217 .filter(view::Column::Name.eq("view_1"))
218 .into_tuple()
219 .one(&mgr.inner.read().await.db)
220 .await?
221 .unwrap();
222
223 mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
224 let source = Source::find_by_id(source_id)
225 .one(&mgr.inner.read().await.db)
226 .await?
227 .unwrap();
228 assert_eq!(source.name, "s2");
229 assert_eq!(
230 source.definition,
231 "CREATE SOURCE s2 (v1 INT) WITH (\
232 connector = 'kafka', \
233 topic = 'kafka_alter', \
234 properties.bootstrap.server = 'message_queue:29092', \
235 scan.startup.mode = 'earliest'\
236) FORMAT PLAIN ENCODE JSON"
237 );
238
239 let view = View::find_by_id(view_id)
240 .one(&mgr.inner.read().await.db)
241 .await?
242 .unwrap();
243 assert_eq!(
244 view.definition,
245 "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
246 );
247
248 mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
249 .await?;
250 assert!(
251 View::find_by_id(view_id)
252 .one(&mgr.inner.read().await.db)
253 .await?
254 .is_none()
255 );
256
257 Ok(())
258 }
259}