1use anyhow::Context;
16use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::config::mutate::TomlTableMutateExt as _;
18use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::{OverrideValidate, Validate};
21use risingwave_meta_model::refresh_job::{self, RefreshState};
22use sea_orm::ActiveValue::{NotSet, Set};
23use sea_orm::prelude::DateTime;
24use sea_orm::sea_query::Expr;
25use sea_orm::{ActiveModelTrait, DatabaseTransaction};
26
27use super::*;
28use crate::error::bail_invalid_parameter;
29
30impl CatalogController {
31 async fn alter_database_name(
32 &self,
33 database_id: DatabaseId,
34 name: &str,
35 ) -> MetaResult<NotificationVersion> {
36 let inner = self.inner.write().await;
37 let txn = inner.db.begin().await?;
38 check_database_name_duplicate(name, &txn).await?;
39
40 let active_model = database::ActiveModel {
41 database_id: Set(database_id),
42 name: Set(name.to_owned()),
43 ..Default::default()
44 };
45 let database = active_model.update(&txn).await?;
46
47 let obj = Object::find_by_id(database_id)
48 .one(&txn)
49 .await?
50 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
51
52 txn.commit().await?;
53
54 let version = self
55 .notify_frontend(
56 NotificationOperation::Update,
57 NotificationInfo::Database(ObjectModel(database, obj).into()),
58 )
59 .await;
60 Ok(version)
61 }
62
63 async fn alter_schema_name(
64 &self,
65 schema_id: SchemaId,
66 name: &str,
67 ) -> MetaResult<NotificationVersion> {
68 let inner = self.inner.write().await;
69 let txn = inner.db.begin().await?;
70
71 let obj = Object::find_by_id(schema_id)
72 .one(&txn)
73 .await?
74 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
75 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
76
77 let active_model = schema::ActiveModel {
78 schema_id: Set(schema_id),
79 name: Set(name.to_owned()),
80 };
81 let schema = active_model.update(&txn).await?;
82
83 txn.commit().await?;
84
85 let version = self
86 .notify_frontend(
87 NotificationOperation::Update,
88 NotificationInfo::Schema(ObjectModel(schema, obj).into()),
89 )
90 .await;
91 Ok(version)
92 }
93
94 pub async fn alter_name(
95 &self,
96 object_type: ObjectType,
97 object_id: impl Into<ObjectId>,
98 object_name: &str,
99 ) -> MetaResult<NotificationVersion> {
100 let object_id = object_id.into();
101 if object_type == ObjectType::Database {
102 return self
103 .alter_database_name(object_id.as_database_id(), object_name)
104 .await;
105 } else if object_type == ObjectType::Schema {
106 return self
107 .alter_schema_name(object_id.as_schema_id(), object_name)
108 .await;
109 }
110
111 let inner = self.inner.write().await;
112 let txn = inner.db.begin().await?;
113 let obj: PartialObject = Object::find_by_id(object_id)
114 .into_partial_model()
115 .one(&txn)
116 .await?
117 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
118 assert_eq!(obj.obj_type, object_type);
119 check_relation_name_duplicate(
120 object_name,
121 obj.database_id.unwrap(),
122 obj.schema_id.unwrap(),
123 &txn,
124 )
125 .await?;
126
127 let (mut to_update_relations, old_name) =
129 rename_relation(&txn, object_type, object_id, object_name).await?;
130 to_update_relations.extend(
132 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
133 );
134
135 txn.commit().await?;
136
137 let version = self
138 .notify_frontend(
139 NotificationOperation::Update,
140 NotificationInfo::ObjectGroup(PbObjectGroup {
141 objects: to_update_relations,
142 }),
143 )
144 .await;
145
146 Ok(version)
147 }
148
    /// Atomically swap the names of two objects of the same `object_type`
    /// (`object_id` and `dst_object_id`) in a single transaction.
    ///
    /// Both objects take each other's current name, and every relation whose
    /// definition references either old name is rewritten accordingly.
    /// Supported types: table, source, sink, view, subscription; anything else
    /// yields a permission-denied error.
    pub async fn alter_swap_rename(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        dst_object_id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Fetch the destination object's current name; it becomes the new name
        // of `object_id`.
        let dst_name: String = match object_type {
            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
                .select_only()
                .column(table::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
                .select_only()
                .column(source::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
                .select_only()
                .column(sink::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
                .select_only()
                .column(view::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Subscription => {
                Subscription::find_by_id(dst_object_id.as_subscription_id())
                    .select_only()
                    .column(subscription::Column::Name)
                    .into_tuple()
                    .one(&txn)
                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                    })?
            }
            _ => {
                return Err(MetaError::permission_denied(format!(
                    "swap rename not supported for object type: {:?}",
                    object_type
                )));
            }
        };

        // Rename `object_id` -> `dst_name`; `rename_relation` returns the old
        // (source-side) name, which is then applied to `dst_object_id`.
        let (mut to_update_relations, src_name) =
            rename_relation(&txn, object_type, object_id, &dst_name).await?;
        let (to_update_relations2, _) =
            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
        to_update_relations.extend(to_update_relations2);
        // Rewrite relations that reference either object by its old name.
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
        );
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
        );

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: to_update_relations,
                }),
            )
            .await;

        Ok(version)
    }
240
241 pub async fn alter_non_shared_source(
242 &self,
243 pb_source: PbSource,
244 ) -> MetaResult<NotificationVersion> {
245 let source_id: SourceId = pb_source.id;
246 let inner = self.inner.write().await;
247 let txn = inner.db.begin().await?;
248
249 let original_version: i64 = Source::find_by_id(source_id)
250 .select_only()
251 .column(source::Column::Version)
252 .into_tuple()
253 .one(&txn)
254 .await?
255 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
256 if original_version + 1 != pb_source.version as i64 {
257 return Err(MetaError::permission_denied(
258 "source version is stale".to_owned(),
259 ));
260 }
261
262 let source: source::ActiveModel = pb_source.clone().into();
263 source.update(&txn).await?;
264 txn.commit().await?;
265
266 let version = self
267 .notify_frontend_relation_info(
268 NotificationOperation::Update,
269 PbObjectInfo::Source(pb_source),
270 )
271 .await;
272 Ok(version)
273 }
274
    /// Change the owner of a catalog object together with everything that must
    /// follow it: a table's associated source, the hidden iceberg-engine
    /// sink/source, indexes (and their index tables), and internal state
    /// tables. Frontends are notified with the full set of updated entries.
    ///
    /// Returns `IGNORED_NOTIFICATION_VERSION` when the owner is unchanged.
    pub async fn alter_owner(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_owner: UserId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_user_id(new_owner, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // No-op if ownership is already as requested.
        if obj.owner_id == new_owner {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let mut obj = obj.into_active_model();
        obj.owner_id = Set(new_owner);
        let obj = obj.update(&txn).await?;

        // Catalog entries to broadcast to frontends after commit.
        let mut objects = vec![];
        match object_type {
            ObjectType::Database => {
                let db = Database::find_by_id(object_id.as_database_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;

                txn.commit().await?;

                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Database(ObjectModel(db, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Schema => {
                let schema = Schema::find_by_id(object_id.as_schema_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;

                txn.commit().await?;

                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Table => {
                let table = Table::find_by_id(object_id.as_table_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

                // A table with an associated (connector) source transfers that
                // source's ownership as well.
                if let Some(associated_source_id) = table.optional_associated_source_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // Iceberg-engine tables have a hidden sink and source, found by
                // prefixed name in the same database/schema; both must follow
                // the table's owner.
                if matches!(table.engine, Some(table::Engine::Iceberg)) {
                    let iceberg_sink = Sink::find()
                        .inner_join(Object)
                        .select_only()
                        .column(sink::Column::SinkId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    sink::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SinkId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg sink must exist");
                    let sink_obj = object::ActiveModel {
                        oid: Set(iceberg_sink.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let sink = Sink::find_by_id(iceberg_sink)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
                    objects.push(PbObjectInfo::Sink(ObjectModel(sink, sink_obj).into()));

                    let iceberg_source = Source::find()
                        .inner_join(Object)
                        .select_only()
                        .column(source::Column::SourceId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    source::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SourceId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg source must exist");
                    let source_obj = object::ActiveModel {
                        oid: Set(iceberg_source.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(iceberg_source)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, source_obj).into()));
                }

                // Indexes on the table and their index tables follow the owner.
                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
                    .select_only()
                    .columns([index::Column::IndexId, index::Column::IndexTableId])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .unzip();
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // Internal state tables belonging to the table job and to its
                // index-table jobs also change owner.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId.is_in(
                            table_ids
                                .iter()
                                .cloned()
                                .chain(std::iter::once(object_id.as_table_id())),
                        ),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                if !index_ids.is_empty() || !table_ids.is_empty() {
                    // Bulk-update the owner of all dependent objects at once.
                    Object::update_many()
                        .col_expr(
                            object::Column::OwnerId,
                            SimpleExpr::Value(Value::Int(Some(new_owner))),
                        )
                        .filter(
                            object::Column::Oid.is_in::<ObjectId, _>(
                                index_ids
                                    .iter()
                                    .copied()
                                    .map_into()
                                    .chain(table_ids.iter().copied().map_into()),
                            ),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Re-read the updated rows so the notification carries the new
                // owner id.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id.as_source_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                let is_shared = source.is_shared();
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Shared sources carry internal state tables whose owner must
                // follow.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::OwnerId,
                        Value::Int(Some(new_owner)),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id.as_sink_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::OwnerId,
                    Value::Int(Some(new_owner)),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id.as_view_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id.as_connection_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
                objects.push(PbObjectInfo::Connection(
                    ObjectModel(connection, obj).into(),
                ));
            }
            _ => unreachable!("not supported object type: {:?}", object_type),
        };

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|object| PbObject {
                            object_info: Some(object),
                        })
                        .collect(),
                }),
            )
            .await;
        Ok(version)
    }
573
    /// Move a catalog object into another schema, together with its dependent
    /// objects (associated source, indexes, internal state tables). Name and
    /// function-signature duplicates in the target schema are rejected.
    ///
    /// Returns `IGNORED_NOTIFICATION_VERSION` when the object is already in
    /// the target schema.
    pub async fn alter_schema(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_schema: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // No-op if already in the target schema.
        if obj.schema_id == Some(new_schema) {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let database_id = obj.database_id.unwrap();

        // Catalog entries to broadcast to frontends after commit.
        let mut objects = vec![];
        match object_type {
            ObjectType::Table => {
                let table = Table::find_by_id(object_id.as_table_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
                let associated_src_id = table.optional_associated_source_id;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // The table's associated (connector) source moves with it.
                if let Some(associated_source_id) = associated_src_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id.as_object_id()),
                        schema_id: Set(Some(new_schema)),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // Collect the table's indexes: their names must also be free
                // in the target schema, and their index tables move as well.
                let (index_ids, (index_names, mut table_ids)): (
                    Vec<IndexId>,
                    (Vec<String>, Vec<TableId>),
                ) = Index::find()
                    .select_only()
                    .columns([
                        index::Column::IndexId,
                        index::Column::Name,
                        index::Column::IndexTableId,
                    ])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, String, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .map(|(id, name, t_id)| (id, (name, t_id)))
                    .unzip();

                // Internal state tables of the table job and its index jobs.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId.is_in(
                            table_ids
                                .iter()
                                .map(|table_id| table_id.as_job_id())
                                .chain(std::iter::once(object_id.as_job_id())),
                        ),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                if !index_ids.is_empty() || !table_ids.is_empty() {
                    for index_name in index_names {
                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
                            .await?;
                    }

                    // Bulk-move all dependent objects into the new schema.
                    Object::update_many()
                        .col_expr(object::Column::SchemaId, new_schema.into())
                        .filter(
                            object::Column::Oid.is_in::<ObjectId, _>(
                                index_ids
                                    .iter()
                                    .copied()
                                    .map_into()
                                    .chain(table_ids.iter().copied().map_into()),
                            ),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Re-read updated rows so notifications carry the new schema.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id.as_source_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
                let is_shared = source.is_shared();

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Shared sources carry internal state tables that move too.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::SchemaId,
                        new_schema.into(),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id.as_sink_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::SchemaId,
                    new_schema.into(),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
                    .await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id.as_view_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Function => {
                let function = Function::find_by_id(object_id.as_function_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;

                // Functions are checked for duplicates by full signature, not
                // just by name.
                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
                pb_function.schema_id = new_schema;
                check_function_signature_duplicate(&pb_function, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Function(pb_function),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id.as_connection_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;

                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
                pb_connection.schema_id = new_schema;
                check_connection_name_duplicate(&pb_connection, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Connection(pb_connection),
                    )
                    .await;
                return Ok(version);
            }
            _ => unreachable!("not supported object type: {:?}", object_type),
        }

        txn.commit().await?;
        let version = self
            .notify_frontend(
                Operation::Update,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|relation_info| PbObject {
                            object_info: Some(relation_info),
                        })
                        .collect_vec(),
                }),
            )
            .await;
        Ok(version)
    }
855
856 pub async fn alter_secret(
857 &self,
858 pb_secret: PbSecret,
859 secret_plain_payload: Vec<u8>,
860 ) -> MetaResult<NotificationVersion> {
861 let inner = self.inner.write().await;
862 let owner_id = pb_secret.owner as _;
863 let txn = inner.db.begin().await?;
864 ensure_user_id(owner_id, &txn).await?;
865 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
866 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
867
868 ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
869 let secret: secret::ActiveModel = pb_secret.clone().into();
870 Secret::update(secret).exec(&txn).await?;
871
872 txn.commit().await?;
873
874 let mut secret_plain = pb_secret;
876 secret_plain.value.clone_from(&secret_plain_payload);
877
878 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
879 self.env
880 .notification_manager()
881 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
882
883 let version = self
884 .notify_frontend(
885 NotificationOperation::Update,
886 NotificationInfo::Secret(secret_plain),
887 )
888 .await;
889
890 Ok(version)
891 }
892
893 pub async fn drop_table_associated_source(
895 txn: &DatabaseTransaction,
896 drop_table_connector_ctx: &DropTableConnectorContext,
897 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
898 let to_drop_source_objects: Vec<PartialObject> = Object::find()
899 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
900 .into_partial_model()
901 .all(txn)
902 .await?;
903 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
904 .select_only()
905 .filter(
906 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
907 )
908 .into_partial_model()
909 .all(txn)
910 .await?;
911 let to_drop_objects = to_drop_source_objects
912 .into_iter()
913 .chain(to_drop_internal_table_objs.into_iter())
914 .collect_vec();
915 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
917 .select_only()
918 .distinct()
919 .column(user_privilege::Column::UserId)
920 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
921 .into_tuple()
922 .all(txn)
923 .await?;
924
925 tracing::debug!(
926 "drop_table_associated_source: to_drop_objects: {:?}",
927 to_drop_objects
928 );
929
930 let res = Object::delete_many()
932 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
933 .exec(txn)
934 .await?;
935 if res.rows_affected == 0 {
936 return Err(MetaError::catalog_id_not_found(
937 ObjectType::Source.as_str(),
938 drop_table_connector_ctx.to_remove_source_id,
939 ));
940 }
941 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
942
943 Ok((user_infos, to_drop_objects))
944 }
945
946 pub async fn alter_database_param(
947 &self,
948 database_id: DatabaseId,
949 param: AlterDatabaseParam,
950 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
951 let inner = self.inner.write().await;
952 let txn = inner.db.begin().await?;
953
954 let mut database = database::ActiveModel {
955 database_id: Set(database_id),
956 ..Default::default()
957 };
958 match param {
959 AlterDatabaseParam::BarrierIntervalMs(interval) => {
960 if let Some(ref interval) = interval {
961 OverrideValidate::barrier_interval_ms(interval)
962 .map_err(|e| anyhow::anyhow!(e))?;
963 }
964 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
965 }
966 AlterDatabaseParam::CheckpointFrequency(frequency) => {
967 if let Some(ref frequency) = frequency {
968 OverrideValidate::checkpoint_frequency(frequency)
969 .map_err(|e| anyhow::anyhow!(e))?;
970 }
971 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
972 }
973 }
974 let database = database.update(&txn).await?;
975
976 let obj = Object::find_by_id(database_id)
977 .one(&txn)
978 .await?
979 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
980
981 txn.commit().await?;
982
983 let version = self
984 .notify_frontend(
985 NotificationOperation::Update,
986 NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
987 )
988 .await;
989 Ok((version, database))
990 }
991
992 pub async fn alter_streaming_job_config(
993 &self,
994 job_id: JobId,
995 entries_to_add: HashMap<String, String>,
996 keys_to_remove: Vec<String>,
997 ) -> MetaResult<NotificationVersion> {
998 let inner = self.inner.write().await;
999 let txn = inner.db.begin().await?;
1000
1001 let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1002 .select_only()
1003 .column(streaming_job::Column::ConfigOverride)
1004 .into_tuple()
1005 .one(&txn)
1006 .await?
1007 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1008 let config_override = config_override.unwrap_or_default();
1009
1010 let mut table: toml::Table =
1011 toml::from_str(&config_override).context("invalid streaming job config")?;
1012
1013 for (key, value) in entries_to_add {
1015 let value: toml::Value = value
1016 .parse()
1017 .with_context(|| format!("invalid config value for path {key}"))?;
1018 table
1019 .upsert(&key, value)
1020 .with_context(|| format!("failed to set config path {key}"))?;
1021 }
1022 for key in keys_to_remove {
1023 table
1024 .delete(&key)
1025 .with_context(|| format!("failed to reset config path {key}"))?;
1026 }
1027
1028 let updated_config_override = table.to_string();
1029
1030 {
1032 let merged = merge_streaming_config_section(
1033 &StreamingConfig::default(),
1034 &updated_config_override,
1035 )
1036 .context("invalid streaming job config override")?;
1037
1038 if let Some(merged) = merged {
1042 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1043 if !unrecognized_keys.is_empty() {
1044 bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1045 }
1046 }
1047 }
1048
1049 streaming_job::ActiveModel {
1050 job_id: Set(job_id),
1051 config_override: Set(Some(updated_config_override)),
1052 ..Default::default()
1053 }
1054 .update(&txn)
1055 .await?;
1056
1057 txn.commit().await?;
1058
1059 Ok(IGNORED_NOTIFICATION_VERSION)
1060 }
1061
1062 pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1063 let inner = self.inner.read().await;
1064 let active = refresh_job::ActiveModel {
1065 table_id: Set(table_id),
1066 last_trigger_time: Set(None),
1067 trigger_interval_secs: Set(None),
1068 current_status: Set(RefreshState::Idle),
1069 last_success_time: Set(None),
1070 };
1071 match RefreshJob::insert(active)
1072 .on_conflict_do_nothing()
1073 .exec(&inner.db)
1074 .await
1075 {
1076 Ok(_) => Ok(()),
1077 Err(sea_orm::DbErr::RecordNotInserted) => {
1078 tracing::debug!("refresh job already exists for table_id={}", table_id);
1080 Ok(())
1081 }
1082 Err(e) => Err(e.into()),
1083 }
1084 }
1085
1086 pub async fn update_refresh_job_status(
1087 &self,
1088 table_id: TableId,
1089 status: RefreshState,
1090 trigger_time: Option<DateTime>,
1091 is_success: bool,
1092 ) -> MetaResult<()> {
1093 self.ensure_refresh_job(table_id).await?;
1094 let inner = self.inner.read().await;
1095
1096 assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1098 let active = refresh_job::ActiveModel {
1099 table_id: Set(table_id),
1100 current_status: Set(status),
1101 last_trigger_time: if trigger_time.is_some() {
1102 Set(trigger_time.map(|t| t.and_utc().timestamp_millis()))
1103 } else {
1104 NotSet
1105 },
1106 last_success_time: if is_success {
1107 Set(Some(chrono::Utc::now().timestamp_millis()))
1108 } else {
1109 NotSet
1110 },
1111 ..Default::default()
1112 };
1113 active.update(&inner.db).await?;
1114 Ok(())
1115 }
1116
1117 pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1118 let inner = self.inner.read().await;
1119 RefreshJob::update_many()
1120 .col_expr(
1121 refresh_job::Column::CurrentStatus,
1122 Expr::value(RefreshState::Idle),
1123 )
1124 .exec(&inner.db)
1125 .await?;
1126 Ok(())
1127 }
1128
1129 pub async fn update_refresh_job_interval(
1130 &self,
1131 table_id: TableId,
1132 trigger_interval_secs: Option<i64>,
1133 ) -> MetaResult<()> {
1134 self.ensure_refresh_job(table_id).await?;
1135 let inner = self.inner.read().await;
1136 let active = refresh_job::ActiveModel {
1137 table_id: Set(table_id),
1138 trigger_interval_secs: Set(trigger_interval_secs),
1139 ..Default::default()
1140 };
1141 active.update(&inner.db).await?;
1142 Ok(())
1143 }
1144}