1use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
23use risingwave_meta_model::refresh_job::{self, RefreshState};
24use sea_orm::ActiveValue::{NotSet, Set};
25use sea_orm::prelude::DateTime;
26use sea_orm::sea_query::Expr;
27use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction, SqlErr};
28use thiserror_ext::AsReport;
29
30use super::*;
31use crate::controller::utils::load_streaming_jobs_by_ids;
32use crate::error::bail_invalid_parameter;
33
34impl CatalogController {
35 async fn alter_database_name(
36 &self,
37 database_id: DatabaseId,
38 name: &str,
39 ) -> MetaResult<NotificationVersion> {
40 let inner = self.inner.write().await;
41 let txn = inner.db.begin().await?;
42 check_database_name_duplicate(name, &txn).await?;
43
44 let active_model = database::ActiveModel {
45 database_id: Set(database_id),
46 name: Set(name.to_owned()),
47 ..Default::default()
48 };
49 let database = active_model.update(&txn).await?;
50
51 let obj = Object::find_by_id(database_id)
52 .one(&txn)
53 .await?
54 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
55
56 txn.commit().await?;
57
58 let version = self
59 .notify_frontend(
60 NotificationOperation::Update,
61 NotificationInfo::Database(ObjectModel(database, obj, None).into()),
62 )
63 .await;
64 Ok(version)
65 }
66
67 async fn alter_schema_name(
68 &self,
69 schema_id: SchemaId,
70 name: &str,
71 ) -> MetaResult<NotificationVersion> {
72 let inner = self.inner.write().await;
73 let txn = inner.db.begin().await?;
74
75 let obj = Object::find_by_id(schema_id)
76 .one(&txn)
77 .await?
78 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
79 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
80
81 let active_model = schema::ActiveModel {
82 schema_id: Set(schema_id),
83 name: Set(name.to_owned()),
84 };
85 let schema = active_model.update(&txn).await?;
86
87 txn.commit().await?;
88
89 let version = self
90 .notify_frontend(
91 NotificationOperation::Update,
92 NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
93 )
94 .await;
95 Ok(version)
96 }
97
98 pub async fn alter_name(
99 &self,
100 object_type: ObjectType,
101 object_id: impl Into<ObjectId>,
102 object_name: &str,
103 ) -> MetaResult<NotificationVersion> {
104 let object_id = object_id.into();
105 if object_type == ObjectType::Database {
106 return self
107 .alter_database_name(object_id.as_database_id(), object_name)
108 .await;
109 } else if object_type == ObjectType::Schema {
110 return self
111 .alter_schema_name(object_id.as_schema_id(), object_name)
112 .await;
113 }
114
115 let inner = self.inner.write().await;
116 let txn = inner.db.begin().await?;
117 let obj: PartialObject = Object::find_by_id(object_id)
118 .into_partial_model()
119 .one(&txn)
120 .await?
121 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
122 assert_eq!(obj.obj_type, object_type);
123 check_relation_name_duplicate(
124 object_name,
125 obj.database_id.unwrap(),
126 obj.schema_id.unwrap(),
127 &txn,
128 )
129 .await?;
130
131 let (mut to_update_relations, old_name) =
133 rename_relation(&txn, object_type, object_id, object_name).await?;
134 to_update_relations.extend(
136 rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
137 );
138
139 txn.commit().await?;
140
141 let version = self
142 .notify_frontend(
143 NotificationOperation::Update,
144 NotificationInfo::ObjectGroup(PbObjectGroup {
145 objects: to_update_relations,
146 dependencies: vec![],
147 }),
148 )
149 .await;
150
151 Ok(version)
152 }
153
154 pub async fn alter_swap_rename(
155 &self,
156 object_type: ObjectType,
157 object_id: ObjectId,
158 dst_object_id: ObjectId,
159 ) -> MetaResult<NotificationVersion> {
160 let inner = self.inner.write().await;
161 let txn = inner.db.begin().await?;
162 let dst_name: String = match object_type {
163 ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
164 .select_only()
165 .column(table::Column::Name)
166 .into_tuple()
167 .one(&txn)
168 .await?
169 .ok_or_else(|| {
170 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
171 })?,
172 ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
173 .select_only()
174 .column(source::Column::Name)
175 .into_tuple()
176 .one(&txn)
177 .await?
178 .ok_or_else(|| {
179 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
180 })?,
181 ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
182 .select_only()
183 .column(sink::Column::Name)
184 .into_tuple()
185 .one(&txn)
186 .await?
187 .ok_or_else(|| {
188 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
189 })?,
190 ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
191 .select_only()
192 .column(view::Column::Name)
193 .into_tuple()
194 .one(&txn)
195 .await?
196 .ok_or_else(|| {
197 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
198 })?,
199 ObjectType::Subscription => {
200 Subscription::find_by_id(dst_object_id.as_subscription_id())
201 .select_only()
202 .column(subscription::Column::Name)
203 .into_tuple()
204 .one(&txn)
205 .await?
206 .ok_or_else(|| {
207 MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
208 })?
209 }
210 _ => {
211 return Err(MetaError::permission_denied(format!(
212 "swap rename not supported for object type: {:?}",
213 object_type
214 )));
215 }
216 };
217
218 let (mut to_update_relations, src_name) =
220 rename_relation(&txn, object_type, object_id, &dst_name).await?;
221 let (to_update_relations2, _) =
222 rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
223 to_update_relations.extend(to_update_relations2);
224 to_update_relations.extend(
226 rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
227 );
228 to_update_relations.extend(
229 rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
230 );
231
232 txn.commit().await?;
233
234 let version = self
235 .notify_frontend(
236 NotificationOperation::Update,
237 NotificationInfo::ObjectGroup(PbObjectGroup {
238 objects: to_update_relations,
239 dependencies: vec![],
240 }),
241 )
242 .await;
243
244 Ok(version)
245 }
246
247 pub async fn alter_non_shared_source(
248 &self,
249 pb_source: PbSource,
250 ) -> MetaResult<NotificationVersion> {
251 let source_id: SourceId = pb_source.id;
252 let inner = self.inner.write().await;
253 let txn = inner.db.begin().await?;
254
255 let original_version: i64 = Source::find_by_id(source_id)
256 .select_only()
257 .column(source::Column::Version)
258 .into_tuple()
259 .one(&txn)
260 .await?
261 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
262 if original_version + 1 != pb_source.version as i64 {
263 return Err(MetaError::permission_denied(
264 "source version is stale".to_owned(),
265 ));
266 }
267
268 let source: source::ActiveModel = pb_source.clone().into();
269 Source::update(source).exec(&txn).await?;
270 txn.commit().await?;
271
272 let version = self
273 .notify_frontend_relation_info(
274 NotificationOperation::Update,
275 PbObjectInfo::Source(pb_source),
276 )
277 .await;
278 Ok(version)
279 }
280
281 pub async fn alter_owner(
282 &self,
283 object_type: ObjectType,
284 object_id: ObjectId,
285 new_owner: UserId,
286 ) -> MetaResult<NotificationVersion> {
287 let inner = self.inner.write().await;
288 let txn = inner.db.begin().await?;
289 ensure_user_id(new_owner, &txn).await?;
290
291 let obj = Object::find_by_id(object_id)
292 .one(&txn)
293 .await?
294 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
295 if obj.owner_id == new_owner {
296 return Ok(IGNORED_NOTIFICATION_VERSION);
297 }
298 let mut obj = obj.into_active_model();
299 obj.owner_id = Set(new_owner);
300 let obj = obj.update(&txn).await?;
301
302 let mut objects = vec![];
303 match object_type {
304 ObjectType::Database => {
305 let db = Database::find_by_id(object_id.as_database_id())
306 .one(&txn)
307 .await?
308 .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
309
310 txn.commit().await?;
311
312 let version = self
313 .notify_frontend(
314 NotificationOperation::Update,
315 NotificationInfo::Database(ObjectModel(db, obj, None).into()),
316 )
317 .await;
318 return Ok(version);
319 }
320 ObjectType::Schema => {
321 let schema = Schema::find_by_id(object_id.as_schema_id())
322 .one(&txn)
323 .await?
324 .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
325
326 txn.commit().await?;
327
328 let version = self
329 .notify_frontend(
330 NotificationOperation::Update,
331 NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
332 )
333 .await;
334 return Ok(version);
335 }
336 ObjectType::Table => {
337 let table = Table::find_by_id(object_id.as_table_id())
338 .one(&txn)
339 .await?
340 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
341 let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
342 .one(&txn)
343 .await?;
344
345 if let Some(associated_source_id) = table.optional_associated_source_id {
347 let src_obj = object::ActiveModel {
348 oid: Set(associated_source_id.as_object_id()),
349 owner_id: Set(new_owner),
350 ..Default::default()
351 }
352 .update(&txn)
353 .await?;
354 let source = Source::find_by_id(associated_source_id)
355 .one(&txn)
356 .await?
357 .ok_or_else(|| {
358 MetaError::catalog_id_not_found("source", associated_source_id)
359 })?;
360 objects.push(PbObjectInfo::Source(
361 ObjectModel(source, src_obj, None).into(),
362 ));
363 }
364
365 if matches!(table.engine, Some(table::Engine::Iceberg)) {
367 let iceberg_sink = Sink::find()
368 .inner_join(Object)
369 .select_only()
370 .column(sink::Column::SinkId)
371 .filter(
372 object::Column::DatabaseId
373 .eq(obj.database_id)
374 .and(object::Column::SchemaId.eq(obj.schema_id))
375 .and(
376 sink::Column::Name
377 .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
378 ),
379 )
380 .into_tuple::<SinkId>()
381 .one(&txn)
382 .await?
383 .expect("iceberg sink must exist");
384 let sink_obj = object::ActiveModel {
385 oid: Set(iceberg_sink.as_object_id()),
386 owner_id: Set(new_owner),
387 ..Default::default()
388 }
389 .update(&txn)
390 .await?;
391 let sink = Sink::find_by_id(iceberg_sink)
392 .one(&txn)
393 .await?
394 .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
395 objects.push(PbObjectInfo::Sink(
396 ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
397 ));
398
399 let iceberg_source = Source::find()
400 .inner_join(Object)
401 .select_only()
402 .column(source::Column::SourceId)
403 .filter(
404 object::Column::DatabaseId
405 .eq(obj.database_id)
406 .and(object::Column::SchemaId.eq(obj.schema_id))
407 .and(
408 source::Column::Name
409 .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
410 ),
411 )
412 .into_tuple::<SourceId>()
413 .one(&txn)
414 .await?
415 .expect("iceberg source must exist");
416 let source_obj = object::ActiveModel {
417 oid: Set(iceberg_source.as_object_id()),
418 owner_id: Set(new_owner),
419 ..Default::default()
420 }
421 .update(&txn)
422 .await?;
423 let source = Source::find_by_id(iceberg_source)
424 .one(&txn)
425 .await?
426 .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
427 objects.push(PbObjectInfo::Source(
428 ObjectModel(source, source_obj, None).into(),
429 ));
430 }
431
432 let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
434 .select_only()
435 .columns([index::Column::IndexId, index::Column::IndexTableId])
436 .filter(index::Column::PrimaryTableId.eq(object_id))
437 .into_tuple::<(IndexId, TableId)>()
438 .all(&txn)
439 .await?
440 .into_iter()
441 .unzip();
442 objects.push(PbObjectInfo::Table(
443 ObjectModel(table, obj, streaming_job).into(),
444 ));
445
446 let internal_tables: Vec<TableId> = Table::find()
448 .select_only()
449 .column(table::Column::TableId)
450 .filter(
451 table::Column::BelongsToJobId.is_in(
452 table_ids
453 .iter()
454 .cloned()
455 .chain(std::iter::once(object_id.as_table_id())),
456 ),
457 )
458 .into_tuple()
459 .all(&txn)
460 .await?;
461 table_ids.extend(internal_tables);
462
463 if !index_ids.is_empty() || !table_ids.is_empty() {
464 Object::update_many()
465 .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
466 .filter(
467 object::Column::Oid.is_in::<ObjectId, _>(
468 index_ids
469 .iter()
470 .copied()
471 .map_into()
472 .chain(table_ids.iter().copied().map_into()),
473 ),
474 )
475 .exec(&txn)
476 .await?;
477 }
478
479 if !table_ids.is_empty() {
480 let table_objs = Table::find()
481 .find_also_related(Object)
482 .filter(table::Column::TableId.is_in(table_ids))
483 .all(&txn)
484 .await?;
485 let streaming_jobs = load_streaming_jobs_by_ids(
486 &txn,
487 table_objs.iter().map(|(table, _)| table.job_id()),
488 )
489 .await?;
490 for (table, table_obj) in table_objs {
491 let job_id = table.job_id();
492 let streaming_job = streaming_jobs.get(&job_id).cloned();
493 objects.push(PbObjectInfo::Table(
494 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
495 ));
496 }
497 }
498 if !index_ids.is_empty() {
500 let index_objs = Index::find()
501 .find_also_related(Object)
502 .filter(index::Column::IndexId.is_in(index_ids))
503 .all(&txn)
504 .await?;
505 let streaming_jobs = load_streaming_jobs_by_ids(
506 &txn,
507 index_objs
508 .iter()
509 .map(|(index, _)| index.index_id.as_job_id()),
510 )
511 .await?;
512 for (index, index_obj) in index_objs {
513 let streaming_job =
514 streaming_jobs.get(&index.index_id.as_job_id()).cloned();
515 objects.push(PbObjectInfo::Index(
516 ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
517 ));
518 }
519 }
520 }
521 ObjectType::Source => {
522 let source = Source::find_by_id(object_id.as_source_id())
523 .one(&txn)
524 .await?
525 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
526 let is_shared = source.is_shared();
527 objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
528
529 if is_shared {
532 update_internal_tables(
533 &txn,
534 object_id,
535 object::Column::OwnerId,
536 new_owner,
537 &mut objects,
538 )
539 .await?;
540 }
541 }
542 ObjectType::Sink => {
543 let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
544 .find_also_related(Object)
545 .one(&txn)
546 .await?
547 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
548 let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
549 .one(&txn)
550 .await?;
551 objects.push(PbObjectInfo::Sink(
552 ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
553 ));
554
555 update_internal_tables(
556 &txn,
557 object_id,
558 object::Column::OwnerId,
559 new_owner,
560 &mut objects,
561 )
562 .await?;
563 }
564 ObjectType::Subscription => {
565 let subscription = Subscription::find_by_id(object_id.as_subscription_id())
566 .one(&txn)
567 .await?
568 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
569 objects.push(PbObjectInfo::Subscription(
570 ObjectModel(subscription, obj, None).into(),
571 ));
572 }
573 ObjectType::View => {
574 let view = View::find_by_id(object_id.as_view_id())
575 .one(&txn)
576 .await?
577 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
578 objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
579 }
580 ObjectType::Connection => {
581 let connection = Connection::find_by_id(object_id.as_connection_id())
582 .one(&txn)
583 .await?
584 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
585 objects.push(PbObjectInfo::Connection(
586 ObjectModel(connection, obj, None).into(),
587 ));
588 }
589 ObjectType::Function => {
590 let function = Function::find_by_id(object_id.as_function_id())
591 .one(&txn)
592 .await?
593 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
594 objects.push(PbObjectInfo::Function(
595 ObjectModel(function, obj, None).into(),
596 ));
597 }
598 ObjectType::Secret => {
599 let secret = Secret::find_by_id(object_id.as_secret_id())
600 .one(&txn)
601 .await?
602 .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
603 objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
604 }
605 _ => unreachable!("not supported object type: {:?}", object_type),
606 };
607
608 txn.commit().await?;
609
610 let version = self
611 .notify_frontend(
612 NotificationOperation::Update,
613 NotificationInfo::ObjectGroup(PbObjectGroup {
614 objects: objects
615 .into_iter()
616 .map(|object| PbObject {
617 object_info: Some(object),
618 })
619 .collect(),
620 dependencies: vec![],
621 }),
622 )
623 .await;
624 Ok(version)
625 }
626
627 pub async fn alter_schema(
628 &self,
629 object_type: ObjectType,
630 object_id: ObjectId,
631 new_schema: SchemaId,
632 ) -> MetaResult<NotificationVersion> {
633 let inner = self.inner.write().await;
634 let txn = inner.db.begin().await?;
635 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
636
637 let obj = Object::find_by_id(object_id)
638 .one(&txn)
639 .await?
640 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
641 if obj.schema_id == Some(new_schema) {
642 return Ok(IGNORED_NOTIFICATION_VERSION);
643 }
644 let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
645 .one(&txn)
646 .await?;
647 let database_id = obj.database_id.unwrap();
648
649 let mut objects = vec![];
650 match object_type {
651 ObjectType::Table => {
652 let table = Table::find_by_id(object_id.as_table_id())
653 .one(&txn)
654 .await?
655 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
656 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
657 let associated_src_id = table.optional_associated_source_id;
658
659 let mut obj = obj.into_active_model();
660 obj.schema_id = Set(Some(new_schema));
661 let obj = obj.update(&txn).await?;
662
663 objects.push(PbObjectInfo::Table(
664 ObjectModel(table, obj, streaming_job).into(),
665 ));
666
667 if let Some(associated_source_id) = associated_src_id {
669 let src_obj = object::ActiveModel {
670 oid: Set(associated_source_id.as_object_id()),
671 schema_id: Set(Some(new_schema)),
672 ..Default::default()
673 }
674 .update(&txn)
675 .await?;
676 let source = Source::find_by_id(associated_source_id)
677 .one(&txn)
678 .await?
679 .ok_or_else(|| {
680 MetaError::catalog_id_not_found("source", associated_source_id)
681 })?;
682 objects.push(PbObjectInfo::Source(
683 ObjectModel(source, src_obj, None).into(),
684 ));
685 }
686
687 let (index_ids, (index_names, mut table_ids)): (
689 Vec<IndexId>,
690 (Vec<String>, Vec<TableId>),
691 ) = Index::find()
692 .select_only()
693 .columns([
694 index::Column::IndexId,
695 index::Column::Name,
696 index::Column::IndexTableId,
697 ])
698 .filter(index::Column::PrimaryTableId.eq(object_id))
699 .into_tuple::<(IndexId, String, TableId)>()
700 .all(&txn)
701 .await?
702 .into_iter()
703 .map(|(id, name, t_id)| (id, (name, t_id)))
704 .unzip();
705
706 let internal_tables: Vec<TableId> = Table::find()
708 .select_only()
709 .column(table::Column::TableId)
710 .filter(
711 table::Column::BelongsToJobId.is_in(
712 table_ids
713 .iter()
714 .map(|table_id| table_id.as_job_id())
715 .chain(std::iter::once(object_id.as_job_id())),
716 ),
717 )
718 .into_tuple()
719 .all(&txn)
720 .await?;
721 table_ids.extend(internal_tables);
722
723 if !index_ids.is_empty() || !table_ids.is_empty() {
724 for index_name in index_names {
725 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
726 .await?;
727 }
728
729 Object::update_many()
730 .col_expr(object::Column::SchemaId, new_schema.into())
731 .filter(
732 object::Column::Oid.is_in::<ObjectId, _>(
733 index_ids
734 .iter()
735 .copied()
736 .map_into()
737 .chain(table_ids.iter().copied().map_into()),
738 ),
739 )
740 .exec(&txn)
741 .await?;
742 }
743
744 if !table_ids.is_empty() {
745 let table_objs = Table::find()
746 .find_also_related(Object)
747 .filter(table::Column::TableId.is_in(table_ids))
748 .all(&txn)
749 .await?;
750 let streaming_jobs = load_streaming_jobs_by_ids(
751 &txn,
752 table_objs.iter().map(|(table, _)| table.job_id()),
753 )
754 .await?;
755 for (table, table_obj) in table_objs {
756 let job_id = table.job_id();
757 let streaming_job = streaming_jobs.get(&job_id).cloned();
758 objects.push(PbObjectInfo::Table(
759 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
760 ));
761 }
762 }
763 if !index_ids.is_empty() {
764 let index_objs = Index::find()
765 .find_also_related(Object)
766 .filter(index::Column::IndexId.is_in(index_ids))
767 .all(&txn)
768 .await?;
769 let streaming_jobs = load_streaming_jobs_by_ids(
770 &txn,
771 index_objs
772 .iter()
773 .map(|(index, _)| index.index_id.as_job_id()),
774 )
775 .await?;
776 for (index, index_obj) in index_objs {
777 let streaming_job =
778 streaming_jobs.get(&index.index_id.as_job_id()).cloned();
779 objects.push(PbObjectInfo::Index(
780 ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
781 ));
782 }
783 }
784 }
785 ObjectType::Source => {
786 let source = Source::find_by_id(object_id.as_source_id())
787 .one(&txn)
788 .await?
789 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
790 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
791 let is_shared = source.is_shared();
792
793 let mut obj = obj.into_active_model();
794 obj.schema_id = Set(Some(new_schema));
795 let obj = obj.update(&txn).await?;
796 objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
797
798 if is_shared {
801 update_internal_tables(
802 &txn,
803 object_id,
804 object::Column::SchemaId,
805 new_schema,
806 &mut objects,
807 )
808 .await?;
809 }
810 }
811 ObjectType::Sink => {
812 let sink = Sink::find_by_id(object_id.as_sink_id())
813 .one(&txn)
814 .await?
815 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
816 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
817
818 let mut obj = obj.into_active_model();
819 obj.schema_id = Set(Some(new_schema));
820 let obj = obj.update(&txn).await?;
821 objects.push(PbObjectInfo::Sink(
822 ObjectModel(sink, obj, streaming_job).into(),
823 ));
824
825 update_internal_tables(
826 &txn,
827 object_id,
828 object::Column::SchemaId,
829 new_schema,
830 &mut objects,
831 )
832 .await?;
833 }
834 ObjectType::Subscription => {
835 let subscription = Subscription::find_by_id(object_id.as_subscription_id())
836 .one(&txn)
837 .await?
838 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
839 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
840 .await?;
841
842 let mut obj = obj.into_active_model();
843 obj.schema_id = Set(Some(new_schema));
844 let obj = obj.update(&txn).await?;
845 objects.push(PbObjectInfo::Subscription(
846 ObjectModel(subscription, obj, None).into(),
847 ));
848 }
849 ObjectType::View => {
850 let view = View::find_by_id(object_id.as_view_id())
851 .one(&txn)
852 .await?
853 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
854 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
855
856 let mut obj = obj.into_active_model();
857 obj.schema_id = Set(Some(new_schema));
858 let obj = obj.update(&txn).await?;
859 objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
860 }
861 ObjectType::Function => {
862 let function = Function::find_by_id(object_id.as_function_id())
863 .one(&txn)
864 .await?
865 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
866
867 let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
868 pb_function.schema_id = new_schema;
869 check_function_signature_duplicate(&pb_function, &txn).await?;
870
871 Object::update(object::ActiveModel {
872 oid: Set(object_id),
873 schema_id: Set(Some(new_schema)),
874 ..Default::default()
875 })
876 .exec(&txn)
877 .await?;
878
879 txn.commit().await?;
880 let version = self
881 .notify_frontend(
882 NotificationOperation::Update,
883 NotificationInfo::Function(pb_function),
884 )
885 .await;
886 return Ok(version);
887 }
888 ObjectType::Connection => {
889 let connection = Connection::find_by_id(object_id.as_connection_id())
890 .one(&txn)
891 .await?
892 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
893
894 let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
895 pb_connection.schema_id = new_schema;
896 check_connection_name_duplicate(&pb_connection, &txn).await?;
897
898 Object::update(object::ActiveModel {
899 oid: Set(object_id),
900 schema_id: Set(Some(new_schema)),
901 ..Default::default()
902 })
903 .exec(&txn)
904 .await?;
905
906 txn.commit().await?;
907 let version = self
908 .notify_frontend(
909 NotificationOperation::Update,
910 NotificationInfo::Connection(pb_connection),
911 )
912 .await;
913 return Ok(version);
914 }
915 _ => unreachable!("not supported object type: {:?}", object_type),
916 }
917
918 txn.commit().await?;
919 let version = self
920 .notify_frontend(
921 Operation::Update,
922 Info::ObjectGroup(PbObjectGroup {
923 objects: objects
924 .into_iter()
925 .map(|relation_info| PbObject {
926 object_info: Some(relation_info),
927 })
928 .collect_vec(),
929 dependencies: vec![],
930 }),
931 )
932 .await;
933 Ok(version)
934 }
935
936 pub async fn alter_secret(
937 &self,
938 pb_secret: PbSecret,
939 secret_plain_payload: Vec<u8>,
940 ) -> MetaResult<NotificationVersion> {
941 let inner = self.inner.write().await;
942 let owner_id = pb_secret.owner as _;
943 let txn = inner.db.begin().await?;
944 ensure_user_id(owner_id, &txn).await?;
945 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
946 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
947
948 ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
949 let secret: secret::ActiveModel = pb_secret.clone().into();
950 Secret::update(secret).exec(&txn).await?;
951
952 txn.commit().await?;
953
954 let mut secret_plain = pb_secret;
956 secret_plain.value.clone_from(&secret_plain_payload);
957
958 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
959 self.env
960 .notification_manager()
961 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
962
963 let version = self
964 .notify_frontend(
965 NotificationOperation::Update,
966 NotificationInfo::Secret(secret_plain),
967 )
968 .await;
969
970 Ok(version)
971 }
972
973 pub async fn drop_table_associated_source(
975 txn: &DatabaseTransaction,
976 drop_table_connector_ctx: &DropTableConnectorContext,
977 ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
978 let to_drop_source_objects: Vec<PartialObject> = Object::find()
979 .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
980 .into_partial_model()
981 .all(txn)
982 .await?;
983 let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
984 .select_only()
985 .filter(
986 object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
987 )
988 .into_partial_model()
989 .all(txn)
990 .await?;
991 let to_drop_objects = to_drop_source_objects
992 .into_iter()
993 .chain(to_drop_internal_table_objs)
994 .collect_vec();
995 let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
997 .select_only()
998 .distinct()
999 .column(user_privilege::Column::UserId)
1000 .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1001 .into_tuple()
1002 .all(txn)
1003 .await?;
1004
1005 tracing::debug!(
1006 "drop_table_associated_source: to_drop_objects: {:?}",
1007 to_drop_objects
1008 );
1009
1010 let res = Object::delete_many()
1012 .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1013 .exec(txn)
1014 .await?;
1015 if res.rows_affected == 0 {
1016 return Err(MetaError::catalog_id_not_found(
1017 ObjectType::Source.as_str(),
1018 drop_table_connector_ctx.to_remove_source_id,
1019 ));
1020 }
1021 let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
1022
1023 Ok((user_infos, to_drop_objects))
1024 }
1025
1026 pub async fn alter_database_param(
1027 &self,
1028 database_id: DatabaseId,
1029 param: AlterDatabaseParam,
1030 ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
1031 let inner = self.inner.write().await;
1032 let txn = inner.db.begin().await?;
1033
1034 let mut database = database::ActiveModel {
1035 database_id: Set(database_id),
1036 ..Default::default()
1037 };
1038 match param {
1039 AlterDatabaseParam::BarrierIntervalMs(interval) => {
1040 if let Some(ref interval) = interval {
1041 OverrideValidate::barrier_interval_ms(interval)
1042 .map_err(|e| anyhow::anyhow!(e))?;
1043 }
1044 database.barrier_interval_ms = Set(interval.map(|i| i as i32));
1045 }
1046 AlterDatabaseParam::CheckpointFrequency(frequency) => {
1047 if let Some(ref frequency) = frequency {
1048 OverrideValidate::checkpoint_frequency(frequency)
1049 .map_err(|e| anyhow::anyhow!(e))?;
1050 }
1051 database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
1052 }
1053 }
1054 let database = database.update(&txn).await?;
1055
1056 let obj = Object::find_by_id(database_id)
1057 .one(&txn)
1058 .await?
1059 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1060
1061 txn.commit().await?;
1062
1063 let version = self
1064 .notify_frontend(
1065 NotificationOperation::Update,
1066 NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
1067 )
1068 .await;
1069 Ok((version, database))
1070 }
1071
1072 pub async fn alter_database_resource_group(
1073 &self,
1074 database_id: DatabaseId,
1075 resource_group: Option<String>,
1076 ) -> MetaResult<NotificationVersion> {
1077 let inner = self.inner.write().await;
1078 let txn = inner.db.begin().await?;
1079
1080 let database =
1081 database::ActiveModel {
1082 database_id: Set(database_id),
1083 resource_group: Set(
1084 resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1085 ),
1086 ..Default::default()
1087 }
1088 .update(&txn)
1089 .await?;
1090
1091 let obj = Object::find_by_id(database_id)
1092 .one(&txn)
1093 .await?
1094 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1095
1096 txn.commit().await?;
1097
1098 let version = self
1099 .notify_frontend(
1100 NotificationOperation::Update,
1101 NotificationInfo::Database(ObjectModel(database, obj, None).into()),
1102 )
1103 .await;
1104 Ok(version)
1105 }
1106
1107 pub async fn alter_subscription_retention(
1108 &self,
1109 subscription_id: SubscriptionId,
1110 retention_seconds: u64,
1111 definition: String,
1112 ) -> MetaResult<(NotificationVersion, PbSubscription)> {
1113 let inner = self.inner.write().await;
1114 let txn = inner.db.begin().await?;
1115
1116 let obj = Object::find_by_id(subscription_id)
1117 .one(&txn)
1118 .await?
1119 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1120
1121 let active_model = subscription::ActiveModel {
1122 subscription_id: Set(subscription_id),
1123 retention_seconds: Set(retention_seconds as i64),
1124 definition: Set(definition),
1125 ..Default::default()
1126 };
1127 let subscription = active_model.update(&txn).await?;
1128
1129 txn.commit().await?;
1130
1131 let pb_subscription: PbSubscription = ObjectModel(subscription, obj, None).into();
1132 let subscription_info = PbObjectInfo::Subscription(pb_subscription.clone());
1133
1134 let version = self
1135 .notify_frontend(
1136 NotificationOperation::Update,
1137 NotificationInfo::ObjectGroup(PbObjectGroup {
1138 objects: vec![PbObject {
1139 object_info: Some(subscription_info),
1140 }],
1141 dependencies: vec![],
1142 }),
1143 )
1144 .await;
1145
1146 Ok((version, pb_subscription))
1147 }
1148
1149 pub async fn alter_streaming_job_config(
1150 &self,
1151 job_id: JobId,
1152 entries_to_add: HashMap<String, String>,
1153 keys_to_remove: Vec<String>,
1154 ) -> MetaResult<NotificationVersion> {
1155 let inner = self.inner.write().await;
1156 let txn = inner.db.begin().await?;
1157
1158 let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1159 .select_only()
1160 .column(streaming_job::Column::ConfigOverride)
1161 .into_tuple()
1162 .one(&txn)
1163 .await?
1164 .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1165 let config_override = config_override.unwrap_or_default();
1166
1167 let mut table: toml::Table =
1168 toml::from_str(&config_override).context("invalid streaming job config")?;
1169
1170 for (key, value) in entries_to_add {
1172 let value: toml::Value = value
1173 .parse()
1174 .with_context(|| format!("invalid config value for path {key}"))?;
1175 table
1176 .upsert(&key, value)
1177 .with_context(|| format!("failed to set config path {key}"))?;
1178 }
1179 for key in keys_to_remove {
1180 table
1181 .delete(&key)
1182 .with_context(|| format!("failed to reset config path {key}"))?;
1183 }
1184
1185 let updated_config_override = table.to_string();
1186
1187 {
1189 let merged = merge_streaming_config_section(
1190 &StreamingConfig::default(),
1191 &updated_config_override,
1192 )
1193 .context("invalid streaming job config override")?;
1194
1195 if let Some(merged) = merged {
1199 let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1200 if !unrecognized_keys.is_empty() {
1201 bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1202 }
1203 }
1204 }
1205
1206 StreamingJob::update(streaming_job::ActiveModel {
1207 job_id: Set(job_id),
1208 config_override: Set(Some(updated_config_override)),
1209 ..Default::default()
1210 })
1211 .exec(&txn)
1212 .await?;
1213
1214 txn.commit().await?;
1215
1216 Ok(IGNORED_NOTIFICATION_VERSION)
1217 }
1218
1219 pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1220 let inner = self.inner.read().await;
1221 let active = refresh_job::ActiveModel {
1222 table_id: Set(table_id),
1223 last_trigger_time: Set(None),
1224 trigger_interval_secs: Set(None),
1225 current_status: Set(RefreshState::Idle),
1226 last_success_time: Set(None),
1227 };
1228 match RefreshJob::insert(active)
1229 .on_conflict_do_nothing()
1230 .exec(&inner.db)
1231 .await
1232 {
1233 Ok(_) => Ok(()),
1234 Err(sea_orm::DbErr::RecordNotInserted) => {
1235 tracing::debug!("refresh job already exists for table_id={}", table_id);
1237 Ok(())
1238 }
1239 Err(e) => {
1240 if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1241 tracing::warn!(
1242 %table_id,
1243 error = %e.as_report(),
1244 "skip ensure_refresh_job for stale dropped table"
1245 );
1246 Ok(())
1247 } else {
1248 Err(e.into())
1249 }
1250 }
1251 }
1252 }
1253
1254 pub async fn update_refresh_job_status(
1255 &self,
1256 table_id: TableId,
1257 status: RefreshState,
1258 trigger_time: Option<DateTime>,
1259 is_success: bool,
1260 ) -> MetaResult<()> {
1261 self.ensure_refresh_job(table_id).await?;
1262 let inner = self.inner.read().await;
1263
1264 assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1266 let active = refresh_job::ActiveModel {
1267 table_id: Set(table_id),
1268 current_status: Set(status),
1269 last_trigger_time: if trigger_time.is_some() {
1270 Set(trigger_time.map(datetime_to_timestamp_millis))
1271 } else {
1272 NotSet
1273 },
1274 last_success_time: if is_success {
1275 Set(Some(chrono::Utc::now().timestamp_millis()))
1276 } else {
1277 NotSet
1278 },
1279 ..Default::default()
1280 };
1281 match RefreshJob::update(active).exec(&inner.db).await {
1282 Ok(_) => Ok(()),
1283 Err(e) => {
1284 if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1285 tracing::warn!(
1286 %table_id,
1287 error = %e.as_report(),
1288 "skip update_refresh_job_status for stale dropped table"
1289 );
1290 Ok(())
1291 } else {
1292 Err(e.into())
1293 }
1294 }
1295 }
1296 }
1297
1298 pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1299 let inner = self.inner.read().await;
1300 RefreshJob::update_many()
1301 .col_expr(
1302 refresh_job::Column::CurrentStatus,
1303 Expr::value(RefreshState::Idle),
1304 )
1305 .exec(&inner.db)
1306 .await?;
1307 Ok(())
1308 }
1309
1310 pub async fn update_refresh_job_interval(
1311 &self,
1312 table_id: TableId,
1313 trigger_interval_secs: Option<i64>,
1314 ) -> MetaResult<()> {
1315 self.ensure_refresh_job(table_id).await?;
1316 let inner = self.inner.read().await;
1317 let active = refresh_job::ActiveModel {
1318 table_id: Set(table_id),
1319 trigger_interval_secs: Set(trigger_interval_secs),
1320 ..Default::default()
1321 };
1322 RefreshJob::update(active).exec(&inner.db).await?;
1323 Ok(())
1324 }
1325}
1326
1327async fn should_skip_refresh_job_db_err<C>(
1328 db: &C,
1329 table_id: TableId,
1330 err: &sea_orm::DbErr,
1331) -> MetaResult<bool>
1332where
1333 C: ConnectionTrait,
1334{
1335 if matches!(err, sea_orm::DbErr::RecordNotUpdated) {
1336 return Ok(true);
1337 }
1338
1339 if !matches!(
1340 err.sql_err(),
1341 Some(SqlErr::ForeignKeyConstraintViolation(_))
1342 ) {
1343 return Ok(false);
1344 }
1345
1346 let table_exists = Table::find_by_id(table_id).one(db).await?.is_some();
1347 Ok(!table_exists)
1348}