risingwave_meta/controller/catalog/
test.rs1#[cfg(test)]
16mod tests {
17 use risingwave_pb::catalog::StreamSourceInfo;
18 use risingwave_pb::catalog::subscription::SubscriptionState;
19
20 use crate::controller::catalog::*;
21
/// Database id under which the tests in this module create their catalog objects.
///
/// NOTE(review): no test here creates this database, so it is presumably seeded by
/// `MetaSrvEnv::for_test()` — confirm.
const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
/// Schema id under which the tests in this module create views, functions, sources and
/// subscriptions.
///
/// NOTE(review): no test here creates this schema, so it is presumably seeded by
/// `MetaSrvEnv::for_test()` — confirm.
const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
/// User id assigned to the `owner` field of every catalog object built by these tests.
const TEST_OWNER_ID: UserId = UserId::new(1);
25
26 #[tokio::test]
27 async fn test_database_func() -> MetaResult<()> {
28 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
29 let pb_database = PbDatabase {
30 name: "db1".to_owned(),
31 owner: TEST_OWNER_ID as _,
32 ..Default::default()
33 };
34 mgr.create_database(pb_database).await?;
35
36 let database_id: DatabaseId = Database::find()
37 .select_only()
38 .column(database::Column::DatabaseId)
39 .filter(database::Column::Name.eq("db1"))
40 .into_tuple()
41 .one(&mgr.inner.read().await.db)
42 .await?
43 .unwrap();
44
45 mgr.alter_name(ObjectType::Database, database_id, "db2")
46 .await?;
47 let database = Database::find_by_id(database_id)
48 .one(&mgr.inner.read().await.db)
49 .await?
50 .unwrap();
51 assert_eq!(database.name, "db2");
52
53 mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
54 .await?;
55
56 Ok(())
57 }
58
59 #[tokio::test]
60 async fn test_schema_func() -> MetaResult<()> {
61 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
62 let pb_schema = PbSchema {
63 database_id: TEST_DATABASE_ID,
64 name: "schema1".to_owned(),
65 owner: TEST_OWNER_ID as _,
66 ..Default::default()
67 };
68 mgr.create_schema(pb_schema.clone()).await?;
69 assert!(mgr.create_schema(pb_schema).await.is_err());
70
71 let schema_id: SchemaId = Schema::find()
72 .select_only()
73 .column(schema::Column::SchemaId)
74 .filter(schema::Column::Name.eq("schema1"))
75 .into_tuple()
76 .one(&mgr.inner.read().await.db)
77 .await?
78 .unwrap();
79
80 mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
81 .await?;
82 let schema = Schema::find_by_id(schema_id)
83 .one(&mgr.inner.read().await.db)
84 .await?
85 .unwrap();
86 assert_eq!(schema.name, "schema2");
87 mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
88 .await?;
89
90 Ok(())
91 }
92
93 #[tokio::test]
94 async fn test_create_view() -> MetaResult<()> {
95 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
96 let pb_view = PbView {
97 schema_id: TEST_SCHEMA_ID,
98 database_id: TEST_DATABASE_ID,
99 name: "view".to_owned(),
100 owner: TEST_OWNER_ID as _,
101 sql: "CREATE VIEW view AS SELECT 1".to_owned(),
102 ..Default::default()
103 };
104 mgr.create_view(pb_view.clone(), HashSet::new()).await?;
105 assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
106
107 let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
108 mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
109 .await?;
110 assert!(
111 View::find_by_id(view.view_id)
112 .one(&mgr.inner.read().await.db)
113 .await?
114 .is_none()
115 );
116
117 Ok(())
118 }
119
120 #[tokio::test]
121 async fn test_create_function() -> MetaResult<()> {
122 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
123 let test_data_type = risingwave_pb::data::DataType {
124 type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
125 ..Default::default()
126 };
127 let arg_types = vec![test_data_type.clone()];
128 let pb_function = PbFunction {
129 schema_id: TEST_SCHEMA_ID,
130 database_id: TEST_DATABASE_ID,
131 name: "test_function".to_owned(),
132 owner: TEST_OWNER_ID as _,
133 arg_types,
134 return_type: Some(test_data_type.clone()),
135 language: "python".to_owned(),
136 kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
137 Default::default(),
138 )),
139 ..Default::default()
140 };
141 mgr.create_function(pb_function.clone()).await?;
142 assert!(mgr.create_function(pb_function).await.is_err());
143
144 let function = Function::find()
145 .inner_join(Object)
146 .filter(
147 object::Column::DatabaseId
148 .eq(TEST_DATABASE_ID)
149 .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
150 .add(function::Column::Name.eq("test_function")),
151 )
152 .one(&mgr.inner.read().await.db)
153 .await?
154 .unwrap();
155 assert_eq!(function.return_type.to_protobuf(), test_data_type);
156 assert_eq!(function.arg_types.to_protobuf().len(), 1);
157 assert_eq!(function.language, "python");
158
159 mgr.drop_object(
160 ObjectType::Function,
161 function.function_id,
162 DropMode::Restrict,
163 )
164 .await?;
165 assert!(
166 Object::find_by_id(function.function_id)
167 .one(&mgr.inner.read().await.db)
168 .await?
169 .is_none()
170 );
171
172 Ok(())
173 }
174
175 #[tokio::test]
176 async fn test_alter_relation_rename() -> MetaResult<()> {
177 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
178 let pb_source = PbSource {
179 schema_id: TEST_SCHEMA_ID,
180 database_id: TEST_DATABASE_ID,
181 name: "s1".to_owned(),
182 owner: TEST_OWNER_ID as _,
183 definition: r#"CREATE SOURCE s1 (v1 int) with (
184 connector = 'kafka',
185 topic = 'kafka_alter',
186 properties.bootstrap.server = 'message_queue:29092',
187 scan.startup.mode = 'earliest'
188) FORMAT PLAIN ENCODE JSON"#
189 .to_owned(),
190 info: Some(StreamSourceInfo {
191 ..Default::default()
192 }),
193 ..Default::default()
194 };
195 mgr.create_source(pb_source).await?;
196 let source_id: SourceId = Source::find()
197 .select_only()
198 .column(source::Column::SourceId)
199 .filter(source::Column::Name.eq("s1"))
200 .into_tuple()
201 .one(&mgr.inner.read().await.db)
202 .await?
203 .unwrap();
204
205 let pb_view = PbView {
206 schema_id: TEST_SCHEMA_ID,
207 database_id: TEST_DATABASE_ID,
208 name: "view_1".to_owned(),
209 owner: TEST_OWNER_ID as _,
210 sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
211 ..Default::default()
212 };
213 mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
214 .await?;
215 let view_id: ViewId = View::find()
216 .select_only()
217 .column(view::Column::ViewId)
218 .filter(view::Column::Name.eq("view_1"))
219 .into_tuple()
220 .one(&mgr.inner.read().await.db)
221 .await?
222 .unwrap();
223
224 mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
225 let source = Source::find_by_id(source_id)
226 .one(&mgr.inner.read().await.db)
227 .await?
228 .unwrap();
229 assert_eq!(source.name, "s2");
230 assert_eq!(
231 source.definition,
232 "CREATE SOURCE s2 (v1 INT) WITH (\
233 connector = 'kafka', \
234 topic = 'kafka_alter', \
235 properties.bootstrap.server = 'message_queue:29092', \
236 scan.startup.mode = 'earliest'\
237) FORMAT PLAIN ENCODE JSON"
238 );
239
240 let view = View::find_by_id(view_id)
241 .one(&mgr.inner.read().await.db)
242 .await?
243 .unwrap();
244 assert_eq!(
245 view.definition,
246 "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
247 );
248
249 mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
250 .await?;
251 assert!(
252 View::find_by_id(view_id)
253 .one(&mgr.inner.read().await.db)
254 .await?
255 .is_none()
256 );
257
258 Ok(())
259 }
260
261 #[tokio::test]
262 async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
263 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
264 let pb_view = PbView {
265 schema_id: TEST_SCHEMA_ID,
266 database_id: TEST_DATABASE_ID,
267 name: "subscription_dep_view".to_owned(),
268 owner: TEST_OWNER_ID as _,
269 sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
270 ..Default::default()
271 };
272 mgr.create_view(pb_view, HashSet::new()).await?;
273
274 let view_id: ViewId = View::find()
275 .select_only()
276 .column(view::Column::ViewId)
277 .filter(view::Column::Name.eq("subscription_dep_view"))
278 .into_tuple()
279 .one(&mgr.inner.read().await.db)
280 .await?
281 .unwrap();
282
283 let mut pb_subscription = PbSubscription {
284 name: "subscription_to_abort".to_owned(),
285 definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
286 .to_owned(),
287 retention_seconds: 86400,
288 database_id: TEST_DATABASE_ID,
289 schema_id: TEST_SCHEMA_ID,
290 dependent_table_id: view_id.as_object_id().as_table_id(),
291 owner: TEST_OWNER_ID as _,
292 subscription_state: SubscriptionState::Init as _,
293 ..Default::default()
294 };
295 mgr.create_subscription_catalog(&mut pb_subscription)
296 .await?;
297
298 mgr.try_abort_creating_subscription(pb_subscription.id)
299 .await?;
300
301 assert!(
302 Subscription::find_by_id(pb_subscription.id)
303 .one(&mgr.inner.read().await.db)
304 .await?
305 .is_none()
306 );
307
308 Ok(())
309 }
310}