1use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_meta_model::table::RefreshState;
18use sea_orm::DatabaseTransaction;
19use thiserror_ext::AsReport;
20
21use super::*;
22
23impl CatalogController {
24 async fn alter_database_name(
25 &self,
26 database_id: DatabaseId,
27 name: &str,
28 ) -> MetaResult<NotificationVersion> {
29 let inner = self.inner.write().await;
30 let txn = inner.db.begin().await?;
31 check_database_name_duplicate(name, &txn).await?;
32
33 let active_model = database::ActiveModel {
34 database_id: Set(database_id),
35 name: Set(name.to_owned()),
36 ..Default::default()
37 };
38 let database = active_model.update(&txn).await?;
39
40 let obj = Object::find_by_id(database_id.as_raw_id() as ObjectId)
41 .one(&txn)
42 .await?
43 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
44
45 txn.commit().await?;
46
47 let version = self
48 .notify_frontend(
49 NotificationOperation::Update,
50 NotificationInfo::Database(ObjectModel(database, obj).into()),
51 )
52 .await;
53 Ok(version)
54 }
55
56 async fn alter_schema_name(
57 &self,
58 schema_id: SchemaId,
59 name: &str,
60 ) -> MetaResult<NotificationVersion> {
61 let inner = self.inner.write().await;
62 let txn = inner.db.begin().await?;
63
64 let obj = Object::find_by_id(schema_id.as_raw_id() as ObjectId)
65 .one(&txn)
66 .await?
67 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
68 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
69
70 let active_model = schema::ActiveModel {
71 schema_id: Set(schema_id),
72 name: Set(name.to_owned()),
73 };
74 let schema = active_model.update(&txn).await?;
75
76 txn.commit().await?;
77
78 let version = self
79 .notify_frontend(
80 NotificationOperation::Update,
81 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
82 )
83 .await;
84 Ok(version)
85 }
86
87 pub async fn alter_name(
88 &self,
89 object_type: ObjectType,
90 object_id: ObjectId,
91 object_name: &str,
92 ) -> MetaResult<NotificationVersion> {
93 if object_type == ObjectType::Database {
94 return self
95 .alter_database_name(DatabaseId::new(object_id as _), object_name)
96 .await;
97 } else if object_type == ObjectType::Schema {
98 return self
99 .alter_schema_name(SchemaId::new(object_id as _), object_name)
100 .await;
101 }
102
103 let inner = self.inner.write().await;
104 let txn = inner.db.begin().await?;
105 let obj: PartialObject = Object::find_by_id(object_id)
106 .into_partial_model()
107 .one(&txn)
108 .await?
109 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
110 assert_eq!(obj.obj_type, object_type);
111 check_relation_name_duplicate(
112 object_name,
113 obj.database_id.unwrap(),
114 obj.schema_id.unwrap(),
115 &txn,
116 )
117 .await?;
118
119 let (mut to_update_relations, old_name) =
121 rename_relation(&txn, object_type, object_id, object_name).await?;
122 to_update_relations.extend(
124 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
125 );
126
127 txn.commit().await?;
128
129 let version = self
130 .notify_frontend(
131 NotificationOperation::Update,
132 NotificationInfo::ObjectGroup(PbObjectGroup {
133 objects: to_update_relations,
134 }),
135 )
136 .await;
137
138 Ok(version)
139 }
140
141 pub async fn alter_swap_rename(
142 &self,
143 object_type: ObjectType,
144 object_id: ObjectId,
145 dst_object_id: ObjectId,
146 ) -> MetaResult<NotificationVersion> {
147 let inner = self.inner.write().await;
148 let txn = inner.db.begin().await?;
149 let dst_name: String = match object_type {
150 ObjectType::Table => Table::find_by_id(TableId::new(dst_object_id as _))
151 .select_only()
152 .column(table::Column::Name)
153 .into_tuple()
154 .one(&txn)
155 .await?
156 .ok_or_else(|| {
157 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
158 })?,
159 ObjectType::Source => Source::find_by_id(dst_object_id)
160 .select_only()
161 .column(source::Column::Name)
162 .into_tuple()
163 .one(&txn)
164 .await?
165 .ok_or_else(|| {
166 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
167 })?,
168 ObjectType::Sink => Sink::find_by_id(dst_object_id)
169 .select_only()
170 .column(sink::Column::Name)
171 .into_tuple()
172 .one(&txn)
173 .await?
174 .ok_or_else(|| {
175 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
176 })?,
177 ObjectType::View => View::find_by_id(dst_object_id)
178 .select_only()
179 .column(view::Column::Name)
180 .into_tuple()
181 .one(&txn)
182 .await?
183 .ok_or_else(|| {
184 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
185 })?,
186 ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
187 .select_only()
188 .column(subscription::Column::Name)
189 .into_tuple()
190 .one(&txn)
191 .await?
192 .ok_or_else(|| {
193 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
194 })?,
195 _ => {
196 return Err(MetaError::permission_denied(format!(
197 "swap rename not supported for object type: {:?}",
198 object_type
199 )));
200 }
201 };
202
203 let (mut to_update_relations, src_name) =
205 rename_relation(&txn, object_type, object_id, &dst_name).await?;
206 let (to_update_relations2, _) =
207 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
208 to_update_relations.extend(to_update_relations2);
209 to_update_relations.extend(
211 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
212 );
213 to_update_relations.extend(
214 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
215 );
216
217 txn.commit().await?;
218
219 let version = self
220 .notify_frontend(
221 NotificationOperation::Update,
222 NotificationInfo::ObjectGroup(PbObjectGroup {
223 objects: to_update_relations,
224 }),
225 )
226 .await;
227
228 Ok(version)
229 }
230
231 pub async fn alter_non_shared_source(
232 &self,
233 pb_source: PbSource,
234 ) -> MetaResult<NotificationVersion> {
235 let source_id = pb_source.id as SourceId;
236 let inner = self.inner.write().await;
237 let txn = inner.db.begin().await?;
238
239 let original_version: i64 = Source::find_by_id(source_id)
240 .select_only()
241 .column(source::Column::Version)
242 .into_tuple()
243 .one(&txn)
244 .await?
245 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
246 if original_version + 1 != pb_source.version as i64 {
247 return Err(MetaError::permission_denied(
248 "source version is stale".to_owned(),
249 ));
250 }
251
252 let source: source::ActiveModel = pb_source.clone().into();
253 source.update(&txn).await?;
254 txn.commit().await?;
255
256 let version = self
257 .notify_frontend_relation_info(
258 NotificationOperation::Update,
259 PbObjectInfo::Source(pb_source),
260 )
261 .await;
262 Ok(version)
263 }
264
265 pub async fn alter_owner(
266 &self,
267 object_type: ObjectType,
268 object_id: ObjectId,
269 new_owner: UserId,
270 ) -> MetaResult<NotificationVersion> {
271 let inner = self.inner.write().await;
272 let txn = inner.db.begin().await?;
273 ensure_user_id(new_owner, &txn).await?;
274
275 let obj = Object::find_by_id(object_id)
276 .one(&txn)
277 .await?
278 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
279 if obj.owner_id == new_owner {
280 return Ok(IGNORED_NOTIFICATION_VERSION);
281 }
282 let mut obj = obj.into_active_model();
283 obj.owner_id = Set(new_owner);
284 let obj = obj.update(&txn).await?;
285
286 let mut objects = vec![];
287 match object_type {
288 ObjectType::Database => {
289 let db = Database::find_by_id(DatabaseId::new(object_id as _))
290 .one(&txn)
291 .await?
292 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
293
294 txn.commit().await?;
295
296 let version = self
297 .notify_frontend(
298 NotificationOperation::Update,
299 NotificationInfo::Database(ObjectModel(db, obj).into()),
300 )
301 .await;
302 return Ok(version);
303 }
304 ObjectType::Schema => {
305 let schema = Schema::find_by_id(SchemaId::new(object_id as _))
306 .one(&txn)
307 .await?
308 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
309
310 txn.commit().await?;
311
312 let version = self
313 .notify_frontend(
314 NotificationOperation::Update,
315 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
316 )
317 .await;
318 return Ok(version);
319 }
320 ObjectType::Table => {
321 let table = Table::find_by_id(TableId::new(object_id as _))
322 .one(&txn)
323 .await?
324 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
325
326 if let Some(associated_source_id) = table.optional_associated_source_id {
328 let src_obj = object::ActiveModel {
329 oid: Set(associated_source_id as _),
330 owner_id: Set(new_owner),
331 ..Default::default()
332 }
333 .update(&txn)
334 .await?;
335 let source = Source::find_by_id(associated_source_id)
336 .one(&txn)
337 .await?
338 .ok_or_else(|| {
339 MetaError::catalog_id_not_found("source", associated_source_id)
340 })?;
341 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
342 }
343
344 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
346 .select_only()
347 .columns([index::Column::IndexId, index::Column::IndexTableId])
348 .filter(index::Column::PrimaryTableId.eq(object_id))
349 .into_tuple::<(IndexId, TableId)>()
350 .all(&txn)
351 .await?
352 .into_iter()
353 .unzip();
354 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
355
356 let internal_tables: Vec<TableId> = Table::find()
358 .select_only()
359 .column(table::Column::TableId)
360 .filter(
361 table::Column::BelongsToJobId.is_in(
362 table_ids
363 .iter()
364 .cloned()
365 .chain(std::iter::once(TableId::new(object_id as _))),
366 ),
367 )
368 .into_tuple()
369 .all(&txn)
370 .await?;
371 table_ids.extend(internal_tables);
372
373 if !index_ids.is_empty() || !table_ids.is_empty() {
374 Object::update_many()
375 .col_expr(
376 object::Column::OwnerId,
377 SimpleExpr::Value(Value::Int(Some(new_owner))),
378 )
379 .filter(
380 object::Column::Oid.is_in(
381 index_ids.iter().cloned().chain(
382 table_ids.iter().map(|table_id| table_id.as_raw_id() as _),
383 ),
384 ),
385 )
386 .exec(&txn)
387 .await?;
388 }
389
390 if !table_ids.is_empty() {
391 let table_objs = Table::find()
392 .find_also_related(Object)
393 .filter(table::Column::TableId.is_in(table_ids))
394 .all(&txn)
395 .await?;
396 for (table, table_obj) in table_objs {
397 objects.push(PbObjectInfo::Table(
398 ObjectModel(table, table_obj.unwrap()).into(),
399 ));
400 }
401 }
402 if !index_ids.is_empty() {
404 let index_objs = Index::find()
405 .find_also_related(Object)
406 .filter(index::Column::IndexId.is_in(index_ids))
407 .all(&txn)
408 .await?;
409 for (index, index_obj) in index_objs {
410 objects.push(PbObjectInfo::Index(
411 ObjectModel(index, index_obj.unwrap()).into(),
412 ));
413 }
414 }
415 }
416 ObjectType::Source => {
417 let source = Source::find_by_id(object_id)
418 .one(&txn)
419 .await?
420 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
421 let is_shared = source.is_shared();
422 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
423
424 if is_shared {
427 update_internal_tables(
428 &txn,
429 object_id,
430 object::Column::OwnerId,
431 Value::Int(Some(new_owner)),
432 &mut objects,
433 )
434 .await?;
435 }
436 }
437 ObjectType::Sink => {
438 let sink = Sink::find_by_id(object_id)
439 .one(&txn)
440 .await?
441 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
442 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
443
444 update_internal_tables(
445 &txn,
446 object_id,
447 object::Column::OwnerId,
448 Value::Int(Some(new_owner)),
449 &mut objects,
450 )
451 .await?;
452 }
453 ObjectType::Subscription => {
454 let subscription = Subscription::find_by_id(object_id)
455 .one(&txn)
456 .await?
457 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
458 objects.push(PbObjectInfo::Subscription(
459 ObjectModel(subscription, obj).into(),
460 ));
461 }
462 ObjectType::View => {
463 let view = View::find_by_id(object_id)
464 .one(&txn)
465 .await?
466 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
467 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
468 }
469 ObjectType::Connection => {
470 let connection = Connection::find_by_id(object_id)
471 .one(&txn)
472 .await?
473 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
474 objects.push(PbObjectInfo::Connection(
475 ObjectModel(connection, obj).into(),
476 ));
477 }
478 _ => unreachable!("not supported object type: {:?}", object_type),
479 };
480
481 txn.commit().await?;
482
483 let version = self
484 .notify_frontend(
485 NotificationOperation::Update,
486 NotificationInfo::ObjectGroup(PbObjectGroup {
487 objects: objects
488 .into_iter()
489 .map(|object| PbObject {
490 object_info: Some(object),
491 })
492 .collect(),
493 }),
494 )
495 .await;
496 Ok(version)
497 }
498
499 pub async fn alter_schema(
500 &self,
501 object_type: ObjectType,
502 object_id: ObjectId,
503 new_schema: SchemaId,
504 ) -> MetaResult<NotificationVersion> {
505 let inner = self.inner.write().await;
506 let txn = inner.db.begin().await?;
507 ensure_object_id(ObjectType::Schema, new_schema.as_raw_id() as ObjectId, &txn).await?;
508
509 let obj = Object::find_by_id(object_id)
510 .one(&txn)
511 .await?
512 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
513 if obj.schema_id == Some(new_schema) {
514 return Ok(IGNORED_NOTIFICATION_VERSION);
515 }
516 let database_id = obj.database_id.unwrap();
517
518 let mut objects = vec![];
519 match object_type {
520 ObjectType::Table => {
521 let table = Table::find_by_id(TableId::new(object_id as _))
522 .one(&txn)
523 .await?
524 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
525 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
526 let associated_src_id = table.optional_associated_source_id;
527
528 let mut obj = obj.into_active_model();
529 obj.schema_id = Set(Some(new_schema));
530 let obj = obj.update(&txn).await?;
531 objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
532
533 if let Some(associated_source_id) = associated_src_id {
535 let src_obj = object::ActiveModel {
536 oid: Set(associated_source_id as _),
537 schema_id: Set(Some(new_schema)),
538 ..Default::default()
539 }
540 .update(&txn)
541 .await?;
542 let source = Source::find_by_id(associated_source_id)
543 .one(&txn)
544 .await?
545 .ok_or_else(|| {
546 MetaError::catalog_id_not_found("source", associated_source_id)
547 })?;
548 objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
549 }
550
551 let (index_ids, (index_names, mut table_ids)): (
553 Vec<IndexId>,
554 (Vec<String>, Vec<TableId>),
555 ) = Index::find()
556 .select_only()
557 .columns([
558 index::Column::IndexId,
559 index::Column::Name,
560 index::Column::IndexTableId,
561 ])
562 .filter(index::Column::PrimaryTableId.eq(object_id))
563 .into_tuple::<(IndexId, String, TableId)>()
564 .all(&txn)
565 .await?
566 .into_iter()
567 .map(|(id, name, t_id)| (id, (name, t_id)))
568 .unzip();
569
570 let internal_tables: Vec<TableId> = Table::find()
572 .select_only()
573 .column(table::Column::TableId)
574 .filter(
575 table::Column::BelongsToJobId.is_in(
576 table_ids
577 .iter()
578 .map(|table_id| table_id.as_job_id())
579 .chain(std::iter::once(JobId::new(object_id as _))),
580 ),
581 )
582 .into_tuple()
583 .all(&txn)
584 .await?;
585 table_ids.extend(internal_tables);
586
587 if !index_ids.is_empty() || !table_ids.is_empty() {
588 for index_name in index_names {
589 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
590 .await?;
591 }
592
593 Object::update_many()
594 .col_expr(object::Column::SchemaId, new_schema.into())
595 .filter(
596 object::Column::Oid.is_in(
597 index_ids.iter().cloned().chain(
598 table_ids.iter().map(|table_id| table_id.as_raw_id() as _),
599 ),
600 ),
601 )
602 .exec(&txn)
603 .await?;
604 }
605
606 if !table_ids.is_empty() {
607 let table_objs = Table::find()
608 .find_also_related(Object)
609 .filter(table::Column::TableId.is_in(table_ids))
610 .all(&txn)
611 .await?;
612 for (table, table_obj) in table_objs {
613 objects.push(PbObjectInfo::Table(
614 ObjectModel(table, table_obj.unwrap()).into(),
615 ));
616 }
617 }
618 if !index_ids.is_empty() {
619 let index_objs = Index::find()
620 .find_also_related(Object)
621 .filter(index::Column::IndexId.is_in(index_ids))
622 .all(&txn)
623 .await?;
624 for (index, index_obj) in index_objs {
625 objects.push(PbObjectInfo::Index(
626 ObjectModel(index, index_obj.unwrap()).into(),
627 ));
628 }
629 }
630 }
631 ObjectType::Source => {
632 let source = Source::find_by_id(object_id)
633 .one(&txn)
634 .await?
635 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
636 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
637 let is_shared = source.is_shared();
638
639 let mut obj = obj.into_active_model();
640 obj.schema_id = Set(Some(new_schema));
641 let obj = obj.update(&txn).await?;
642 objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
643
644 if is_shared {
647 update_internal_tables(
648 &txn,
649 object_id,
650 object::Column::SchemaId,
651 new_schema.into(),
652 &mut objects,
653 )
654 .await?;
655 }
656 }
657 ObjectType::Sink => {
658 let sink = Sink::find_by_id(object_id)
659 .one(&txn)
660 .await?
661 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
662 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
663
664 let mut obj = obj.into_active_model();
665 obj.schema_id = Set(Some(new_schema));
666 let obj = obj.update(&txn).await?;
667 objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
668
669 update_internal_tables(
670 &txn,
671 object_id,
672 object::Column::SchemaId,
673 new_schema.into(),
674 &mut objects,
675 )
676 .await?;
677 }
678 ObjectType::Subscription => {
679 let subscription = Subscription::find_by_id(object_id)
680 .one(&txn)
681 .await?
682 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
683 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
684 .await?;
685
686 let mut obj = obj.into_active_model();
687 obj.schema_id = Set(Some(new_schema));
688 let obj = obj.update(&txn).await?;
689 objects.push(PbObjectInfo::Subscription(
690 ObjectModel(subscription, obj).into(),
691 ));
692 }
693 ObjectType::View => {
694 let view = View::find_by_id(object_id)
695 .one(&txn)
696 .await?
697 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
698 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
699
700 let mut obj = obj.into_active_model();
701 obj.schema_id = Set(Some(new_schema));
702 let obj = obj.update(&txn).await?;
703 objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
704 }
705 ObjectType::Function => {
706 let function = Function::find_by_id(object_id)
707 .one(&txn)
708 .await?
709 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
710
711 let mut pb_function: PbFunction = ObjectModel(function, obj).into();
712 pb_function.schema_id = new_schema;
713 check_function_signature_duplicate(&pb_function, &txn).await?;
714
715 object::ActiveModel {
716 oid: Set(object_id),
717 schema_id: Set(Some(new_schema)),
718 ..Default::default()
719 }
720 .update(&txn)
721 .await?;
722
723 txn.commit().await?;
724 let version = self
725 .notify_frontend(
726 NotificationOperation::Update,
727 NotificationInfo::Function(pb_function),
728 )
729 .await;
730 return Ok(version);
731 }
732 ObjectType::Connection => {
733 let connection = Connection::find_by_id(object_id)
734 .one(&txn)
735 .await?
736 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
737
738 let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
739 pb_connection.schema_id = new_schema;
740 check_connection_name_duplicate(&pb_connection, &txn).await?;
741
742 object::ActiveModel {
743 oid: Set(object_id),
744 schema_id: Set(Some(new_schema)),
745 ..Default::default()
746 }
747 .update(&txn)
748 .await?;
749
750 txn.commit().await?;
751 let version = self
752 .notify_frontend(
753 NotificationOperation::Update,
754 NotificationInfo::Connection(pb_connection),
755 )
756 .await;
757 return Ok(version);
758 }
759 _ => unreachable!("not supported object type: {:?}", object_type),
760 }
761
762 txn.commit().await?;
763 let version = self
764 .notify_frontend(
765 Operation::Update,
766 Info::ObjectGroup(PbObjectGroup {
767 objects: objects
768 .into_iter()
769 .map(|relation_info| PbObject {
770 object_info: Some(relation_info),
771 })
772 .collect_vec(),
773 }),
774 )
775 .await;
776 Ok(version)
777 }
778
779 pub async fn alter_secret(
780 &self,
781 pb_secret: PbSecret,
782 secret_plain_payload: Vec<u8>,
783 ) -> MetaResult<NotificationVersion> {
784 let inner = self.inner.write().await;
785 let owner_id = pb_secret.owner as _;
786 let txn = inner.db.begin().await?;
787 ensure_user_id(owner_id, &txn).await?;
788 ensure_object_id(
789 ObjectType::Database,
790 pb_secret.database_id.as_raw_id() as _,
791 &txn,
792 )
793 .await?;
794 ensure_object_id(
795 ObjectType::Schema,
796 pb_secret.schema_id.as_raw_id() as _,
797 &txn,
798 )
799 .await?;
800
801 ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
802 let secret: secret::ActiveModel = pb_secret.clone().into();
803 Secret::update(secret).exec(&txn).await?;
804
805 txn.commit().await?;
806
807 let mut secret_plain = pb_secret;
809 secret_plain.value.clone_from(&secret_plain_payload);
810
811 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
812 self.env
813 .notification_manager()
814 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
815
816 let version = self
817 .notify_frontend(
818 NotificationOperation::Update,
819 NotificationInfo::Secret(secret_plain),
820 )
821 .await;
822
823 Ok(version)
824 }
825
826 pub async fn drop_table_associated_source(
828 txn: &DatabaseTransaction,
829 drop_table_connector_ctx: &DropTableConnectorContext,
830 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
831 let to_drop_source_objects: Vec<PartialObject> = Object::find()
832 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
833 .into_partial_model()
834 .all(txn)
835 .await?;
836 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
837 .select_only()
838 .filter(
839 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
840 )
841 .into_partial_model()
842 .all(txn)
843 .await?;
844 let to_drop_objects = to_drop_source_objects
845 .into_iter()
846 .chain(to_drop_internal_table_objs.into_iter())
847 .collect_vec();
848 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
850 .select_only()
851 .distinct()
852 .column(user_privilege::Column::UserId)
853 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
854 .into_tuple()
855 .all(txn)
856 .await?;
857
858 tracing::debug!(
859 "drop_table_associated_source: to_drop_objects: {:?}",
860 to_drop_objects
861 );
862
863 let res = Object::delete_many()
865 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
866 .exec(txn)
867 .await?;
868 if res.rows_affected == 0 {
869 return Err(MetaError::catalog_id_not_found(
870 ObjectType::Source.as_str(),
871 drop_table_connector_ctx.to_remove_source_id,
872 ));
873 }
874 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
875
876 Ok((user_infos, to_drop_objects))
877 }
878
879 pub async fn alter_database_param(
880 &self,
881 database_id: DatabaseId,
882 param: AlterDatabaseParam,
883 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
884 let inner = self.inner.write().await;
885 let txn = inner.db.begin().await?;
886
887 let mut database = database::ActiveModel {
888 database_id: Set(database_id),
889 ..Default::default()
890 };
891 match param {
892 AlterDatabaseParam::BarrierIntervalMs(interval) => {
893 if let Some(ref interval) = interval {
894 OverrideValidate::barrier_interval_ms(interval)
895 .map_err(|e| anyhow::anyhow!(e))?;
896 }
897 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
898 }
899 AlterDatabaseParam::CheckpointFrequency(frequency) => {
900 if let Some(ref frequency) = frequency {
901 OverrideValidate::checkpoint_frequency(frequency)
902 .map_err(|e| anyhow::anyhow!(e))?;
903 }
904 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
905 }
906 }
907 let database = database.update(&txn).await?;
908
909 let obj = Object::find_by_id(database_id.as_raw_id() as ObjectId)
910 .one(&txn)
911 .await?
912 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
913
914 txn.commit().await?;
915
916 let version = self
917 .notify_frontend(
918 NotificationOperation::Update,
919 NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
920 )
921 .await;
922 Ok((version, database))
923 }
924
925 pub async fn set_table_refresh_state(
927 &self,
928 table_id: TableId,
929 new_state: RefreshState,
930 ) -> MetaResult<bool> {
931 let inner = self.inner.write().await;
932 let txn = inner.db.begin().await?;
933
934 let active_model = table::ActiveModel {
936 table_id: Set(table_id),
937 refresh_state: Set(Some(new_state)),
938 ..Default::default()
939 };
940 if let Err(e) = active_model.update(&txn).await {
941 tracing::warn!(
942 "Failed to update table refresh state for table {}: {}",
943 table_id,
944 e.as_report()
945 );
946 let t = Table::find_by_id(table_id).all(&txn).await;
947 tracing::info!(table = ?t, "Table found");
948 }
949 txn.commit().await?;
950
951 tracing::debug!(
952 table_id = %table_id,
953 new_state = ?new_state,
954 "Updated table refresh state"
955 );
956
957 Ok(true)
958 }
959}