1use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_meta_model::table::RefreshState;
18use sea_orm::DatabaseTransaction;
19use thiserror_ext::AsReport;
20
21use super::*;
22
23impl CatalogController {
24 async fn alter_database_name(
25 &self,
26 database_id: DatabaseId,
27 name: &str,
28 ) -> MetaResult<NotificationVersion> {
29 let inner = self.inner.write().await;
30 let txn = inner.db.begin().await?;
31 check_database_name_duplicate(name, &txn).await?;
32
33 let active_model = database::ActiveModel {
34 database_id: Set(database_id),
35 name: Set(name.to_owned()),
36 ..Default::default()
37 };
38 let database = active_model.update(&txn).await?;
39
40 let obj = Object::find_by_id(database_id)
41 .one(&txn)
42 .await?
43 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
44
45 txn.commit().await?;
46
47 let version = self
48 .notify_frontend(
49 NotificationOperation::Update,
50 NotificationInfo::Database(ObjectModel(database, obj).into()),
51 )
52 .await;
53 Ok(version)
54 }
55
56 async fn alter_schema_name(
57 &self,
58 schema_id: SchemaId,
59 name: &str,
60 ) -> MetaResult<NotificationVersion> {
61 let inner = self.inner.write().await;
62 let txn = inner.db.begin().await?;
63
64 let obj = Object::find_by_id(schema_id)
65 .one(&txn)
66 .await?
67 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
68 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
69
70 let active_model = schema::ActiveModel {
71 schema_id: Set(schema_id),
72 name: Set(name.to_owned()),
73 };
74 let schema = active_model.update(&txn).await?;
75
76 txn.commit().await?;
77
78 let version = self
79 .notify_frontend(
80 NotificationOperation::Update,
81 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
82 )
83 .await;
84 Ok(version)
85 }
86
87 pub async fn alter_name(
88 &self,
89 object_type: ObjectType,
90 object_id: ObjectId,
91 object_name: &str,
92 ) -> MetaResult<NotificationVersion> {
93 if object_type == ObjectType::Database {
94 return self.alter_database_name(object_id as _, object_name).await;
95 } else if object_type == ObjectType::Schema {
96 return self.alter_schema_name(object_id as _, object_name).await;
97 }
98
99 let inner = self.inner.write().await;
100 let txn = inner.db.begin().await?;
101 let obj: PartialObject = Object::find_by_id(object_id)
102 .into_partial_model()
103 .one(&txn)
104 .await?
105 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
106 assert_eq!(obj.obj_type, object_type);
107 check_relation_name_duplicate(
108 object_name,
109 obj.database_id.unwrap(),
110 obj.schema_id.unwrap(),
111 &txn,
112 )
113 .await?;
114
115 let (mut to_update_relations, old_name) =
117 rename_relation(&txn, object_type, object_id, object_name).await?;
118 to_update_relations.extend(
120 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
121 );
122
123 txn.commit().await?;
124
125 let version = self
126 .notify_frontend(
127 NotificationOperation::Update,
128 NotificationInfo::ObjectGroup(PbObjectGroup {
129 objects: to_update_relations,
130 }),
131 )
132 .await;
133
134 Ok(version)
135 }
136
137 pub async fn alter_swap_rename(
138 &self,
139 object_type: ObjectType,
140 object_id: ObjectId,
141 dst_object_id: ObjectId,
142 ) -> MetaResult<NotificationVersion> {
143 let inner = self.inner.write().await;
144 let txn = inner.db.begin().await?;
145 let dst_name: String = match object_type {
146 ObjectType::Table => Table::find_by_id(dst_object_id)
147 .select_only()
148 .column(table::Column::Name)
149 .into_tuple()
150 .one(&txn)
151 .await?
152 .ok_or_else(|| {
153 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
154 })?,
155 ObjectType::Source => Source::find_by_id(dst_object_id)
156 .select_only()
157 .column(source::Column::Name)
158 .into_tuple()
159 .one(&txn)
160 .await?
161 .ok_or_else(|| {
162 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
163 })?,
164 ObjectType::Sink => Sink::find_by_id(dst_object_id)
165 .select_only()
166 .column(sink::Column::Name)
167 .into_tuple()
168 .one(&txn)
169 .await?
170 .ok_or_else(|| {
171 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
172 })?,
173 ObjectType::View => View::find_by_id(dst_object_id)
174 .select_only()
175 .column(view::Column::Name)
176 .into_tuple()
177 .one(&txn)
178 .await?
179 .ok_or_else(|| {
180 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
181 })?,
182 ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
183 .select_only()
184 .column(subscription::Column::Name)
185 .into_tuple()
186 .one(&txn)
187 .await?
188 .ok_or_else(|| {
189 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
190 })?,
191 _ => {
192 return Err(MetaError::permission_denied(format!(
193 "swap rename not supported for object type: {:?}",
194 object_type
195 )));
196 }
197 };
198
199 let (mut to_update_relations, src_name) =
201 rename_relation(&txn, object_type, object_id, &dst_name).await?;
202 let (to_update_relations2, _) =
203 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
204 to_update_relations.extend(to_update_relations2);
205 to_update_relations.extend(
207 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
208 );
209 to_update_relations.extend(
210 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
211 );
212
213 txn.commit().await?;
214
215 let version = self
216 .notify_frontend(
217 NotificationOperation::Update,
218 NotificationInfo::ObjectGroup(PbObjectGroup {
219 objects: to_update_relations,
220 }),
221 )
222 .await;
223
224 Ok(version)
225 }
226
227 pub async fn alter_non_shared_source(
228 &self,
229 pb_source: PbSource,
230 ) -> MetaResult<NotificationVersion> {
231 let source_id = pb_source.id as SourceId;
232 let inner = self.inner.write().await;
233 let txn = inner.db.begin().await?;
234
235 let original_version: i64 = Source::find_by_id(source_id)
236 .select_only()
237 .column(source::Column::Version)
238 .into_tuple()
239 .one(&txn)
240 .await?
241 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
242 if original_version + 1 != pb_source.version as i64 {
243 return Err(MetaError::permission_denied(
244 "source version is stale".to_owned(),
245 ));
246 }
247
248 let source: source::ActiveModel = pb_source.clone().into();
249 source.update(&txn).await?;
250 txn.commit().await?;
251
252 let version = self
253 .notify_frontend_relation_info(
254 NotificationOperation::Update,
255 PbObjectInfo::Source(pb_source),
256 )
257 .await;
258 Ok(version)
259 }
260
261 pub async fn alter_owner(
262 &self,
263 object_type: ObjectType,
264 object_id: ObjectId,
265 new_owner: UserId,
266 ) -> MetaResult<NotificationVersion> {
267 let inner = self.inner.write().await;
268 let txn = inner.db.begin().await?;
269 ensure_user_id(new_owner, &txn).await?;
270
271 let obj = Object::find_by_id(object_id)
272 .one(&txn)
273 .await?
274 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
275 if obj.owner_id == new_owner {
276 return Ok(IGNORED_NOTIFICATION_VERSION);
277 }
278 let mut obj = obj.into_active_model();
279 obj.owner_id = Set(new_owner);
280 let obj = obj.update(&txn).await?;
281
282 let mut objects = vec![];
283 match object_type {
284 ObjectType::Database => {
285 let db = Database::find_by_id(object_id)
286 .one(&txn)
287 .await?
288 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
289
290 txn.commit().await?;
291
292 let version = self
293 .notify_frontend(
294 NotificationOperation::Update,
295 NotificationInfo::Database(ObjectModel(db, obj).into()),
296 )
297 .await;
298 return Ok(version);
299 }
300 ObjectType::Schema => {
301 let schema = Schema::find_by_id(object_id)
302 .one(&txn)
303 .await?
304 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
305
306 txn.commit().await?;
307
308 let version = self
309 .notify_frontend(
310 NotificationOperation::Update,
311 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
312 )
313 .await;
314 return Ok(version);
315 }
316 ObjectType::Table => {
317 let table = Table::find_by_id(object_id)
318 .one(&txn)
319 .await?
320 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
321
322 if let Some(associated_source_id) = table.optional_associated_source_id {
324 let src_obj = object::ActiveModel {
325 oid: Set(associated_source_id as _),
326 owner_id: Set(new_owner),
327 ..Default::default()
328 }
329 .update(&txn)
330 .await?;
331 let source = Source::find_by_id(associated_source_id)
332 .one(&txn)
333 .await?
334 .ok_or_else(|| {
335 MetaError::catalog_id_not_found("source", associated_source_id)
336 })?;
337 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
338 }
339
340 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
342 .select_only()
343 .columns([index::Column::IndexId, index::Column::IndexTableId])
344 .filter(index::Column::PrimaryTableId.eq(object_id))
345 .into_tuple::<(IndexId, TableId)>()
346 .all(&txn)
347 .await?
348 .into_iter()
349 .unzip();
350 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
351
352 let internal_tables: Vec<TableId> = Table::find()
354 .select_only()
355 .column(table::Column::TableId)
356 .filter(
357 table::Column::BelongsToJobId
358 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
359 )
360 .into_tuple()
361 .all(&txn)
362 .await?;
363 table_ids.extend(internal_tables);
364
365 if !index_ids.is_empty() || !table_ids.is_empty() {
366 Object::update_many()
367 .col_expr(
368 object::Column::OwnerId,
369 SimpleExpr::Value(Value::Int(Some(new_owner))),
370 )
371 .filter(
372 object::Column::Oid
373 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
374 )
375 .exec(&txn)
376 .await?;
377 }
378
379 if !table_ids.is_empty() {
380 let table_objs = Table::find()
381 .find_also_related(Object)
382 .filter(table::Column::TableId.is_in(table_ids))
383 .all(&txn)
384 .await?;
385 for (table, table_obj) in table_objs {
386 objects.push(PbObjectInfo::Table(
387 ObjectModel(table, table_obj.unwrap()).into(),
388 ));
389 }
390 }
391 if !index_ids.is_empty() {
393 let index_objs = Index::find()
394 .find_also_related(Object)
395 .filter(index::Column::IndexId.is_in(index_ids))
396 .all(&txn)
397 .await?;
398 for (index, index_obj) in index_objs {
399 objects.push(PbObjectInfo::Index(
400 ObjectModel(index, index_obj.unwrap()).into(),
401 ));
402 }
403 }
404 }
405 ObjectType::Source => {
406 let source = Source::find_by_id(object_id)
407 .one(&txn)
408 .await?
409 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
410 let is_shared = source.is_shared();
411 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
412
413 if is_shared {
416 update_internal_tables(
417 &txn,
418 object_id,
419 object::Column::OwnerId,
420 Value::Int(Some(new_owner)),
421 &mut objects,
422 )
423 .await?;
424 }
425 }
426 ObjectType::Sink => {
427 let sink = Sink::find_by_id(object_id)
428 .one(&txn)
429 .await?
430 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
431 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
432
433 update_internal_tables(
434 &txn,
435 object_id,
436 object::Column::OwnerId,
437 Value::Int(Some(new_owner)),
438 &mut objects,
439 )
440 .await?;
441 }
442 ObjectType::Subscription => {
443 let subscription = Subscription::find_by_id(object_id)
444 .one(&txn)
445 .await?
446 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
447 objects.push(PbObjectInfo::Subscription(
448 ObjectModel(subscription, obj).into(),
449 ));
450 }
451 ObjectType::View => {
452 let view = View::find_by_id(object_id)
453 .one(&txn)
454 .await?
455 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
456 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
457 }
458 ObjectType::Connection => {
459 let connection = Connection::find_by_id(object_id)
460 .one(&txn)
461 .await?
462 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
463 objects.push(PbObjectInfo::Connection(
464 ObjectModel(connection, obj).into(),
465 ));
466 }
467 _ => unreachable!("not supported object type: {:?}", object_type),
468 };
469
470 txn.commit().await?;
471
472 let version = self
473 .notify_frontend(
474 NotificationOperation::Update,
475 NotificationInfo::ObjectGroup(PbObjectGroup {
476 objects: objects
477 .into_iter()
478 .map(|object| PbObject {
479 object_info: Some(object),
480 })
481 .collect(),
482 }),
483 )
484 .await;
485 Ok(version)
486 }
487
488 pub async fn alter_schema(
489 &self,
490 object_type: ObjectType,
491 object_id: ObjectId,
492 new_schema: SchemaId,
493 ) -> MetaResult<NotificationVersion> {
494 let inner = self.inner.write().await;
495 let txn = inner.db.begin().await?;
496 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
497
498 let obj = Object::find_by_id(object_id)
499 .one(&txn)
500 .await?
501 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
502 if obj.schema_id == Some(new_schema) {
503 return Ok(IGNORED_NOTIFICATION_VERSION);
504 }
505 let database_id = obj.database_id.unwrap();
506
507 let mut objects = vec![];
508 match object_type {
509 ObjectType::Table => {
510 let table = Table::find_by_id(object_id)
511 .one(&txn)
512 .await?
513 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
514 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
515 let associated_src_id = table.optional_associated_source_id;
516
517 let mut obj = obj.into_active_model();
518 obj.schema_id = Set(Some(new_schema));
519 let obj = obj.update(&txn).await?;
520 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
521
522 if let Some(associated_source_id) = associated_src_id {
524 let src_obj = object::ActiveModel {
525 oid: Set(associated_source_id as _),
526 schema_id: Set(Some(new_schema)),
527 ..Default::default()
528 }
529 .update(&txn)
530 .await?;
531 let source = Source::find_by_id(associated_source_id)
532 .one(&txn)
533 .await?
534 .ok_or_else(|| {
535 MetaError::catalog_id_not_found("source", associated_source_id)
536 })?;
537 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
538 }
539
540 let (index_ids, (index_names, mut table_ids)): (
542 Vec<IndexId>,
543 (Vec<String>, Vec<TableId>),
544 ) = Index::find()
545 .select_only()
546 .columns([
547 index::Column::IndexId,
548 index::Column::Name,
549 index::Column::IndexTableId,
550 ])
551 .filter(index::Column::PrimaryTableId.eq(object_id))
552 .into_tuple::<(IndexId, String, TableId)>()
553 .all(&txn)
554 .await?
555 .into_iter()
556 .map(|(id, name, t_id)| (id, (name, t_id)))
557 .unzip();
558
559 let internal_tables: Vec<TableId> = Table::find()
561 .select_only()
562 .column(table::Column::TableId)
563 .filter(
564 table::Column::BelongsToJobId
565 .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
566 )
567 .into_tuple()
568 .all(&txn)
569 .await?;
570 table_ids.extend(internal_tables);
571
572 if !index_ids.is_empty() || !table_ids.is_empty() {
573 for index_name in index_names {
574 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
575 .await?;
576 }
577
578 Object::update_many()
579 .col_expr(
580 object::Column::SchemaId,
581 SimpleExpr::Value(Value::Int(Some(new_schema))),
582 )
583 .filter(
584 object::Column::Oid
585 .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
586 )
587 .exec(&txn)
588 .await?;
589 }
590
591 if !table_ids.is_empty() {
592 let table_objs = Table::find()
593 .find_also_related(Object)
594 .filter(table::Column::TableId.is_in(table_ids))
595 .all(&txn)
596 .await?;
597 for (table, table_obj) in table_objs {
598 objects.push(PbObjectInfo::Table(
599 ObjectModel(table, table_obj.unwrap()).into(),
600 ));
601 }
602 }
603 if !index_ids.is_empty() {
604 let index_objs = Index::find()
605 .find_also_related(Object)
606 .filter(index::Column::IndexId.is_in(index_ids))
607 .all(&txn)
608 .await?;
609 for (index, index_obj) in index_objs {
610 objects.push(PbObjectInfo::Index(
611 ObjectModel(index, index_obj.unwrap()).into(),
612 ));
613 }
614 }
615 }
616 ObjectType::Source => {
617 let source = Source::find_by_id(object_id)
618 .one(&txn)
619 .await?
620 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
621 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
622 let is_shared = source.is_shared();
623
624 let mut obj = obj.into_active_model();
625 obj.schema_id = Set(Some(new_schema));
626 let obj = obj.update(&txn).await?;
627 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
628
629 if is_shared {
632 update_internal_tables(
633 &txn,
634 object_id,
635 object::Column::SchemaId,
636 Value::Int(Some(new_schema)),
637 &mut objects,
638 )
639 .await?;
640 }
641 }
642 ObjectType::Sink => {
643 let sink = Sink::find_by_id(object_id)
644 .one(&txn)
645 .await?
646 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
647 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
648
649 let mut obj = obj.into_active_model();
650 obj.schema_id = Set(Some(new_schema));
651 let obj = obj.update(&txn).await?;
652 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
653
654 update_internal_tables(
655 &txn,
656 object_id,
657 object::Column::SchemaId,
658 Value::Int(Some(new_schema)),
659 &mut objects,
660 )
661 .await?;
662 }
663 ObjectType::Subscription => {
664 let subscription = Subscription::find_by_id(object_id)
665 .one(&txn)
666 .await?
667 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
668 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
669 .await?;
670
671 let mut obj = obj.into_active_model();
672 obj.schema_id = Set(Some(new_schema));
673 let obj = obj.update(&txn).await?;
674 objects.push(PbObjectInfo::Subscription(
675 ObjectModel(subscription, obj).into(),
676 ));
677 }
678 ObjectType::View => {
679 let view = View::find_by_id(object_id)
680 .one(&txn)
681 .await?
682 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
683 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
684
685 let mut obj = obj.into_active_model();
686 obj.schema_id = Set(Some(new_schema));
687 let obj = obj.update(&txn).await?;
688 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
689 }
690 ObjectType::Function => {
691 let function = Function::find_by_id(object_id)
692 .one(&txn)
693 .await?
694 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
695
696 let mut pb_function: PbFunction = ObjectModel(function, obj).into();
697 pb_function.schema_id = new_schema as _;
698 check_function_signature_duplicate(&pb_function, &txn).await?;
699
700 object::ActiveModel {
701 oid: Set(object_id),
702 schema_id: Set(Some(new_schema)),
703 ..Default::default()
704 }
705 .update(&txn)
706 .await?;
707
708 txn.commit().await?;
709 let version = self
710 .notify_frontend(
711 NotificationOperation::Update,
712 NotificationInfo::Function(pb_function),
713 )
714 .await;
715 return Ok(version);
716 }
717 ObjectType::Connection => {
718 let connection = Connection::find_by_id(object_id)
719 .one(&txn)
720 .await?
721 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
722
723 let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
724 pb_connection.schema_id = new_schema as _;
725 check_connection_name_duplicate(&pb_connection, &txn).await?;
726
727 object::ActiveModel {
728 oid: Set(object_id),
729 schema_id: Set(Some(new_schema)),
730 ..Default::default()
731 }
732 .update(&txn)
733 .await?;
734
735 txn.commit().await?;
736 let version = self
737 .notify_frontend(
738 NotificationOperation::Update,
739 NotificationInfo::Connection(pb_connection),
740 )
741 .await;
742 return Ok(version);
743 }
744 _ => unreachable!("not supported object type: {:?}", object_type),
745 }
746
747 txn.commit().await?;
748 let version = self
749 .notify_frontend(
750 Operation::Update,
751 Info::ObjectGroup(PbObjectGroup {
752 objects: objects
753 .into_iter()
754 .map(|relation_info| PbObject {
755 object_info: Some(relation_info),
756 })
757 .collect_vec(),
758 }),
759 )
760 .await;
761 Ok(version)
762 }
763
764 pub async fn alter_secret(
765 &self,
766 pb_secret: PbSecret,
767 secret_plain_payload: Vec<u8>,
768 ) -> MetaResult<NotificationVersion> {
769 let inner = self.inner.write().await;
770 let owner_id = pb_secret.owner as _;
771 let txn = inner.db.begin().await?;
772 ensure_user_id(owner_id, &txn).await?;
773 ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
774 ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
775
776 ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
777 let secret: secret::ActiveModel = pb_secret.clone().into();
778 Secret::update(secret).exec(&txn).await?;
779
780 txn.commit().await?;
781
782 let mut secret_plain = pb_secret;
784 secret_plain.value.clone_from(&secret_plain_payload);
785
786 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
787 self.env
788 .notification_manager()
789 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
790
791 let version = self
792 .notify_frontend(
793 NotificationOperation::Update,
794 NotificationInfo::Secret(secret_plain),
795 )
796 .await;
797
798 Ok(version)
799 }
800
801 pub async fn drop_table_associated_source(
803 txn: &DatabaseTransaction,
804 drop_table_connector_ctx: &DropTableConnectorContext,
805 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
806 let to_drop_source_objects: Vec<PartialObject> = Object::find()
807 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
808 .into_partial_model()
809 .all(txn)
810 .await?;
811 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
812 .select_only()
813 .filter(
814 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
815 )
816 .into_partial_model()
817 .all(txn)
818 .await?;
819 let to_drop_objects = to_drop_source_objects
820 .into_iter()
821 .chain(to_drop_internal_table_objs.into_iter())
822 .collect_vec();
823 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
825 .select_only()
826 .distinct()
827 .column(user_privilege::Column::UserId)
828 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
829 .into_tuple()
830 .all(txn)
831 .await?;
832
833 tracing::debug!(
834 "drop_table_associated_source: to_drop_objects: {:?}",
835 to_drop_objects
836 );
837
838 let res = Object::delete_many()
840 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
841 .exec(txn)
842 .await?;
843 if res.rows_affected == 0 {
844 return Err(MetaError::catalog_id_not_found(
845 ObjectType::Source.as_str(),
846 drop_table_connector_ctx.to_remove_source_id,
847 ));
848 }
849 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
850
851 Ok((user_infos, to_drop_objects))
852 }
853
854 pub async fn alter_database_param(
855 &self,
856 database_id: DatabaseId,
857 param: AlterDatabaseParam,
858 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
859 let inner = self.inner.write().await;
860 let txn = inner.db.begin().await?;
861
862 let mut database = database::ActiveModel {
863 database_id: Set(database_id),
864 ..Default::default()
865 };
866 match param {
867 AlterDatabaseParam::BarrierIntervalMs(interval) => {
868 if let Some(ref interval) = interval {
869 OverrideValidate::barrier_interval_ms(interval)
870 .map_err(|e| anyhow::anyhow!(e))?;
871 }
872 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
873 }
874 AlterDatabaseParam::CheckpointFrequency(frequency) => {
875 if let Some(ref frequency) = frequency {
876 OverrideValidate::checkpoint_frequency(frequency)
877 .map_err(|e| anyhow::anyhow!(e))?;
878 }
879 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
880 }
881 }
882 let database = database.update(&txn).await?;
883
884 let obj = Object::find_by_id(database_id)
885 .one(&txn)
886 .await?
887 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
888
889 txn.commit().await?;
890
891 let version = self
892 .notify_frontend(
893 NotificationOperation::Update,
894 NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
895 )
896 .await;
897 Ok((version, database))
898 }
899
900 pub async fn set_table_refresh_state(
902 &self,
903 table_id: TableId,
904 new_state: RefreshState,
905 ) -> MetaResult<bool> {
906 let inner = self.inner.write().await;
907 let txn = inner.db.begin().await?;
908
909 let active_model = table::ActiveModel {
911 table_id: Set(table_id),
912 refresh_state: Set(Some(new_state)),
913 ..Default::default()
914 };
915 if let Err(e) = active_model.update(&txn).await {
916 tracing::warn!(
917 "Failed to update table refresh state for table {}: {}",
918 table_id,
919 e.as_report()
920 );
921 let t = Table::find_by_id(table_id).all(&txn).await;
922 tracing::info!(table = ?t, "Table found");
923 }
924 txn.commit().await?;
925
926 tracing::debug!(
927 table_id = %table_id,
928 new_state = ?new_state,
929 "Updated table refresh state"
930 );
931
932 Ok(true)
933 }
934}