1use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use sea_orm::ActiveValue::{NotSet, Set};
24use sea_orm::prelude::DateTime;
25use sea_orm::sea_query::Expr;
26use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction, SqlErr};
27use thiserror_ext::AsReport;
28
29use super::*;
30use crate::controller::utils::load_streaming_jobs_by_ids;
31use crate::error::bail_invalid_parameter;
32
33impl CatalogController {
34 async fn alter_database_name(
35 &self,
36 database_id: DatabaseId,
37 name: &str,
38 ) -> MetaResult<NotificationVersion> {
39 let inner = self.inner.write().await;
40 let txn = inner.db.begin().await?;
41 check_database_name_duplicate(name, &txn).await?;
42
43 let active_model = database::ActiveModel {
44 database_id: Set(database_id),
45 name: Set(name.to_owned()),
46 ..Default::default()
47 };
48 let database = active_model.update(&txn).await?;
49
50 let obj = Object::find_by_id(database_id)
51 .one(&txn)
52 .await?
53 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
54
55 txn.commit().await?;
56
57 let version = self
58 .notify_frontend(
59 NotificationOperation::Update,
60 NotificationInfo::Database(ObjectModel(database, obj, None).into()),
61 )
62 .await;
63 Ok(version)
64 }
65
66 async fn alter_schema_name(
67 &self,
68 schema_id: SchemaId,
69 name: &str,
70 ) -> MetaResult<NotificationVersion> {
71 let inner = self.inner.write().await;
72 let txn = inner.db.begin().await?;
73
74 let obj = Object::find_by_id(schema_id)
75 .one(&txn)
76 .await?
77 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
78 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
79
80 let active_model = schema::ActiveModel {
81 schema_id: Set(schema_id),
82 name: Set(name.to_owned()),
83 };
84 let schema = active_model.update(&txn).await?;
85
86 txn.commit().await?;
87
88 let version = self
89 .notify_frontend(
90 NotificationOperation::Update,
91 NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
92 )
93 .await;
94 Ok(version)
95 }
96
97 pub async fn alter_name(
98 &self,
99 object_type: ObjectType,
100 object_id: impl Into<ObjectId>,
101 object_name: &str,
102 ) -> MetaResult<NotificationVersion> {
103 let object_id = object_id.into();
104 if object_type == ObjectType::Database {
105 return self
106 .alter_database_name(object_id.as_database_id(), object_name)
107 .await;
108 } else if object_type == ObjectType::Schema {
109 return self
110 .alter_schema_name(object_id.as_schema_id(), object_name)
111 .await;
112 }
113
114 let inner = self.inner.write().await;
115 let txn = inner.db.begin().await?;
116 let obj: PartialObject = Object::find_by_id(object_id)
117 .into_partial_model()
118 .one(&txn)
119 .await?
120 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
121 assert_eq!(obj.obj_type, object_type);
122 check_relation_name_duplicate(
123 object_name,
124 obj.database_id.unwrap(),
125 obj.schema_id.unwrap(),
126 &txn,
127 )
128 .await?;
129
130 let (mut to_update_relations, old_name) =
132 rename_relation(&txn, object_type, object_id, object_name).await?;
133 to_update_relations.extend(
135 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
136 );
137
138 txn.commit().await?;
139
140 let version = self
141 .notify_frontend(
142 NotificationOperation::Update,
143 NotificationInfo::ObjectGroup(PbObjectGroup {
144 objects: to_update_relations,
145 dependencies: vec![],
146 }),
147 )
148 .await;
149
150 Ok(version)
151 }
152
153 pub async fn alter_swap_rename(
154 &self,
155 object_type: ObjectType,
156 object_id: ObjectId,
157 dst_object_id: ObjectId,
158 ) -> MetaResult<NotificationVersion> {
159 let inner = self.inner.write().await;
160 let txn = inner.db.begin().await?;
161 let dst_name: String = match object_type {
162 ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
163 .select_only()
164 .column(table::Column::Name)
165 .into_tuple()
166 .one(&txn)
167 .await?
168 .ok_or_else(|| {
169 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
170 })?,
171 ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
172 .select_only()
173 .column(source::Column::Name)
174 .into_tuple()
175 .one(&txn)
176 .await?
177 .ok_or_else(|| {
178 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
179 })?,
180 ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
181 .select_only()
182 .column(sink::Column::Name)
183 .into_tuple()
184 .one(&txn)
185 .await?
186 .ok_or_else(|| {
187 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
188 })?,
189 ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
190 .select_only()
191 .column(view::Column::Name)
192 .into_tuple()
193 .one(&txn)
194 .await?
195 .ok_or_else(|| {
196 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
197 })?,
198 ObjectType::Subscription => {
199 Subscription::find_by_id(dst_object_id.as_subscription_id())
200 .select_only()
201 .column(subscription::Column::Name)
202 .into_tuple()
203 .one(&txn)
204 .await?
205 .ok_or_else(|| {
206 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
207 })?
208 }
209 _ => {
210 return Err(MetaError::permission_denied(format!(
211 "swap rename not supported for object type: {:?}",
212 object_type
213 )));
214 }
215 };
216
217 let (mut to_update_relations, src_name) =
219 rename_relation(&txn, object_type, object_id, &dst_name).await?;
220 let (to_update_relations2, _) =
221 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
222 to_update_relations.extend(to_update_relations2);
223 to_update_relations.extend(
225 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
226 );
227 to_update_relations.extend(
228 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
229 );
230
231 txn.commit().await?;
232
233 let version = self
234 .notify_frontend(
235 NotificationOperation::Update,
236 NotificationInfo::ObjectGroup(PbObjectGroup {
237 objects: to_update_relations,
238 dependencies: vec![],
239 }),
240 )
241 .await;
242
243 Ok(version)
244 }
245
246 pub async fn alter_non_shared_source(
247 &self,
248 pb_source: PbSource,
249 ) -> MetaResult<NotificationVersion> {
250 let source_id: SourceId = pb_source.id;
251 let inner = self.inner.write().await;
252 let txn = inner.db.begin().await?;
253
254 let original_version: i64 = Source::find_by_id(source_id)
255 .select_only()
256 .column(source::Column::Version)
257 .into_tuple()
258 .one(&txn)
259 .await?
260 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
261 if original_version + 1 != pb_source.version as i64 {
262 return Err(MetaError::permission_denied(
263 "source version is stale".to_owned(),
264 ));
265 }
266
267 let source: source::ActiveModel = pb_source.clone().into();
268 Source::update(source).exec(&txn).await?;
269 txn.commit().await?;
270
271 let version = self
272 .notify_frontend_relation_info(
273 NotificationOperation::Update,
274 PbObjectInfo::Source(pb_source),
275 )
276 .await;
277 Ok(version)
278 }
279
280 pub async fn alter_owner(
281 &self,
282 object_type: ObjectType,
283 object_id: ObjectId,
284 new_owner: UserId,
285 ) -> MetaResult<NotificationVersion> {
286 let inner = self.inner.write().await;
287 let txn = inner.db.begin().await?;
288 ensure_user_id(new_owner, &txn).await?;
289
290 let obj = Object::find_by_id(object_id)
291 .one(&txn)
292 .await?
293 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
294 if obj.owner_id == new_owner {
295 return Ok(IGNORED_NOTIFICATION_VERSION);
296 }
297 let mut obj = obj.into_active_model();
298 obj.owner_id = Set(new_owner);
299 let obj = obj.update(&txn).await?;
300
301 let mut objects = vec![];
302 match object_type {
303 ObjectType::Database => {
304 let db = Database::find_by_id(object_id.as_database_id())
305 .one(&txn)
306 .await?
307 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
308
309 txn.commit().await?;
310
311 let version = self
312 .notify_frontend(
313 NotificationOperation::Update,
314 NotificationInfo::Database(ObjectModel(db, obj, None).into()),
315 )
316 .await;
317 return Ok(version);
318 }
319 ObjectType::Schema => {
320 let schema = Schema::find_by_id(object_id.as_schema_id())
321 .one(&txn)
322 .await?
323 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
324
325 txn.commit().await?;
326
327 let version = self
328 .notify_frontend(
329 NotificationOperation::Update,
330 NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
331 )
332 .await;
333 return Ok(version);
334 }
335 ObjectType::Table => {
336 let table = Table::find_by_id(object_id.as_table_id())
337 .one(&txn)
338 .await?
339 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
340 let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
341 .one(&txn)
342 .await?;
343
344 if let Some(associated_source_id) = table.optional_associated_source_id {
346 let src_obj = object::ActiveModel {
347 oid: Set(associated_source_id.as_object_id()),
348 owner_id: Set(new_owner),
349 ..Default::default()
350 }
351 .update(&txn)
352 .await?;
353 let source = Source::find_by_id(associated_source_id)
354 .one(&txn)
355 .await?
356 .ok_or_else(|| {
357 MetaError::catalog_id_not_found("source", associated_source_id)
358 })?;
359 objects.push(PbObjectInfo::Source(
360 ObjectModel(source, src_obj, None).into(),
361 ));
362 }
363
364 if matches!(table.engine, Some(table::Engine::Iceberg)) {
366 let iceberg_sink = Sink::find()
367 .inner_join(Object)
368 .select_only()
369 .column(sink::Column::SinkId)
370 .filter(
371 object::Column::DatabaseId
372 .eq(obj.database_id)
373 .and(object::Column::SchemaId.eq(obj.schema_id))
374 .and(
375 sink::Column::Name
376 .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
377 ),
378 )
379 .into_tuple::<SinkId>()
380 .one(&txn)
381 .await?
382 .expect("iceberg sink must exist");
383 let sink_obj = object::ActiveModel {
384 oid: Set(iceberg_sink.as_object_id()),
385 owner_id: Set(new_owner),
386 ..Default::default()
387 }
388 .update(&txn)
389 .await?;
390 let sink = Sink::find_by_id(iceberg_sink)
391 .one(&txn)
392 .await?
393 .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
394 objects.push(PbObjectInfo::Sink(
395 ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
396 ));
397
398 let iceberg_source = Source::find()
399 .inner_join(Object)
400 .select_only()
401 .column(source::Column::SourceId)
402 .filter(
403 object::Column::DatabaseId
404 .eq(obj.database_id)
405 .and(object::Column::SchemaId.eq(obj.schema_id))
406 .and(
407 source::Column::Name
408 .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
409 ),
410 )
411 .into_tuple::<SourceId>()
412 .one(&txn)
413 .await?
414 .expect("iceberg source must exist");
415 let source_obj = object::ActiveModel {
416 oid: Set(iceberg_source.as_object_id()),
417 owner_id: Set(new_owner),
418 ..Default::default()
419 }
420 .update(&txn)
421 .await?;
422 let source = Source::find_by_id(iceberg_source)
423 .one(&txn)
424 .await?
425 .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
426 objects.push(PbObjectInfo::Source(
427 ObjectModel(source, source_obj, None).into(),
428 ));
429 }
430
431 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
433 .select_only()
434 .columns([index::Column::IndexId, index::Column::IndexTableId])
435 .filter(index::Column::PrimaryTableId.eq(object_id))
436 .into_tuple::<(IndexId, TableId)>()
437 .all(&txn)
438 .await?
439 .into_iter()
440 .unzip();
441 objects.push(PbObjectInfo::Table(
442 ObjectModel(table, obj, streaming_job).into(),
443 ));
444
445 let internal_tables: Vec<TableId> = Table::find()
447 .select_only()
448 .column(table::Column::TableId)
449 .filter(
450 table::Column::BelongsToJobId.is_in(
451 table_ids
452 .iter()
453 .cloned()
454 .chain(std::iter::once(object_id.as_table_id())),
455 ),
456 )
457 .into_tuple()
458 .all(&txn)
459 .await?;
460 table_ids.extend(internal_tables);
461
462 if !index_ids.is_empty() || !table_ids.is_empty() {
463 Object::update_many()
464 .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
465 .filter(
466 object::Column::Oid.is_in::<ObjectId, _>(
467 index_ids
468 .iter()
469 .copied()
470 .map_into()
471 .chain(table_ids.iter().copied().map_into()),
472 ),
473 )
474 .exec(&txn)
475 .await?;
476 }
477
478 if !table_ids.is_empty() {
479 let table_objs = Table::find()
480 .find_also_related(Object)
481 .filter(table::Column::TableId.is_in(table_ids))
482 .all(&txn)
483 .await?;
484 let streaming_jobs = load_streaming_jobs_by_ids(
485 &txn,
486 table_objs.iter().map(|(table, _)| table.job_id()),
487 )
488 .await?;
489 for (table, table_obj) in table_objs {
490 let job_id = table.job_id();
491 let streaming_job = streaming_jobs.get(&job_id).cloned();
492 objects.push(PbObjectInfo::Table(
493 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
494 ));
495 }
496 }
497 if !index_ids.is_empty() {
499 let index_objs = Index::find()
500 .find_also_related(Object)
501 .filter(index::Column::IndexId.is_in(index_ids))
502 .all(&txn)
503 .await?;
504 let streaming_jobs = load_streaming_jobs_by_ids(
505 &txn,
506 index_objs
507 .iter()
508 .map(|(index, _)| index.index_id.as_job_id()),
509 )
510 .await?;
511 for (index, index_obj) in index_objs {
512 let streaming_job =
513 streaming_jobs.get(&index.index_id.as_job_id()).cloned();
514 objects.push(PbObjectInfo::Index(
515 ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
516 ));
517 }
518 }
519 }
520 ObjectType::Source => {
521 let source = Source::find_by_id(object_id.as_source_id())
522 .one(&txn)
523 .await?
524 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
525 let is_shared = source.is_shared();
526 objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
527
528 if is_shared {
531 update_internal_tables(
532 &txn,
533 object_id,
534 object::Column::OwnerId,
535 new_owner,
536 &mut objects,
537 )
538 .await?;
539 }
540 }
541 ObjectType::Sink => {
542 let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
543 .find_also_related(Object)
544 .one(&txn)
545 .await?
546 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
547 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
548 .one(&txn)
549 .await?;
550 objects.push(PbObjectInfo::Sink(
551 ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
552 ));
553
554 update_internal_tables(
555 &txn,
556 object_id,
557 object::Column::OwnerId,
558 new_owner,
559 &mut objects,
560 )
561 .await?;
562 }
563 ObjectType::Subscription => {
564 let subscription = Subscription::find_by_id(object_id.as_subscription_id())
565 .one(&txn)
566 .await?
567 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
568 objects.push(PbObjectInfo::Subscription(
569 ObjectModel(subscription, obj, None).into(),
570 ));
571 }
572 ObjectType::View => {
573 let view = View::find_by_id(object_id.as_view_id())
574 .one(&txn)
575 .await?
576 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
577 objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
578 }
579 ObjectType::Connection => {
580 let connection = Connection::find_by_id(object_id.as_connection_id())
581 .one(&txn)
582 .await?
583 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
584 objects.push(PbObjectInfo::Connection(
585 ObjectModel(connection, obj, None).into(),
586 ));
587 }
588 ObjectType::Function => {
589 let function = Function::find_by_id(object_id.as_function_id())
590 .one(&txn)
591 .await?
592 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
593 objects.push(PbObjectInfo::Function(
594 ObjectModel(function, obj, None).into(),
595 ));
596 }
597 ObjectType::Secret => {
598 let secret = Secret::find_by_id(object_id.as_secret_id())
599 .one(&txn)
600 .await?
601 .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
602 objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
603 }
604 _ => unreachable!("not supported object type: {:?}", object_type),
605 };
606
607 txn.commit().await?;
608
609 let version = self
610 .notify_frontend(
611 NotificationOperation::Update,
612 NotificationInfo::ObjectGroup(PbObjectGroup {
613 objects: objects
614 .into_iter()
615 .map(|object| PbObject {
616 object_info: Some(object),
617 })
618 .collect(),
619 dependencies: vec![],
620 }),
621 )
622 .await;
623 Ok(version)
624 }
625
626 pub async fn alter_schema(
627 &self,
628 object_type: ObjectType,
629 object_id: ObjectId,
630 new_schema: SchemaId,
631 ) -> MetaResult<NotificationVersion> {
632 let inner = self.inner.write().await;
633 let txn = inner.db.begin().await?;
634 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
635
636 let obj = Object::find_by_id(object_id)
637 .one(&txn)
638 .await?
639 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
640 if obj.schema_id == Some(new_schema) {
641 return Ok(IGNORED_NOTIFICATION_VERSION);
642 }
643 let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
644 .one(&txn)
645 .await?;
646 let database_id = obj.database_id.unwrap();
647
648 let mut objects = vec![];
649 match object_type {
650 ObjectType::Table => {
651 let table = Table::find_by_id(object_id.as_table_id())
652 .one(&txn)
653 .await?
654 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
655 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
656 let associated_src_id = table.optional_associated_source_id;
657
658 let mut obj = obj.into_active_model();
659 obj.schema_id = Set(Some(new_schema));
660 let obj = obj.update(&txn).await?;
661
662 objects.push(PbObjectInfo::Table(
663 ObjectModel(table, obj, streaming_job).into(),
664 ));
665
666 if let Some(associated_source_id) = associated_src_id {
668 let src_obj = object::ActiveModel {
669 oid: Set(associated_source_id.as_object_id()),
670 schema_id: Set(Some(new_schema)),
671 ..Default::default()
672 }
673 .update(&txn)
674 .await?;
675 let source = Source::find_by_id(associated_source_id)
676 .one(&txn)
677 .await?
678 .ok_or_else(|| {
679 MetaError::catalog_id_not_found("source", associated_source_id)
680 })?;
681 objects.push(PbObjectInfo::Source(
682 ObjectModel(source, src_obj, None).into(),
683 ));
684 }
685
686 let (index_ids, (index_names, mut table_ids)): (
688 Vec<IndexId>,
689 (Vec<String>, Vec<TableId>),
690 ) = Index::find()
691 .select_only()
692 .columns([
693 index::Column::IndexId,
694 index::Column::Name,
695 index::Column::IndexTableId,
696 ])
697 .filter(index::Column::PrimaryTableId.eq(object_id))
698 .into_tuple::<(IndexId, String, TableId)>()
699 .all(&txn)
700 .await?
701 .into_iter()
702 .map(|(id, name, t_id)| (id, (name, t_id)))
703 .unzip();
704
705 let internal_tables: Vec<TableId> = Table::find()
707 .select_only()
708 .column(table::Column::TableId)
709 .filter(
710 table::Column::BelongsToJobId.is_in(
711 table_ids
712 .iter()
713 .map(|table_id| table_id.as_job_id())
714 .chain(std::iter::once(object_id.as_job_id())),
715 ),
716 )
717 .into_tuple()
718 .all(&txn)
719 .await?;
720 table_ids.extend(internal_tables);
721
722 if !index_ids.is_empty() || !table_ids.is_empty() {
723 for index_name in index_names {
724 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
725 .await?;
726 }
727
728 Object::update_many()
729 .col_expr(object::Column::SchemaId, new_schema.into())
730 .filter(
731 object::Column::Oid.is_in::<ObjectId, _>(
732 index_ids
733 .iter()
734 .copied()
735 .map_into()
736 .chain(table_ids.iter().copied().map_into()),
737 ),
738 )
739 .exec(&txn)
740 .await?;
741 }
742
743 if !table_ids.is_empty() {
744 let table_objs = Table::find()
745 .find_also_related(Object)
746 .filter(table::Column::TableId.is_in(table_ids))
747 .all(&txn)
748 .await?;
749 let streaming_jobs = load_streaming_jobs_by_ids(
750 &txn,
751 table_objs.iter().map(|(table, _)| table.job_id()),
752 )
753 .await?;
754 for (table, table_obj) in table_objs {
755 let job_id = table.job_id();
756 let streaming_job = streaming_jobs.get(&job_id).cloned();
757 objects.push(PbObjectInfo::Table(
758 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
759 ));
760 }
761 }
762 if !index_ids.is_empty() {
763 let index_objs = Index::find()
764 .find_also_related(Object)
765 .filter(index::Column::IndexId.is_in(index_ids))
766 .all(&txn)
767 .await?;
768 let streaming_jobs = load_streaming_jobs_by_ids(
769 &txn,
770 index_objs
771 .iter()
772 .map(|(index, _)| index.index_id.as_job_id()),
773 )
774 .await?;
775 for (index, index_obj) in index_objs {
776 let streaming_job =
777 streaming_jobs.get(&index.index_id.as_job_id()).cloned();
778 objects.push(PbObjectInfo::Index(
779 ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
780 ));
781 }
782 }
783 }
784 ObjectType::Source => {
785 let source = Source::find_by_id(object_id.as_source_id())
786 .one(&txn)
787 .await?
788 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
789 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
790 let is_shared = source.is_shared();
791
792 let mut obj = obj.into_active_model();
793 obj.schema_id = Set(Some(new_schema));
794 let obj = obj.update(&txn).await?;
795 objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
796
797 if is_shared {
800 update_internal_tables(
801 &txn,
802 object_id,
803 object::Column::SchemaId,
804 new_schema,
805 &mut objects,
806 )
807 .await?;
808 }
809 }
810 ObjectType::Sink => {
811 let sink = Sink::find_by_id(object_id.as_sink_id())
812 .one(&txn)
813 .await?
814 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
815 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
816
817 let mut obj = obj.into_active_model();
818 obj.schema_id = Set(Some(new_schema));
819 let obj = obj.update(&txn).await?;
820 objects.push(PbObjectInfo::Sink(
821 ObjectModel(sink, obj, streaming_job).into(),
822 ));
823
824 update_internal_tables(
825 &txn,
826 object_id,
827 object::Column::SchemaId,
828 new_schema,
829 &mut objects,
830 )
831 .await?;
832 }
833 ObjectType::Subscription => {
834 let subscription = Subscription::find_by_id(object_id.as_subscription_id())
835 .one(&txn)
836 .await?
837 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
838 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
839 .await?;
840
841 let mut obj = obj.into_active_model();
842 obj.schema_id = Set(Some(new_schema));
843 let obj = obj.update(&txn).await?;
844 objects.push(PbObjectInfo::Subscription(
845 ObjectModel(subscription, obj, None).into(),
846 ));
847 }
848 ObjectType::View => {
849 let view = View::find_by_id(object_id.as_view_id())
850 .one(&txn)
851 .await?
852 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
853 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
854
855 let mut obj = obj.into_active_model();
856 obj.schema_id = Set(Some(new_schema));
857 let obj = obj.update(&txn).await?;
858 objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
859 }
860 ObjectType::Function => {
861 let function = Function::find_by_id(object_id.as_function_id())
862 .one(&txn)
863 .await?
864 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
865
866 let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
867 pb_function.schema_id = new_schema;
868 check_function_signature_duplicate(&pb_function, &txn).await?;
869
870 Object::update(object::ActiveModel {
871 oid: Set(object_id),
872 schema_id: Set(Some(new_schema)),
873 ..Default::default()
874 })
875 .exec(&txn)
876 .await?;
877
878 txn.commit().await?;
879 let version = self
880 .notify_frontend(
881 NotificationOperation::Update,
882 NotificationInfo::Function(pb_function),
883 )
884 .await;
885 return Ok(version);
886 }
887 ObjectType::Connection => {
888 let connection = Connection::find_by_id(object_id.as_connection_id())
889 .one(&txn)
890 .await?
891 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
892
893 let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
894 pb_connection.schema_id = new_schema;
895 check_connection_name_duplicate(&pb_connection, &txn).await?;
896
897 Object::update(object::ActiveModel {
898 oid: Set(object_id),
899 schema_id: Set(Some(new_schema)),
900 ..Default::default()
901 })
902 .exec(&txn)
903 .await?;
904
905 txn.commit().await?;
906 let version = self
907 .notify_frontend(
908 NotificationOperation::Update,
909 NotificationInfo::Connection(pb_connection),
910 )
911 .await;
912 return Ok(version);
913 }
914 _ => unreachable!("not supported object type: {:?}", object_type),
915 }
916
917 txn.commit().await?;
918 let version = self
919 .notify_frontend(
920 Operation::Update,
921 Info::ObjectGroup(PbObjectGroup {
922 objects: objects
923 .into_iter()
924 .map(|relation_info| PbObject {
925 object_info: Some(relation_info),
926 })
927 .collect_vec(),
928 dependencies: vec![],
929 }),
930 )
931 .await;
932 Ok(version)
933 }
934
935 pub async fn alter_secret(
936 &self,
937 pb_secret: PbSecret,
938 secret_plain_payload: Vec<u8>,
939 ) -> MetaResult<NotificationVersion> {
940 let inner = self.inner.write().await;
941 let owner_id = pb_secret.owner as _;
942 let txn = inner.db.begin().await?;
943 ensure_user_id(owner_id, &txn).await?;
944 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
945 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
946
947 ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
948 let secret: secret::ActiveModel = pb_secret.clone().into();
949 Secret::update(secret).exec(&txn).await?;
950
951 txn.commit().await?;
952
953 let mut secret_plain = pb_secret;
955 secret_plain.value.clone_from(&secret_plain_payload);
956
957 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
958 self.env
959 .notification_manager()
960 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
961
962 let version = self
963 .notify_frontend(
964 NotificationOperation::Update,
965 NotificationInfo::Secret(secret_plain),
966 )
967 .await;
968
969 Ok(version)
970 }
971
972 pub async fn drop_table_associated_source(
974 txn: &DatabaseTransaction,
975 drop_table_connector_ctx: &DropTableConnectorContext,
976 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
977 let to_drop_source_objects: Vec<PartialObject> = Object::find()
978 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
979 .into_partial_model()
980 .all(txn)
981 .await?;
982 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
983 .select_only()
984 .filter(
985 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
986 )
987 .into_partial_model()
988 .all(txn)
989 .await?;
990 let to_drop_objects = to_drop_source_objects
991 .into_iter()
992 .chain(to_drop_internal_table_objs.into_iter())
993 .collect_vec();
994 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
996 .select_only()
997 .distinct()
998 .column(user_privilege::Column::UserId)
999 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1000 .into_tuple()
1001 .all(txn)
1002 .await?;
1003
1004 tracing::debug!(
1005 "drop_table_associated_source: to_drop_objects: {:?}",
1006 to_drop_objects
1007 );
1008
1009 let res = Object::delete_many()
1011 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1012 .exec(txn)
1013 .await?;
1014 if res.rows_affected == 0 {
1015 return Err(MetaError::catalog_id_not_found(
1016 ObjectType::Source.as_str(),
1017 drop_table_connector_ctx.to_remove_source_id,
1018 ));
1019 }
1020 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
1021
1022 Ok((user_infos, to_drop_objects))
1023 }
1024
1025 pub async fn alter_database_param(
1026 &self,
1027 database_id: DatabaseId,
1028 param: AlterDatabaseParam,
1029 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
1030 let inner = self.inner.write().await;
1031 let txn = inner.db.begin().await?;
1032
1033 let mut database = database::ActiveModel {
1034 database_id: Set(database_id),
1035 ..Default::default()
1036 };
1037 match param {
1038 AlterDatabaseParam::BarrierIntervalMs(interval) => {
1039 if let Some(ref interval) = interval {
1040 OverrideValidate::barrier_interval_ms(interval)
1041 .map_err(|e| anyhow::anyhow!(e))?;
1042 }
1043 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
1044 }
1045 AlterDatabaseParam::CheckpointFrequency(frequency) => {
1046 if let Some(ref frequency) = frequency {
1047 OverrideValidate::checkpoint_frequency(frequency)
1048 .map_err(|e| anyhow::anyhow!(e))?;
1049 }
1050 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
1051 }
1052 }
1053 let database = database.update(&txn).await?;
1054
1055 let obj = Object::find_by_id(database_id)
1056 .one(&txn)
1057 .await?
1058 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1059
1060 txn.commit().await?;
1061
1062 let version = self
1063 .notify_frontend(
1064 NotificationOperation::Update,
1065 NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
1066 )
1067 .await;
1068 Ok((version, database))
1069 }
1070
1071 pub async fn alter_subscription_retention(
1072 &self,
1073 subscription_id: SubscriptionId,
1074 retention_seconds: u64,
1075 definition: String,
1076 ) -> MetaResult<(NotificationVersion, PbSubscription)> {
1077 let inner = self.inner.write().await;
1078 let txn = inner.db.begin().await?;
1079
1080 let obj = Object::find_by_id(subscription_id)
1081 .one(&txn)
1082 .await?
1083 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1084
1085 let active_model = subscription::ActiveModel {
1086 subscription_id: Set(subscription_id),
1087 retention_seconds: Set(retention_seconds as i64),
1088 definition: Set(definition),
1089 ..Default::default()
1090 };
1091 let subscription = active_model.update(&txn).await?;
1092
1093 txn.commit().await?;
1094
1095 let pb_subscription: PbSubscription = ObjectModel(subscription, obj, None).into();
1096 let subscription_info = PbObjectInfo::Subscription(pb_subscription.clone());
1097
1098 let version = self
1099 .notify_frontend(
1100 NotificationOperation::Update,
1101 NotificationInfo::ObjectGroup(PbObjectGroup {
1102 objects: vec![PbObject {
1103 object_info: Some(subscription_info),
1104 }],
1105 dependencies: vec![],
1106 }),
1107 )
1108 .await;
1109
1110 Ok((version, pb_subscription))
1111 }
1112
1113 pub async fn alter_streaming_job_config(
1114 &self,
1115 job_id: JobId,
1116 entries_to_add: HashMap<String, String>,
1117 keys_to_remove: Vec<String>,
1118 ) -> MetaResult<NotificationVersion> {
1119 let inner = self.inner.write().await;
1120 let txn = inner.db.begin().await?;
1121
1122 let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1123 .select_only()
1124 .column(streaming_job::Column::ConfigOverride)
1125 .into_tuple()
1126 .one(&txn)
1127 .await?
1128 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1129 let config_override = config_override.unwrap_or_default();
1130
1131 let mut table: toml::Table =
1132 toml::from_str(&config_override).context("invalid streaming job config")?;
1133
1134 for (key, value) in entries_to_add {
1136 let value: toml::Value = value
1137 .parse()
1138 .with_context(|| format!("invalid config value for path {key}"))?;
1139 table
1140 .upsert(&key, value)
1141 .with_context(|| format!("failed to set config path {key}"))?;
1142 }
1143 for key in keys_to_remove {
1144 table
1145 .delete(&key)
1146 .with_context(|| format!("failed to reset config path {key}"))?;
1147 }
1148
1149 let updated_config_override = table.to_string();
1150
1151 {
1153 let merged = merge_streaming_config_section(
1154 &StreamingConfig::default(),
1155 &updated_config_override,
1156 )
1157 .context("invalid streaming job config override")?;
1158
1159 if let Some(merged) = merged {
1163 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1164 if !unrecognized_keys.is_empty() {
1165 bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1166 }
1167 }
1168 }
1169
1170 StreamingJob::update(streaming_job::ActiveModel {
1171 job_id: Set(job_id),
1172 config_override: Set(Some(updated_config_override)),
1173 ..Default::default()
1174 })
1175 .exec(&txn)
1176 .await?;
1177
1178 txn.commit().await?;
1179
1180 Ok(IGNORED_NOTIFICATION_VERSION)
1181 }
1182
1183 pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1184 let inner = self.inner.read().await;
1185 let active = refresh_job::ActiveModel {
1186 table_id: Set(table_id),
1187 last_trigger_time: Set(None),
1188 trigger_interval_secs: Set(None),
1189 current_status: Set(RefreshState::Idle),
1190 last_success_time: Set(None),
1191 };
1192 match RefreshJob::insert(active)
1193 .on_conflict_do_nothing()
1194 .exec(&inner.db)
1195 .await
1196 {
1197 Ok(_) => Ok(()),
1198 Err(sea_orm::DbErr::RecordNotInserted) => {
1199 tracing::debug!("refresh job already exists for table_id={}", table_id);
1201 Ok(())
1202 }
1203 Err(e) => {
1204 if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1205 tracing::warn!(
1206 %table_id,
1207 error = %e.as_report(),
1208 "skip ensure_refresh_job for stale dropped table"
1209 );
1210 Ok(())
1211 } else {
1212 Err(e.into())
1213 }
1214 }
1215 }
1216 }
1217
1218 pub async fn update_refresh_job_status(
1219 &self,
1220 table_id: TableId,
1221 status: RefreshState,
1222 trigger_time: Option<DateTime>,
1223 is_success: bool,
1224 ) -> MetaResult<()> {
1225 self.ensure_refresh_job(table_id).await?;
1226 let inner = self.inner.read().await;
1227
1228 assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1230 let active = refresh_job::ActiveModel {
1231 table_id: Set(table_id),
1232 current_status: Set(status),
1233 last_trigger_time: if trigger_time.is_some() {
1234 Set(trigger_time.map(datetime_to_timestamp_millis))
1235 } else {
1236 NotSet
1237 },
1238 last_success_time: if is_success {
1239 Set(Some(chrono::Utc::now().timestamp_millis()))
1240 } else {
1241 NotSet
1242 },
1243 ..Default::default()
1244 };
1245 match RefreshJob::update(active).exec(&inner.db).await {
1246 Ok(_) => Ok(()),
1247 Err(e) => {
1248 if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1249 tracing::warn!(
1250 %table_id,
1251 error = %e.as_report(),
1252 "skip update_refresh_job_status for stale dropped table"
1253 );
1254 Ok(())
1255 } else {
1256 Err(e.into())
1257 }
1258 }
1259 }
1260 }
1261
1262 pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1263 let inner = self.inner.read().await;
1264 RefreshJob::update_many()
1265 .col_expr(
1266 refresh_job::Column::CurrentStatus,
1267 Expr::value(RefreshState::Idle),
1268 )
1269 .exec(&inner.db)
1270 .await?;
1271 Ok(())
1272 }
1273
1274 pub async fn update_refresh_job_interval(
1275 &self,
1276 table_id: TableId,
1277 trigger_interval_secs: Option<i64>,
1278 ) -> MetaResult<()> {
1279 self.ensure_refresh_job(table_id).await?;
1280 let inner = self.inner.read().await;
1281 let active = refresh_job::ActiveModel {
1282 table_id: Set(table_id),
1283 trigger_interval_secs: Set(trigger_interval_secs),
1284 ..Default::default()
1285 };
1286 RefreshJob::update(active).exec(&inner.db).await?;
1287 Ok(())
1288 }
1289}
1290
1291async fn should_skip_refresh_job_db_err<C>(
1292 db: &C,
1293 table_id: TableId,
1294 err: &sea_orm::DbErr,
1295) -> MetaResult<bool>
1296where
1297 C: ConnectionTrait,
1298{
1299 if matches!(err, sea_orm::DbErr::RecordNotUpdated) {
1300 return Ok(true);
1301 }
1302
1303 if !matches!(
1304 err.sql_err(),
1305 Some(SqlErr::ForeignKeyConstraintViolation(_))
1306 ) {
1307 return Ok(false);
1308 }
1309
1310 let table_exists = Table::find_by_id(table_id).one(db).await?.is_some();
1311 Ok(!table_exists)
1312}