1use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use sea_orm::DatabaseTransaction;
18
19use super::*;
20
21impl CatalogController {
22 async fn alter_database_name(
23 &self,
24 database_id: DatabaseId,
25 name: &str,
26 ) -> MetaResult<NotificationVersion> {
27 let inner = self.inner.write().await;
28 let txn = inner.db.begin().await?;
29 check_database_name_duplicate(name, &txn).await?;
30
31 let active_model = database::ActiveModel {
32 database_id: Set(database_id),
33 name: Set(name.to_owned()),
34 ..Default::default()
35 };
36 let database = active_model.update(&txn).await?;
37
38 let obj = Object::find_by_id(database_id)
39 .one(&txn)
40 .await?
41 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
42
43 txn.commit().await?;
44
45 let version = self
46 .notify_frontend(
47 NotificationOperation::Update,
48 NotificationInfo::Database(ObjectModel(database, obj).into()),
49 )
50 .await;
51 Ok(version)
52 }
53
54 async fn alter_schema_name(
55 &self,
56 schema_id: SchemaId,
57 name: &str,
58 ) -> MetaResult<NotificationVersion> {
59 let inner = self.inner.write().await;
60 let txn = inner.db.begin().await?;
61
62 let obj = Object::find_by_id(schema_id)
63 .one(&txn)
64 .await?
65 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
66 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
67
68 let active_model = schema::ActiveModel {
69 schema_id: Set(schema_id),
70 name: Set(name.to_owned()),
71 };
72 let schema = active_model.update(&txn).await?;
73
74 txn.commit().await?;
75
76 let version = self
77 .notify_frontend(
78 NotificationOperation::Update,
79 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
80 )
81 .await;
82 Ok(version)
83 }
84
85 pub async fn alter_name(
86 &self,
87 object_type: ObjectType,
88 object_id: ObjectId,
89 object_name: &str,
90 ) -> MetaResult<NotificationVersion> {
91 if object_type == ObjectType::Database {
92 return self.alter_database_name(object_id as _, object_name).await;
93 } else if object_type == ObjectType::Schema {
94 return self.alter_schema_name(object_id as _, object_name).await;
95 }
96
97 let inner = self.inner.write().await;
98 let txn = inner.db.begin().await?;
99 let obj: PartialObject = Object::find_by_id(object_id)
100 .into_partial_model()
101 .one(&txn)
102 .await?
103 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
104 assert_eq!(obj.obj_type, object_type);
105 check_relation_name_duplicate(
106 object_name,
107 obj.database_id.unwrap(),
108 obj.schema_id.unwrap(),
109 &txn,
110 )
111 .await?;
112
113 let (mut to_update_relations, old_name) =
115 rename_relation(&txn, object_type, object_id, object_name).await?;
116 to_update_relations.extend(
118 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
119 );
120
121 txn.commit().await?;
122
123 let version = self
124 .notify_frontend(
125 NotificationOperation::Update,
126 NotificationInfo::ObjectGroup(PbObjectGroup {
127 objects: to_update_relations,
128 }),
129 )
130 .await;
131
132 Ok(version)
133 }
134
135 pub async fn alter_swap_rename(
136 &self,
137 object_type: ObjectType,
138 object_id: ObjectId,
139 dst_object_id: ObjectId,
140 ) -> MetaResult<NotificationVersion> {
141 let inner = self.inner.write().await;
142 let txn = inner.db.begin().await?;
143 let dst_name: String = match object_type {
144 ObjectType::Table => Table::find_by_id(dst_object_id)
145 .select_only()
146 .column(table::Column::Name)
147 .into_tuple()
148 .one(&txn)
149 .await?
150 .ok_or_else(|| {
151 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
152 })?,
153 ObjectType::Source => Source::find_by_id(dst_object_id)
154 .select_only()
155 .column(source::Column::Name)
156 .into_tuple()
157 .one(&txn)
158 .await?
159 .ok_or_else(|| {
160 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
161 })?,
162 ObjectType::Sink => Sink::find_by_id(dst_object_id)
163 .select_only()
164 .column(sink::Column::Name)
165 .into_tuple()
166 .one(&txn)
167 .await?
168 .ok_or_else(|| {
169 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
170 })?,
171 ObjectType::View => View::find_by_id(dst_object_id)
172 .select_only()
173 .column(view::Column::Name)
174 .into_tuple()
175 .one(&txn)
176 .await?
177 .ok_or_else(|| {
178 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
179 })?,
180 ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
181 .select_only()
182 .column(subscription::Column::Name)
183 .into_tuple()
184 .one(&txn)
185 .await?
186 .ok_or_else(|| {
187 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
188 })?,
189 _ => {
190 return Err(MetaError::permission_denied(format!(
191 "swap rename not supported for object type: {:?}",
192 object_type
193 )));
194 }
195 };
196
197 let (mut to_update_relations, src_name) =
199 rename_relation(&txn, object_type, object_id, &dst_name).await?;
200 let (to_update_relations2, _) =
201 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
202 to_update_relations.extend(to_update_relations2);
203 to_update_relations.extend(
205 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
206 );
207 to_update_relations.extend(
208 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
209 );
210
211 txn.commit().await?;
212
213 let version = self
214 .notify_frontend(
215 NotificationOperation::Update,
216 NotificationInfo::ObjectGroup(PbObjectGroup {
217 objects: to_update_relations,
218 }),
219 )
220 .await;
221
222 Ok(version)
223 }
224
225 pub async fn alter_non_shared_source(
226 &self,
227 pb_source: PbSource,
228 ) -> MetaResult<NotificationVersion> {
229 let source_id = pb_source.id as SourceId;
230 let inner = self.inner.write().await;
231 let txn = inner.db.begin().await?;
232
233 let original_version: i64 = Source::find_by_id(source_id)
234 .select_only()
235 .column(source::Column::Version)
236 .into_tuple()
237 .one(&txn)
238 .await?
239 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
240 if original_version + 1 != pb_source.version as i64 {
241 return Err(MetaError::permission_denied(
242 "source version is stale".to_owned(),
243 ));
244 }
245
246 let source: source::ActiveModel = pb_source.clone().into();
247 source.update(&txn).await?;
248 txn.commit().await?;
249
250 let version = self
251 .notify_frontend_relation_info(
252 NotificationOperation::Update,
253 PbObjectInfo::Source(pb_source),
254 )
255 .await;
256 Ok(version)
257 }
258
    /// Changes the owner of the catalog object `object_id` (of kind
    /// `object_type`) to `new_owner` and notifies frontends.
    ///
    /// The whole change runs in one transaction. Depending on the kind, further
    /// objects are re-owned together with the target:
    ///
    /// * **Table** – its associated source (if any), its indexes and their
    ///   index tables, and every table whose `BelongsToJobId` is the table
    ///   itself or one of those index tables.
    /// * **Source** – for shared sources, `update_internal_tables` is invoked.
    /// * **Sink** – `update_internal_tables` is invoked.
    /// * **Subscription / View / Connection** – only the object itself.
    /// * **Database / Schema** – only the object itself; these two commit and
    ///   send a dedicated `Database` / `Schema` notification, then return early.
    ///
    /// All other kinds end with one `ObjectGroup` notification carrying every
    /// collected object.
    ///
    /// # Returns
    ///
    /// The notification version returned by `notify_frontend`, or
    /// `IGNORED_NOTIFICATION_VERSION` when the object already belongs to
    /// `new_owner` (in that case nothing is committed and nothing is sent).
    ///
    /// # Errors
    ///
    /// Propagates the error of `ensure_user_id`, returns `catalog_id_not_found`
    /// when the object (or one of the rows looked up for it) does not exist,
    /// and propagates database errors.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!` for an `object_type` not handled below, and
    /// when a related `object` row is missing for a joined table or index.
    pub async fn alter_owner(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_owner: UserId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_user_id(new_owner, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // Nothing to do when the owner is unchanged; the transaction is dropped
        // without being committed.
        if obj.owner_id == new_owner {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        // Re-own the target object itself; `obj` now holds the updated row.
        let mut obj = obj.into_active_model();
        obj.owner_id = Set(new_owner);
        let obj = obj.update(&txn).await?;

        // Objects to be reported to frontends in the final `ObjectGroup`.
        let mut objects = vec![];
        match object_type {
            ObjectType::Database => {
                let db = Database::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;

                txn.commit().await?;

                // Databases are notified on their own, not as part of a group.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Database(ObjectModel(db, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Schema => {
                let schema = Schema::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;

                txn.commit().await?;

                // Schemas are notified on their own, not as part of a group.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Table => {
                let table = Table::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

                // Associated source: re-own its object row and report it too.
                if let Some(associated_source_id) = table.optional_associated_source_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id as _),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // Indexes on this table, together with their index tables.
                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
                    .select_only()
                    .columns([index::Column::IndexId, index::Column::IndexTableId])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .unzip();
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // Tables whose `BelongsToJobId` is this table or one of its
                // index tables.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId
                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                // Re-own all index objects and collected tables in one statement.
                if !index_ids.is_empty() || !table_ids.is_empty() {
                    Object::update_many()
                        .col_expr(
                            object::Column::OwnerId,
                            SimpleExpr::Value(Value::Int(Some(new_owner))),
                        )
                        .filter(
                            object::Column::Oid
                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Reload the updated rows so the notification carries the new
                // owner.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                // Index entries are pushed after all table entries.
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                let is_shared = source.is_shared();
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Only shared sources go through `update_internal_tables`.
                // NOTE(review): presumably this sets the owner on the job's
                // internal tables and appends them to `objects` — confirm
                // against the helper.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::OwnerId,
                        Value::Int(Some(new_owner)),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::OwnerId,
                    Value::Int(Some(new_owner)),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
                objects.push(PbObjectInfo::Connection(
                    ObjectModel(connection, obj).into(),
                ));
            }
            // Any other kind is treated as a caller bug and panics.
            _ => unreachable!("not supported object type: {:?}", object_type),
        };

        txn.commit().await?;

        // Wrap every collected object and send them as one group.
        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|object| PbObject {
                            object_info: Some(object),
                        })
                        .collect(),
                }),
            )
            .await;
        Ok(version)
    }
485
    /// Moves the catalog object `object_id` (of kind `object_type`) into the
    /// schema `new_schema` and notifies frontends.
    ///
    /// The whole change runs in one transaction. Before an object is moved, its
    /// name (or function signature / connection name) is checked for
    /// duplicates in the target schema. Depending on the kind, further objects
    /// are moved together with the target:
    ///
    /// * **Table** – its associated source (if any), its indexes and their
    ///   index tables, and every table whose `BelongsToJobId` is the table
    ///   itself or one of those index tables. Each index name is also checked
    ///   for duplicates in the target schema.
    /// * **Source** – for shared sources, `update_internal_tables` is invoked.
    /// * **Sink** – `update_internal_tables` is invoked.
    /// * **Subscription / View** – only the object itself.
    /// * **Function / Connection** – only the object itself; these two commit
    ///   and send a dedicated `Function` / `Connection` notification, then
    ///   return early.
    ///
    /// All other handled kinds end with one `ObjectGroup` notification carrying
    /// every collected object.
    ///
    /// # Returns
    ///
    /// The notification version returned by `notify_frontend`, or
    /// `IGNORED_NOTIFICATION_VERSION` when the object is already in
    /// `new_schema` (in that case nothing is committed and nothing is sent).
    ///
    /// # Errors
    ///
    /// Propagates the error of `ensure_object_id` for `new_schema`, returns
    /// `catalog_id_not_found` when the object (or a row looked up for it) does
    /// not exist, and propagates duplicate-check and database errors.
    ///
    /// # Panics
    ///
    /// Panics if the object row has no `database_id`, via `unreachable!` for an
    /// `object_type` not handled below, and when a related `object` row is
    /// missing for a joined table or index.
    pub async fn alter_schema(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_schema: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // Nothing to do when the object already lives in the target schema; the
        // transaction is dropped without being committed.
        if obj.schema_id == Some(new_schema) {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        // The object's own database id is what the duplicate checks below use.
        let database_id = obj.database_id.unwrap();

        // Objects to be reported to frontends in the final `ObjectGroup`.
        let mut objects = vec![];
        match object_type {
            ObjectType::Table => {
                let table = Table::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
                // Read before `table` is moved into the notification payload.
                let associated_src_id = table.optional_associated_source_id;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // Associated source: move its object row and report it too.
                if let Some(associated_source_id) = associated_src_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id as _),
                        schema_id: Set(Some(new_schema)),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // Indexes on this table: ids, names (for the duplicate check)
                // and their index tables.
                let (index_ids, (index_names, mut table_ids)): (
                    Vec<IndexId>,
                    (Vec<String>, Vec<TableId>),
                ) = Index::find()
                    .select_only()
                    .columns([
                        index::Column::IndexId,
                        index::Column::Name,
                        index::Column::IndexTableId,
                    ])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, String, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .map(|(id, name, t_id)| (id, (name, t_id)))
                    .unzip();

                // Tables whose `BelongsToJobId` is this table or one of its
                // index tables.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId
                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                if !index_ids.is_empty() || !table_ids.is_empty() {
                    // Every index name must also be free in the target schema.
                    for index_name in index_names {
                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
                            .await?;
                    }

                    // Move all index objects and collected tables in one
                    // statement.
                    Object::update_many()
                        .col_expr(
                            object::Column::SchemaId,
                            SimpleExpr::Value(Value::Int(Some(new_schema))),
                        )
                        .filter(
                            object::Column::Oid
                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Reload the updated rows so the notification carries the new
                // schema.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                // Index entries are pushed after all table entries.
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
                let is_shared = source.is_shared();

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Only shared sources go through `update_internal_tables`.
                // NOTE(review): presumably this sets the schema on the job's
                // internal tables and appends them to `objects` — confirm
                // against the helper.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::SchemaId,
                        Value::Int(Some(new_schema)),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::SchemaId,
                    Value::Int(Some(new_schema)),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
                    .await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Function => {
                let function = Function::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;

                // Build the payload from the old row, point it at the new
                // schema, and use it for the signature duplicate check.
                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
                pb_function.schema_id = new_schema as _;
                check_function_signature_duplicate(&pb_function, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                // Functions are notified on their own, not as part of a group.
                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Function(pb_function),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;

                // Build the payload from the old row, point it at the new
                // schema, and use it for the name duplicate check.
                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
                pb_connection.schema_id = new_schema as _;
                check_connection_name_duplicate(&pb_connection, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                // Connections are notified on their own, not as part of a group.
                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Connection(pb_connection),
                    )
                    .await;
                return Ok(version);
            }
            // Any other kind is treated as a caller bug and panics.
            _ => unreachable!("not supported object type: {:?}", object_type),
        }

        txn.commit().await?;
        // NOTE(review): this call spells the types `Operation` / `Info`, while
        // the rest of the function uses `NotificationOperation` /
        // `NotificationInfo`. They are presumably aliases, since both are
        // passed to `notify_frontend` — confirm and unify.
        let version = self
            .notify_frontend(
                Operation::Update,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|relation_info| PbObject {
                            object_info: Some(relation_info),
                        })
                        .collect_vec(),
                }),
            )
            .await;
        Ok(version)
    }
761
762 pub async fn alter_secret(
763 &self,
764 pb_secret: PbSecret,
765 secret_plain_payload: Vec<u8>,
766 ) -> MetaResult<NotificationVersion> {
767 let inner = self.inner.write().await;
768 let owner_id = pb_secret.owner as _;
769 let txn = inner.db.begin().await?;
770 ensure_user_id(owner_id, &txn).await?;
771 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
772 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
773
774 ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
775 let secret: secret::ActiveModel = pb_secret.clone().into();
776 Secret::update(secret).exec(&txn).await?;
777
778 txn.commit().await?;
779
780 let mut secret_plain = pb_secret;
782 secret_plain.value.clone_from(&secret_plain_payload);
783
784 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
785 self.env
786 .notification_manager()
787 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
788
789 let version = self
790 .notify_frontend(
791 NotificationOperation::Update,
792 NotificationInfo::Secret(secret_plain),
793 )
794 .await;
795
796 Ok(version)
797 }
798
799 pub async fn drop_table_associated_source(
801 txn: &DatabaseTransaction,
802 drop_table_connector_ctx: &DropTableConnectorContext,
803 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
804 let to_drop_source_objects: Vec<PartialObject> = Object::find()
805 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
806 .into_partial_model()
807 .all(txn)
808 .await?;
809 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
810 .select_only()
811 .filter(
812 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
813 )
814 .into_partial_model()
815 .all(txn)
816 .await?;
817 let to_drop_objects = to_drop_source_objects
818 .into_iter()
819 .chain(to_drop_internal_table_objs.into_iter())
820 .collect_vec();
821 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
823 .select_only()
824 .distinct()
825 .column(user_privilege::Column::UserId)
826 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
827 .into_tuple()
828 .all(txn)
829 .await?;
830
831 tracing::debug!(
832 "drop_table_associated_source: to_drop_objects: {:?}",
833 to_drop_objects
834 );
835
836 let res = Object::delete_many()
838 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
839 .exec(txn)
840 .await?;
841 if res.rows_affected == 0 {
842 return Err(MetaError::catalog_id_not_found(
843 ObjectType::Source.as_str(),
844 drop_table_connector_ctx.to_remove_source_id,
845 ));
846 }
847 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
848
849 Ok((user_infos, to_drop_objects))
850 }
851
852 pub async fn alter_database_param(
853 &self,
854 database_id: DatabaseId,
855 param: AlterDatabaseParam,
856 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
857 let inner = self.inner.write().await;
858 let txn = inner.db.begin().await?;
859
860 let mut database = database::ActiveModel {
861 database_id: Set(database_id),
862 ..Default::default()
863 };
864 match param {
865 AlterDatabaseParam::BarrierIntervalMs(interval) => {
866 if let Some(ref interval) = interval {
867 OverrideValidate::barrier_interval_ms(interval)
868 .map_err(|e| anyhow::anyhow!(e))?;
869 }
870 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
871 }
872 AlterDatabaseParam::CheckpointFrequency(frequency) => {
873 if let Some(ref frequency) = frequency {
874 OverrideValidate::checkpoint_frequency(frequency)
875 .map_err(|e| anyhow::anyhow!(e))?;
876 }
877 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
878 }
879 }
880 let database = database.update(&txn).await?;
881
882 let obj = Object::find_by_id(database_id)
883 .one(&txn)
884 .await?
885 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
886
887 txn.commit().await?;
888
889 let version = self
890 .notify_frontend(
891 NotificationOperation::Update,
892 NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
893 )
894 .await;
895 Ok((version, database))
896 }
897}