1use risingwave_common::catalog::AlterDatabaseParam;
16use sea_orm::DatabaseTransaction;
17
18use super::*;
19
20impl CatalogController {
21 async fn alter_database_name(
22 &self,
23 database_id: DatabaseId,
24 name: &str,
25 ) -> MetaResult<NotificationVersion> {
26 let inner = self.inner.write().await;
27 let txn = inner.db.begin().await?;
28 check_database_name_duplicate(name, &txn).await?;
29
30 let active_model = database::ActiveModel {
31 database_id: Set(database_id),
32 name: Set(name.to_owned()),
33 ..Default::default()
34 };
35 let database = active_model.update(&txn).await?;
36
37 let obj = Object::find_by_id(database_id)
38 .one(&txn)
39 .await?
40 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
41
42 txn.commit().await?;
43
44 let version = self
45 .notify_frontend(
46 NotificationOperation::Update,
47 NotificationInfo::Database(ObjectModel(database, obj).into()),
48 )
49 .await;
50 Ok(version)
51 }
52
53 async fn alter_schema_name(
54 &self,
55 schema_id: SchemaId,
56 name: &str,
57 ) -> MetaResult<NotificationVersion> {
58 let inner = self.inner.write().await;
59 let txn = inner.db.begin().await?;
60
61 let obj = Object::find_by_id(schema_id)
62 .one(&txn)
63 .await?
64 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
65 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
66
67 let active_model = schema::ActiveModel {
68 schema_id: Set(schema_id),
69 name: Set(name.to_owned()),
70 };
71 let schema = active_model.update(&txn).await?;
72
73 txn.commit().await?;
74
75 let version = self
76 .notify_frontend(
77 NotificationOperation::Update,
78 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
79 )
80 .await;
81 Ok(version)
82 }
83
84 pub async fn alter_name(
85 &self,
86 object_type: ObjectType,
87 object_id: ObjectId,
88 object_name: &str,
89 ) -> MetaResult<NotificationVersion> {
90 if object_type == ObjectType::Database {
91 return self.alter_database_name(object_id as _, object_name).await;
92 } else if object_type == ObjectType::Schema {
93 return self.alter_schema_name(object_id as _, object_name).await;
94 }
95
96 let inner = self.inner.write().await;
97 let txn = inner.db.begin().await?;
98 let obj: PartialObject = Object::find_by_id(object_id)
99 .into_partial_model()
100 .one(&txn)
101 .await?
102 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
103 assert_eq!(obj.obj_type, object_type);
104 check_relation_name_duplicate(
105 object_name,
106 obj.database_id.unwrap(),
107 obj.schema_id.unwrap(),
108 &txn,
109 )
110 .await?;
111
112 let (mut to_update_relations, old_name) =
114 rename_relation(&txn, object_type, object_id, object_name).await?;
115 to_update_relations.extend(
117 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
118 );
119
120 txn.commit().await?;
121
122 let version = self
123 .notify_frontend(
124 NotificationOperation::Update,
125 NotificationInfo::ObjectGroup(PbObjectGroup {
126 objects: to_update_relations,
127 }),
128 )
129 .await;
130
131 Ok(version)
132 }
133
134 pub async fn alter_swap_rename(
135 &self,
136 object_type: ObjectType,
137 object_id: ObjectId,
138 dst_object_id: ObjectId,
139 ) -> MetaResult<NotificationVersion> {
140 let inner = self.inner.write().await;
141 let txn = inner.db.begin().await?;
142 let dst_name: String = match object_type {
143 ObjectType::Table => Table::find_by_id(dst_object_id)
144 .select_only()
145 .column(table::Column::Name)
146 .into_tuple()
147 .one(&txn)
148 .await?
149 .ok_or_else(|| {
150 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
151 })?,
152 ObjectType::Source => Source::find_by_id(dst_object_id)
153 .select_only()
154 .column(source::Column::Name)
155 .into_tuple()
156 .one(&txn)
157 .await?
158 .ok_or_else(|| {
159 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
160 })?,
161 ObjectType::Sink => Sink::find_by_id(dst_object_id)
162 .select_only()
163 .column(sink::Column::Name)
164 .into_tuple()
165 .one(&txn)
166 .await?
167 .ok_or_else(|| {
168 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
169 })?,
170 ObjectType::View => View::find_by_id(dst_object_id)
171 .select_only()
172 .column(view::Column::Name)
173 .into_tuple()
174 .one(&txn)
175 .await?
176 .ok_or_else(|| {
177 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
178 })?,
179 ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
180 .select_only()
181 .column(subscription::Column::Name)
182 .into_tuple()
183 .one(&txn)
184 .await?
185 .ok_or_else(|| {
186 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
187 })?,
188 _ => {
189 return Err(MetaError::permission_denied(format!(
190 "swap rename not supported for object type: {:?}",
191 object_type
192 )));
193 }
194 };
195
196 let (mut to_update_relations, src_name) =
198 rename_relation(&txn, object_type, object_id, &dst_name).await?;
199 let (to_update_relations2, _) =
200 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
201 to_update_relations.extend(to_update_relations2);
202 to_update_relations.extend(
204 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
205 );
206 to_update_relations.extend(
207 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
208 );
209
210 txn.commit().await?;
211
212 let version = self
213 .notify_frontend(
214 NotificationOperation::Update,
215 NotificationInfo::ObjectGroup(PbObjectGroup {
216 objects: to_update_relations,
217 }),
218 )
219 .await;
220
221 Ok(version)
222 }
223
224 pub async fn alter_non_shared_source(
225 &self,
226 pb_source: PbSource,
227 ) -> MetaResult<NotificationVersion> {
228 let source_id = pb_source.id as SourceId;
229 let inner = self.inner.write().await;
230 let txn = inner.db.begin().await?;
231
232 let original_version: i64 = Source::find_by_id(source_id)
233 .select_only()
234 .column(source::Column::Version)
235 .into_tuple()
236 .one(&txn)
237 .await?
238 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
239 if original_version + 1 != pb_source.version as i64 {
240 return Err(MetaError::permission_denied(
241 "source version is stale".to_owned(),
242 ));
243 }
244
245 let source: source::ActiveModel = pb_source.clone().into();
246 source.update(&txn).await?;
247 txn.commit().await?;
248
249 let version = self
250 .notify_frontend_relation_info(
251 NotificationOperation::Update,
252 PbObjectInfo::Source(pb_source),
253 )
254 .await;
255 Ok(version)
256 }
257
258 pub async fn alter_owner(
259 &self,
260 object_type: ObjectType,
261 object_id: ObjectId,
262 new_owner: UserId,
263 ) -> MetaResult<NotificationVersion> {
264 let inner = self.inner.write().await;
265 let txn = inner.db.begin().await?;
266 ensure_user_id(new_owner, &txn).await?;
267
268 let obj = Object::find_by_id(object_id)
269 .one(&txn)
270 .await?
271 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
272 if obj.owner_id == new_owner {
273 return Ok(IGNORED_NOTIFICATION_VERSION);
274 }
275 let mut obj = obj.into_active_model();
276 obj.owner_id = Set(new_owner);
277 let obj = obj.update(&txn).await?;
278
279 let mut objects = vec![];
280 match object_type {
281 ObjectType::Database => {
282 let db = Database::find_by_id(object_id)
283 .one(&txn)
284 .await?
285 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
286
287 txn.commit().await?;
288
289 let version = self
290 .notify_frontend(
291 NotificationOperation::Update,
292 NotificationInfo::Database(ObjectModel(db, obj).into()),
293 )
294 .await;
295 return Ok(version);
296 }
297 ObjectType::Schema => {
298 let schema = Schema::find_by_id(object_id)
299 .one(&txn)
300 .await?
301 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
302
303 txn.commit().await?;
304
305 let version = self
306 .notify_frontend(
307 NotificationOperation::Update,
308 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
309 )
310 .await;
311 return Ok(version);
312 }
313 ObjectType::Table => {
314 let table = Table::find_by_id(object_id)
315 .one(&txn)
316 .await?
317 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
318
319 if let Some(associated_source_id) = table.optional_associated_source_id {
321 let src_obj = object::ActiveModel {
322 oid: Set(associated_source_id as _),
323 owner_id: Set(new_owner),
324 ..Default::default()
325 }
326 .update(&txn)
327 .await?;
328 let source = Source::find_by_id(associated_source_id)
329 .one(&txn)
330 .await?
331 .ok_or_else(|| {
332 MetaError::catalog_id_not_found("source", associated_source_id)
333 })?;
334 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
335 }
336
337 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
339 .select_only()
340 .columns([index::Column::IndexId, index::Column::IndexTableId])
341 .filter(index::Column::PrimaryTableId.eq(object_id))
342 .into_tuple::<(IndexId, TableId)>()
343 .all(&txn)
344 .await?
345 .into_iter()
346 .unzip();
347 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
348
349 let internal_tables: Vec<TableId> = Table::find()
351 .select_only()
352 .column(table::Column::TableId)
353 .filter(
354 table::Column::BelongsToJobId
355 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
356 )
357 .into_tuple()
358 .all(&txn)
359 .await?;
360 table_ids.extend(internal_tables);
361
362 if !index_ids.is_empty() || !table_ids.is_empty() {
363 Object::update_many()
364 .col_expr(
365 object::Column::OwnerId,
366 SimpleExpr::Value(Value::Int(Some(new_owner))),
367 )
368 .filter(
369 object::Column::Oid
370 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
371 )
372 .exec(&txn)
373 .await?;
374 }
375
376 if !table_ids.is_empty() {
377 let table_objs = Table::find()
378 .find_also_related(Object)
379 .filter(table::Column::TableId.is_in(table_ids))
380 .all(&txn)
381 .await?;
382 for (table, table_obj) in table_objs {
383 objects.push(PbObjectInfo::Table(
384 ObjectModel(table, table_obj.unwrap()).into(),
385 ));
386 }
387 }
388 if !index_ids.is_empty() {
390 let index_objs = Index::find()
391 .find_also_related(Object)
392 .filter(index::Column::IndexId.is_in(index_ids))
393 .all(&txn)
394 .await?;
395 for (index, index_obj) in index_objs {
396 objects.push(PbObjectInfo::Index(
397 ObjectModel(index, index_obj.unwrap()).into(),
398 ));
399 }
400 }
401 }
402 ObjectType::Source => {
403 let source = Source::find_by_id(object_id)
404 .one(&txn)
405 .await?
406 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
407 let is_shared = source.is_shared();
408 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
409
410 if is_shared {
413 update_internal_tables(
414 &txn,
415 object_id,
416 object::Column::OwnerId,
417 Value::Int(Some(new_owner)),
418 &mut objects,
419 )
420 .await?;
421 }
422 }
423 ObjectType::Sink => {
424 let sink = Sink::find_by_id(object_id)
425 .one(&txn)
426 .await?
427 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
428 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
429
430 update_internal_tables(
431 &txn,
432 object_id,
433 object::Column::OwnerId,
434 Value::Int(Some(new_owner)),
435 &mut objects,
436 )
437 .await?;
438 }
439 ObjectType::Subscription => {
440 let subscription = Subscription::find_by_id(object_id)
441 .one(&txn)
442 .await?
443 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
444 objects.push(PbObjectInfo::Subscription(
445 ObjectModel(subscription, obj).into(),
446 ));
447 }
448 ObjectType::View => {
449 let view = View::find_by_id(object_id)
450 .one(&txn)
451 .await?
452 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
453 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
454 }
455 ObjectType::Connection => {
456 let connection = Connection::find_by_id(object_id)
457 .one(&txn)
458 .await?
459 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
460 objects.push(PbObjectInfo::Connection(
461 ObjectModel(connection, obj).into(),
462 ));
463 }
464 _ => unreachable!("not supported object type: {:?}", object_type),
465 };
466
467 txn.commit().await?;
468
469 let version = self
470 .notify_frontend(
471 NotificationOperation::Update,
472 NotificationInfo::ObjectGroup(PbObjectGroup {
473 objects: objects
474 .into_iter()
475 .map(|object| PbObject {
476 object_info: Some(object),
477 })
478 .collect(),
479 }),
480 )
481 .await;
482 Ok(version)
483 }
484
485 pub async fn alter_schema(
486 &self,
487 object_type: ObjectType,
488 object_id: ObjectId,
489 new_schema: SchemaId,
490 ) -> MetaResult<NotificationVersion> {
491 let inner = self.inner.write().await;
492 let txn = inner.db.begin().await?;
493 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
494
495 let obj = Object::find_by_id(object_id)
496 .one(&txn)
497 .await?
498 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
499 if obj.schema_id == Some(new_schema) {
500 return Ok(IGNORED_NOTIFICATION_VERSION);
501 }
502 let database_id = obj.database_id.unwrap();
503
504 let mut objects = vec![];
505 match object_type {
506 ObjectType::Table => {
507 let table = Table::find_by_id(object_id)
508 .one(&txn)
509 .await?
510 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
511 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
512 let associated_src_id = table.optional_associated_source_id;
513
514 let mut obj = obj.into_active_model();
515 obj.schema_id = Set(Some(new_schema));
516 let obj = obj.update(&txn).await?;
517 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
518
519 if let Some(associated_source_id) = associated_src_id {
521 let src_obj = object::ActiveModel {
522 oid: Set(associated_source_id as _),
523 schema_id: Set(Some(new_schema)),
524 ..Default::default()
525 }
526 .update(&txn)
527 .await?;
528 let source = Source::find_by_id(associated_source_id)
529 .one(&txn)
530 .await?
531 .ok_or_else(|| {
532 MetaError::catalog_id_not_found("source", associated_source_id)
533 })?;
534 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
535 }
536
537 let (index_ids, (index_names, mut table_ids)): (
539 Vec<IndexId>,
540 (Vec<String>, Vec<TableId>),
541 ) = Index::find()
542 .select_only()
543 .columns([
544 index::Column::IndexId,
545 index::Column::Name,
546 index::Column::IndexTableId,
547 ])
548 .filter(index::Column::PrimaryTableId.eq(object_id))
549 .into_tuple::<(IndexId, String, TableId)>()
550 .all(&txn)
551 .await?
552 .into_iter()
553 .map(|(id, name, t_id)| (id, (name, t_id)))
554 .unzip();
555
556 let internal_tables: Vec<TableId> = Table::find()
558 .select_only()
559 .column(table::Column::TableId)
560 .filter(
561 table::Column::BelongsToJobId
562 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
563 )
564 .into_tuple()
565 .all(&txn)
566 .await?;
567 table_ids.extend(internal_tables);
568
569 if !index_ids.is_empty() || !table_ids.is_empty() {
570 for index_name in index_names {
571 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
572 .await?;
573 }
574
575 Object::update_many()
576 .col_expr(
577 object::Column::SchemaId,
578 SimpleExpr::Value(Value::Int(Some(new_schema))),
579 )
580 .filter(
581 object::Column::Oid
582 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
583 )
584 .exec(&txn)
585 .await?;
586 }
587
588 if !table_ids.is_empty() {
589 let table_objs = Table::find()
590 .find_also_related(Object)
591 .filter(table::Column::TableId.is_in(table_ids))
592 .all(&txn)
593 .await?;
594 for (table, table_obj) in table_objs {
595 objects.push(PbObjectInfo::Table(
596 ObjectModel(table, table_obj.unwrap()).into(),
597 ));
598 }
599 }
600 if !index_ids.is_empty() {
601 let index_objs = Index::find()
602 .find_also_related(Object)
603 .filter(index::Column::IndexId.is_in(index_ids))
604 .all(&txn)
605 .await?;
606 for (index, index_obj) in index_objs {
607 objects.push(PbObjectInfo::Index(
608 ObjectModel(index, index_obj.unwrap()).into(),
609 ));
610 }
611 }
612 }
613 ObjectType::Source => {
614 let source = Source::find_by_id(object_id)
615 .one(&txn)
616 .await?
617 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
618 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
619 let is_shared = source.is_shared();
620
621 let mut obj = obj.into_active_model();
622 obj.schema_id = Set(Some(new_schema));
623 let obj = obj.update(&txn).await?;
624 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
625
626 if is_shared {
629 update_internal_tables(
630 &txn,
631 object_id,
632 object::Column::SchemaId,
633 Value::Int(Some(new_schema)),
634 &mut objects,
635 )
636 .await?;
637 }
638 }
639 ObjectType::Sink => {
640 let sink = Sink::find_by_id(object_id)
641 .one(&txn)
642 .await?
643 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
644 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
645
646 let mut obj = obj.into_active_model();
647 obj.schema_id = Set(Some(new_schema));
648 let obj = obj.update(&txn).await?;
649 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
650
651 update_internal_tables(
652 &txn,
653 object_id,
654 object::Column::SchemaId,
655 Value::Int(Some(new_schema)),
656 &mut objects,
657 )
658 .await?;
659 }
660 ObjectType::Subscription => {
661 let subscription = Subscription::find_by_id(object_id)
662 .one(&txn)
663 .await?
664 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
665 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
666 .await?;
667
668 let mut obj = obj.into_active_model();
669 obj.schema_id = Set(Some(new_schema));
670 let obj = obj.update(&txn).await?;
671 objects.push(PbObjectInfo::Subscription(
672 ObjectModel(subscription, obj).into(),
673 ));
674 }
675 ObjectType::View => {
676 let view = View::find_by_id(object_id)
677 .one(&txn)
678 .await?
679 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
680 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
681
682 let mut obj = obj.into_active_model();
683 obj.schema_id = Set(Some(new_schema));
684 let obj = obj.update(&txn).await?;
685 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
686 }
687 ObjectType::Function => {
688 let function = Function::find_by_id(object_id)
689 .one(&txn)
690 .await?
691 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
692
693 let mut pb_function: PbFunction = ObjectModel(function, obj).into();
694 pb_function.schema_id = new_schema as _;
695 check_function_signature_duplicate(&pb_function, &txn).await?;
696
697 object::ActiveModel {
698 oid: Set(object_id),
699 schema_id: Set(Some(new_schema)),
700 ..Default::default()
701 }
702 .update(&txn)
703 .await?;
704
705 txn.commit().await?;
706 let version = self
707 .notify_frontend(
708 NotificationOperation::Update,
709 NotificationInfo::Function(pb_function),
710 )
711 .await;
712 return Ok(version);
713 }
714 ObjectType::Connection => {
715 let connection = Connection::find_by_id(object_id)
716 .one(&txn)
717 .await?
718 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
719
720 let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
721 pb_connection.schema_id = new_schema as _;
722 check_connection_name_duplicate(&pb_connection, &txn).await?;
723
724 object::ActiveModel {
725 oid: Set(object_id),
726 schema_id: Set(Some(new_schema)),
727 ..Default::default()
728 }
729 .update(&txn)
730 .await?;
731
732 txn.commit().await?;
733 let version = self
734 .notify_frontend(
735 NotificationOperation::Update,
736 NotificationInfo::Connection(pb_connection),
737 )
738 .await;
739 return Ok(version);
740 }
741 _ => unreachable!("not supported object type: {:?}", object_type),
742 }
743
744 txn.commit().await?;
745 let version = self
746 .notify_frontend(
747 Operation::Update,
748 Info::ObjectGroup(PbObjectGroup {
749 objects: objects
750 .into_iter()
751 .map(|relation_info| PbObject {
752 object_info: Some(relation_info),
753 })
754 .collect_vec(),
755 }),
756 )
757 .await;
758 Ok(version)
759 }
760
761 pub async fn alter_secret(
762 &self,
763 pb_secret: PbSecret,
764 secret_plain_payload: Vec<u8>,
765 ) -> MetaResult<NotificationVersion> {
766 let inner = self.inner.write().await;
767 let owner_id = pb_secret.owner as _;
768 let txn = inner.db.begin().await?;
769 ensure_user_id(owner_id, &txn).await?;
770 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
771 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
772
773 ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
774 let secret: secret::ActiveModel = pb_secret.clone().into();
775 Secret::update(secret).exec(&txn).await?;
776
777 txn.commit().await?;
778
779 let mut secret_plain = pb_secret;
781 secret_plain.value.clone_from(&secret_plain_payload);
782
783 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
784 self.env
785 .notification_manager()
786 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
787
788 let version = self
789 .notify_frontend(
790 NotificationOperation::Update,
791 NotificationInfo::Secret(secret_plain),
792 )
793 .await;
794
795 Ok(version)
796 }
797
798 pub async fn drop_table_associated_source(
800 txn: &DatabaseTransaction,
801 drop_table_connector_ctx: &DropTableConnectorContext,
802 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
803 let to_drop_source_objects: Vec<PartialObject> = Object::find()
804 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
805 .into_partial_model()
806 .all(txn)
807 .await?;
808 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
809 .select_only()
810 .filter(
811 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
812 )
813 .into_partial_model()
814 .all(txn)
815 .await?;
816 let to_drop_objects = to_drop_source_objects
817 .into_iter()
818 .chain(to_drop_internal_table_objs.into_iter())
819 .collect_vec();
820 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
822 .select_only()
823 .distinct()
824 .column(user_privilege::Column::UserId)
825 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
826 .into_tuple()
827 .all(txn)
828 .await?;
829
830 tracing::debug!(
831 "drop_table_associated_source: to_drop_objects: {:?}",
832 to_drop_objects
833 );
834
835 let res = Object::delete_many()
837 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
838 .exec(txn)
839 .await?;
840 if res.rows_affected == 0 {
841 return Err(MetaError::catalog_id_not_found(
842 ObjectType::Source.as_str(),
843 drop_table_connector_ctx.to_remove_source_id,
844 ));
845 }
846 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
847
848 Ok((user_infos, to_drop_objects))
849 }
850
851 pub async fn alter_database_param(
852 &self,
853 database_id: DatabaseId,
854 param: AlterDatabaseParam,
855 ) -> MetaResult<NotificationVersion> {
856 let inner = self.inner.write().await;
857 let txn = inner.db.begin().await?;
858
859 let mut database = database::ActiveModel {
860 database_id: Set(database_id),
861 ..Default::default()
862 };
863 match param {
864 AlterDatabaseParam::BarrierIntervalMs(interval) => {
865 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
866 }
867 AlterDatabaseParam::CheckpointFrequency(frequency) => {
868 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
869 }
870 }
871 let database = database.update(&txn).await?;
872
873 let obj = Object::find_by_id(database_id)
874 .one(&txn)
875 .await?
876 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
877
878 txn.commit().await?;
879
880 let version = self
881 .notify_frontend(
882 NotificationOperation::Update,
883 NotificationInfo::Database(ObjectModel(database, obj).into()),
884 )
885 .await;
886 Ok(version)
887 }
888}