risingwave_meta/controller/catalog/
test.rs1#[cfg(test)]
16mod tests {
17 use risingwave_meta_model::table::HandleConflictBehavior;
18 use risingwave_pb::catalog::StreamSourceInfo;
19 use risingwave_pb::catalog::subscription::SubscriptionState;
20 use tokio::sync::oneshot;
21
22 use crate::controller::catalog::*;
23
24 const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(1);
25 const TEST_SCHEMA_ID: SchemaId = SchemaId::new(2);
26 const TEST_OWNER_ID: UserId = UserId::new(1);
27
28 #[tokio::test]
29 async fn test_database_func() -> MetaResult<()> {
30 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
31 let pb_database = PbDatabase {
32 name: "db1".to_owned(),
33 owner: TEST_OWNER_ID as _,
34 ..Default::default()
35 };
36 mgr.create_database(pb_database).await?;
37
38 let database_id: DatabaseId = Database::find()
39 .select_only()
40 .column(database::Column::DatabaseId)
41 .filter(database::Column::Name.eq("db1"))
42 .into_tuple()
43 .one(&mgr.inner.read().await.db)
44 .await?
45 .unwrap();
46
47 mgr.alter_name(ObjectType::Database, database_id, "db2")
48 .await?;
49 let database = Database::find_by_id(database_id)
50 .one(&mgr.inner.read().await.db)
51 .await?
52 .unwrap();
53 assert_eq!(database.name, "db2");
54
55 mgr.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
56 .await?;
57
58 Ok(())
59 }
60
61 #[tokio::test]
62 async fn test_schema_func() -> MetaResult<()> {
63 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
64 let pb_schema = PbSchema {
65 database_id: TEST_DATABASE_ID,
66 name: "schema1".to_owned(),
67 owner: TEST_OWNER_ID as _,
68 ..Default::default()
69 };
70 mgr.create_schema(pb_schema.clone()).await?;
71 assert!(mgr.create_schema(pb_schema).await.is_err());
72
73 let schema_id: SchemaId = Schema::find()
74 .select_only()
75 .column(schema::Column::SchemaId)
76 .filter(schema::Column::Name.eq("schema1"))
77 .into_tuple()
78 .one(&mgr.inner.read().await.db)
79 .await?
80 .unwrap();
81
82 mgr.alter_name(ObjectType::Schema, schema_id, "schema2")
83 .await?;
84 let schema = Schema::find_by_id(schema_id)
85 .one(&mgr.inner.read().await.db)
86 .await?
87 .unwrap();
88 assert_eq!(schema.name, "schema2");
89 mgr.drop_object(ObjectType::Schema, schema_id, DropMode::Restrict)
90 .await?;
91
92 Ok(())
93 }
94
95 #[tokio::test]
96 async fn test_create_view() -> MetaResult<()> {
97 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
98 let pb_view = PbView {
99 schema_id: TEST_SCHEMA_ID,
100 database_id: TEST_DATABASE_ID,
101 name: "view".to_owned(),
102 owner: TEST_OWNER_ID as _,
103 sql: "CREATE VIEW view AS SELECT 1".to_owned(),
104 ..Default::default()
105 };
106 mgr.create_view(pb_view.clone(), HashSet::new()).await?;
107 assert!(mgr.create_view(pb_view, HashSet::new()).await.is_err());
108
109 let view = View::find().one(&mgr.inner.read().await.db).await?.unwrap();
110 mgr.drop_object(ObjectType::View, view.view_id, DropMode::Cascade)
111 .await?;
112 assert!(
113 View::find_by_id(view.view_id)
114 .one(&mgr.inner.read().await.db)
115 .await?
116 .is_none()
117 );
118
119 Ok(())
120 }
121
122 #[tokio::test]
123 async fn test_create_function() -> MetaResult<()> {
124 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
125 let test_data_type = risingwave_pb::data::DataType {
126 type_name: risingwave_pb::data::data_type::TypeName::Int32 as _,
127 ..Default::default()
128 };
129 let arg_types = vec![test_data_type.clone()];
130 let pb_function = PbFunction {
131 schema_id: TEST_SCHEMA_ID,
132 database_id: TEST_DATABASE_ID,
133 name: "test_function".to_owned(),
134 owner: TEST_OWNER_ID as _,
135 arg_types,
136 return_type: Some(test_data_type.clone()),
137 language: "python".to_owned(),
138 kind: Some(risingwave_pb::catalog::function::Kind::Scalar(
139 Default::default(),
140 )),
141 ..Default::default()
142 };
143 mgr.create_function(pb_function.clone()).await?;
144 assert!(mgr.create_function(pb_function).await.is_err());
145
146 let function = Function::find()
147 .inner_join(Object)
148 .filter(
149 object::Column::DatabaseId
150 .eq(TEST_DATABASE_ID)
151 .and(object::Column::SchemaId.eq(TEST_SCHEMA_ID))
152 .add(function::Column::Name.eq("test_function")),
153 )
154 .one(&mgr.inner.read().await.db)
155 .await?
156 .unwrap();
157 assert_eq!(function.return_type.to_protobuf(), test_data_type);
158 assert_eq!(function.arg_types.to_protobuf().len(), 1);
159 assert_eq!(function.language, "python");
160
161 mgr.drop_object(
162 ObjectType::Function,
163 function.function_id,
164 DropMode::Restrict,
165 )
166 .await?;
167 assert!(
168 Object::find_by_id(function.function_id)
169 .one(&mgr.inner.read().await.db)
170 .await?
171 .is_none()
172 );
173
174 Ok(())
175 }
176
177 #[tokio::test]
178 async fn test_alter_relation_rename() -> MetaResult<()> {
179 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
180 let pb_source = PbSource {
181 schema_id: TEST_SCHEMA_ID,
182 database_id: TEST_DATABASE_ID,
183 name: "s1".to_owned(),
184 owner: TEST_OWNER_ID as _,
185 definition: r#"CREATE SOURCE s1 (v1 int) with (
186 connector = 'kafka',
187 topic = 'kafka_alter',
188 properties.bootstrap.server = 'message_queue:29092',
189 scan.startup.mode = 'earliest'
190) FORMAT PLAIN ENCODE JSON"#
191 .to_owned(),
192 info: Some(StreamSourceInfo {
193 ..Default::default()
194 }),
195 ..Default::default()
196 };
197 mgr.create_source(pb_source).await?;
198 let source_id: SourceId = Source::find()
199 .select_only()
200 .column(source::Column::SourceId)
201 .filter(source::Column::Name.eq("s1"))
202 .into_tuple()
203 .one(&mgr.inner.read().await.db)
204 .await?
205 .unwrap();
206
207 let pb_view = PbView {
208 schema_id: TEST_SCHEMA_ID,
209 database_id: TEST_DATABASE_ID,
210 name: "view_1".to_owned(),
211 owner: TEST_OWNER_ID as _,
212 sql: "CREATE VIEW view_1 AS SELECT v1 FROM s1".to_owned(),
213 ..Default::default()
214 };
215 mgr.create_view(pb_view, HashSet::from([source_id.as_object_id()]))
216 .await?;
217 let view_id: ViewId = View::find()
218 .select_only()
219 .column(view::Column::ViewId)
220 .filter(view::Column::Name.eq("view_1"))
221 .into_tuple()
222 .one(&mgr.inner.read().await.db)
223 .await?
224 .unwrap();
225
226 mgr.alter_name(ObjectType::Source, source_id, "s2").await?;
227 let source = Source::find_by_id(source_id)
228 .one(&mgr.inner.read().await.db)
229 .await?
230 .unwrap();
231 assert_eq!(source.name, "s2");
232 assert_eq!(
233 source.definition,
234 "CREATE SOURCE s2 (v1 INT) WITH (\
235 connector = 'kafka', \
236 topic = 'kafka_alter', \
237 properties.bootstrap.server = 'message_queue:29092', \
238 scan.startup.mode = 'earliest'\
239) FORMAT PLAIN ENCODE JSON"
240 );
241
242 let view = View::find_by_id(view_id)
243 .one(&mgr.inner.read().await.db)
244 .await?
245 .unwrap();
246 assert_eq!(
247 view.definition,
248 "CREATE VIEW view_1 AS SELECT v1 FROM s2 AS s1"
249 );
250
251 mgr.drop_object(ObjectType::Source, source_id, DropMode::Cascade)
252 .await?;
253 assert!(
254 View::find_by_id(view_id)
255 .one(&mgr.inner.read().await.db)
256 .await?
257 .is_none()
258 );
259
260 Ok(())
261 }
262
263 #[tokio::test]
264 async fn test_abort_initial_materialized_view_reports_cancellation() -> MetaResult<()> {
265 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
266
267 let mut inner = mgr.inner.write().await;
268 let txn = inner.db.begin().await?;
269 let obj = CatalogController::create_object(
270 &txn,
271 ObjectType::Table,
272 TEST_OWNER_ID,
273 Some(TEST_DATABASE_ID),
274 Some(TEST_SCHEMA_ID),
275 )
276 .await?;
277 let job_id = obj.oid.as_job_id();
278
279 table::ActiveModel {
280 table_id: Set(obj.oid.as_table_id()),
281 name: Set("mv_abort_initial".to_owned()),
282 optional_associated_source_id: Set(None),
283 table_type: Set(TableType::MaterializedView),
284 belongs_to_job_id: Set(None),
285 columns: Set(vec![].into()),
286 pk: Set(vec![].into()),
287 distribution_key: Set(Vec::<i32>::new().into()),
288 stream_key: Set(Vec::<i32>::new().into()),
289 append_only: Set(false),
290 fragment_id: Set(None),
291 vnode_col_index: Set(None),
292 row_id_index: Set(None),
293 value_indices: Set(Vec::<i32>::new().into()),
294 definition: Set("CREATE MATERIALIZED VIEW mv_abort_initial AS SELECT 1".to_owned()),
295 handle_pk_conflict_behavior: Set(HandleConflictBehavior::NoCheck),
296 version_column_indices: Set(None),
297 read_prefix_len_hint: Set(0),
298 watermark_indices: Set(Vec::<i32>::new().into()),
299 dist_key_in_pk: Set(Vec::<i32>::new().into()),
300 dml_fragment_id: Set(None),
301 cardinality: Set(None),
302 cleaned_by_watermark: Set(false),
303 description: Set(None),
304 version: Set(None),
305 retention_seconds: Set(None),
306 cdc_table_id: Set(None),
307 vnode_count: Set(1),
308 webhook_info: Set(None),
309 engine: Set(None),
310 clean_watermark_index_in_pk: Set(None),
311 clean_watermark_indices: Set(None),
312 refreshable: Set(false),
313 vector_index_info: Set(None),
314 cdc_table_type: Set(None),
315 }
316 .insert(&txn)
317 .await?;
318
319 streaming_job::ActiveModel {
320 job_id: Set(job_id),
321 job_status: Set(JobStatus::Initial),
322 create_type: Set(CreateType::Foreground),
323 timezone: Set(None),
324 config_override: Set(None),
325 adaptive_parallelism_strategy: Set(None),
326 parallelism: Set(StreamingParallelism::Adaptive),
327 backfill_parallelism: Set(None),
328 backfill_orders: Set(None),
329 max_parallelism: Set(1),
330 specific_resource_group: Set(None),
331 is_serverless_backfill: Set(false),
332 }
333 .insert(&txn)
334 .await?;
335
336 let (tx, rx) = oneshot::channel();
337 inner.register_finish_notifier(TEST_DATABASE_ID, job_id, tx);
338 txn.commit().await?;
339 drop(inner);
340
341 let (aborted, database_id) = mgr.try_abort_creating_streaming_job(job_id, true).await?;
342 assert!(aborted);
343 assert_eq!(database_id, Some(TEST_DATABASE_ID));
344
345 let err = rx
346 .await
347 .expect("finish notifier should be notified")
348 .expect_err("initial job drop should cancel the create wait");
349 assert!(err.contains("cancelled"));
350
351 let db = &mgr.inner.read().await.db;
352 assert!(Object::find_by_id(job_id).one(db).await?.is_none());
353 assert!(StreamingJob::find_by_id(job_id).one(db).await?.is_none());
354 assert!(
355 Table::find_by_id(job_id.as_mv_table_id())
356 .one(db)
357 .await?
358 .is_none()
359 );
360
361 Ok(())
362 }
363
364 #[tokio::test]
365 async fn test_abort_creating_subscription_commits_delete() -> MetaResult<()> {
366 let mgr = CatalogController::new(MetaSrvEnv::for_test().await).await?;
367 let pb_view = PbView {
368 schema_id: TEST_SCHEMA_ID,
369 database_id: TEST_DATABASE_ID,
370 name: "subscription_dep_view".to_owned(),
371 owner: TEST_OWNER_ID as _,
372 sql: "CREATE VIEW subscription_dep_view AS SELECT 1".to_owned(),
373 ..Default::default()
374 };
375 mgr.create_view(pb_view, HashSet::new()).await?;
376
377 let view_id: ViewId = View::find()
378 .select_only()
379 .column(view::Column::ViewId)
380 .filter(view::Column::Name.eq("subscription_dep_view"))
381 .into_tuple()
382 .one(&mgr.inner.read().await.db)
383 .await?
384 .unwrap();
385
386 let mut pb_subscription = PbSubscription {
387 name: "subscription_to_abort".to_owned(),
388 definition: "CREATE SUBSCRIPTION subscription_to_abort FROM subscription_dep_view"
389 .to_owned(),
390 retention_seconds: 86400,
391 database_id: TEST_DATABASE_ID,
392 schema_id: TEST_SCHEMA_ID,
393 dependent_table_id: view_id.as_object_id().as_table_id(),
394 owner: TEST_OWNER_ID as _,
395 subscription_state: SubscriptionState::Init as _,
396 ..Default::default()
397 };
398 mgr.create_subscription_catalog(&mut pb_subscription)
399 .await?;
400
401 mgr.try_abort_creating_subscription(pb_subscription.id)
402 .await?;
403
404 assert!(
405 Subscription::find_by_id(pb_subscription.id)
406 .one(&mgr.inner.read().await.db)
407 .await?
408 .is_none()
409 );
410
411 Ok(())
412 }
413}