1use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use sea_orm::ActiveValue::{NotSet, Set};
24use sea_orm::prelude::DateTime;
25use sea_orm::sea_query::Expr;
26use sea_orm::{ActiveModelTrait, DatabaseTransaction};
27
28use super::*;
29use crate::controller::utils::load_streaming_jobs_by_ids;
30use crate::error::bail_invalid_parameter;
31
32impl CatalogController {
33 async fn alter_database_name(
34 &self,
35 database_id: DatabaseId,
36 name: &str,
37 ) -> MetaResult<NotificationVersion> {
38 let inner = self.inner.write().await;
39 let txn = inner.db.begin().await?;
40 check_database_name_duplicate(name, &txn).await?;
41
42 let active_model = database::ActiveModel {
43 database_id: Set(database_id),
44 name: Set(name.to_owned()),
45 ..Default::default()
46 };
47 let database = active_model.update(&txn).await?;
48
49 let obj = Object::find_by_id(database_id)
50 .one(&txn)
51 .await?
52 .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
53
54 txn.commit().await?;
55
56 let version = self
57 .notify_frontend(
58 NotificationOperation::Update,
59 NotificationInfo::Database(ObjectModel(database, obj, None).into()),
60 )
61 .await;
62 Ok(version)
63 }
64
65 async fn alter_schema_name(
66 &self,
67 schema_id: SchemaId,
68 name: &str,
69 ) -> MetaResult<NotificationVersion> {
70 let inner = self.inner.write().await;
71 let txn = inner.db.begin().await?;
72
73 let obj = Object::find_by_id(schema_id)
74 .one(&txn)
75 .await?
76 .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
77 check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
78
79 let active_model = schema::ActiveModel {
80 schema_id: Set(schema_id),
81 name: Set(name.to_owned()),
82 };
83 let schema = active_model.update(&txn).await?;
84
85 txn.commit().await?;
86
87 let version = self
88 .notify_frontend(
89 NotificationOperation::Update,
90 NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
91 )
92 .await;
93 Ok(version)
94 }
95
    /// Renames a catalog object of any type.
    ///
    /// Databases and schemas are delegated to dedicated helpers; every other
    /// object type goes through the generic relation-renaming path, which
    /// also rewrites the definitions of dependent relations that reference
    /// the old name. Returns the notification version after broadcasting the
    /// updated catalog entries to frontends.
    pub async fn alter_name(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        object_name: &str,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        if object_type == ObjectType::Database {
            return self
                .alter_database_name(object_id.as_database_id(), object_name)
                .await;
        } else if object_type == ObjectType::Schema {
            return self
                .alter_schema_name(object_id.as_schema_id(), object_name)
                .await;
        }

        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // The caller-supplied type must match the stored object type.
        assert_eq!(obj.obj_type, object_type);
        // The new name must not collide with any relation in the same schema.
        check_relation_name_duplicate(
            object_name,
            obj.database_id.unwrap(),
            obj.schema_id.unwrap(),
            &txn,
        )
        .await?;

        // Rename the object itself, then patch every relation whose stored
        // definition still refers to the old name.
        let (mut to_update_relations, old_name) =
            rename_relation(&txn, object_type, object_id, object_name).await?;
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
        );

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: to_update_relations,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(version)
    }
151
    /// Atomically swaps the names of two relations of the same type.
    ///
    /// Supported for tables, sources, sinks, views, and subscriptions; any
    /// other type is rejected. Both relations are renamed (each to the
    /// other's name) and every dependent relation referencing either old name
    /// has its definition rewritten, all within a single transaction.
    pub async fn alter_swap_rename(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        dst_object_id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Look up the destination object's current name, per object type.
        let dst_name: String = match object_type {
            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
                .select_only()
                .column(table::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
                .select_only()
                .column(source::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
                .select_only()
                .column(sink::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
                .select_only()
                .column(view::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Subscription => {
                Subscription::find_by_id(dst_object_id.as_subscription_id())
                    .select_only()
                    .column(subscription::Column::Name)
                    .into_tuple()
                    .one(&txn)
                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                    })?
            }
            _ => {
                return Err(MetaError::permission_denied(format!(
                    "swap rename not supported for object type: {:?}",
                    object_type
                )));
            }
        };

        // Rename src -> dst_name first; this also yields the source's
        // original name, which the destination is then renamed to.
        let (mut to_update_relations, src_name) =
            rename_relation(&txn, object_type, object_id, &dst_name).await?;
        let (to_update_relations2, _) =
            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
        to_update_relations.extend(to_update_relations2);
        // Rewrite the definitions of relations referring to either renamed
        // object so they point at the post-swap names.
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
        );
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
        );

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: to_update_relations,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(version)
    }
244
245 pub async fn alter_non_shared_source(
246 &self,
247 pb_source: PbSource,
248 ) -> MetaResult<NotificationVersion> {
249 let source_id: SourceId = pb_source.id;
250 let inner = self.inner.write().await;
251 let txn = inner.db.begin().await?;
252
253 let original_version: i64 = Source::find_by_id(source_id)
254 .select_only()
255 .column(source::Column::Version)
256 .into_tuple()
257 .one(&txn)
258 .await?
259 .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
260 if original_version + 1 != pb_source.version as i64 {
261 return Err(MetaError::permission_denied(
262 "source version is stale".to_owned(),
263 ));
264 }
265
266 let source: source::ActiveModel = pb_source.clone().into();
267 Source::update(source).exec(&txn).await?;
268 txn.commit().await?;
269
270 let version = self
271 .notify_frontend_relation_info(
272 NotificationOperation::Update,
273 PbObjectInfo::Source(pb_source),
274 )
275 .await;
276 Ok(version)
277 }
278
    /// Transfers ownership of a catalog object (and everything that must
    /// follow it) to `new_owner`.
    ///
    /// Ownership cascades: a table also moves its associated source, its
    /// indexes, its internal state tables, and — for Iceberg-engine tables —
    /// the hidden Iceberg sink/source pair; shared sources and sinks move
    /// their internal tables as well. Returns `IGNORED_NOTIFICATION_VERSION`
    /// when the owner is unchanged. All updates happen in one transaction,
    /// and the affected catalog entries are broadcast to frontends afterward.
    pub async fn alter_owner(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_owner: UserId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_user_id(new_owner, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // No-op when ownership is already correct (txn rolls back on drop).
        if obj.owner_id == new_owner {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let mut obj = obj.into_active_model();
        obj.owner_id = Set(new_owner);
        let obj = obj.update(&txn).await?;

        // Catalog entries to broadcast to frontends after commit.
        let mut objects = vec![];
        match object_type {
            // Databases and schemas have no cascading objects: commit and
            // notify immediately from within the branch.
            ObjectType::Database => {
                let db = Database::find_by_id(object_id.as_database_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;

                txn.commit().await?;

                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Database(ObjectModel(db, obj, None).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Schema => {
                let schema = Schema::find_by_id(object_id.as_schema_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;

                txn.commit().await?;

                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Table => {
                let table = Table::find_by_id(object_id.as_table_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
                    .one(&txn)
                    .await?;

                // A table created with a connector has an implicitly created
                // associated source; move its ownership too.
                if let Some(associated_source_id) = table.optional_associated_source_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(
                        ObjectModel(source, src_obj, None).into(),
                    ));
                }

                // Iceberg-engine tables are backed by a hidden sink/source
                // pair named with reserved prefixes in the same schema;
                // transfer their ownership along with the table.
                if matches!(table.engine, Some(table::Engine::Iceberg)) {
                    let iceberg_sink = Sink::find()
                        .inner_join(Object)
                        .select_only()
                        .column(sink::Column::SinkId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    sink::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SinkId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg sink must exist");
                    let sink_obj = object::ActiveModel {
                        oid: Set(iceberg_sink.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let sink = Sink::find_by_id(iceberg_sink)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
                    objects.push(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
                    ));

                    let iceberg_source = Source::find()
                        .inner_join(Object)
                        .select_only()
                        .column(source::Column::SourceId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    source::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SourceId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg source must exist");
                    let source_obj = object::ActiveModel {
                        oid: Set(iceberg_source.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(iceberg_source)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
                    objects.push(PbObjectInfo::Source(
                        ObjectModel(source, source_obj, None).into(),
                    ));
                }

                // Indexes on the table (and their index tables) follow the
                // table's owner.
                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
                    .select_only()
                    .columns([index::Column::IndexId, index::Column::IndexTableId])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .unzip();
                objects.push(PbObjectInfo::Table(
                    ObjectModel(table, obj, streaming_job).into(),
                ));

                // Internal state tables of the table job and of all index
                // tables are transferred as well.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId.is_in(
                            table_ids
                                .iter()
                                .cloned()
                                .chain(std::iter::once(object_id.as_table_id())),
                        ),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                // Bulk-update the owner of every cascaded object in one shot.
                if !index_ids.is_empty() || !table_ids.is_empty() {
                    Object::update_many()
                        .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
                        .filter(
                            object::Column::Oid.is_in::<ObjectId, _>(
                                index_ids
                                    .iter()
                                    .copied()
                                    .map_into()
                                    .chain(table_ids.iter().copied().map_into()),
                            ),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Reload the cascaded tables/indexes so the notification
                // carries their post-update state.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    let streaming_jobs = load_streaming_jobs_by_ids(
                        &txn,
                        table_objs.iter().map(|(table, _)| table.job_id()),
                    )
                    .await?;
                    for (table, table_obj) in table_objs {
                        let job_id = table.job_id();
                        let streaming_job = streaming_jobs.get(&job_id).cloned();
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                        ));
                    }
                }
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    let streaming_jobs = load_streaming_jobs_by_ids(
                        &txn,
                        index_objs
                            .iter()
                            .map(|(index, _)| index.index_id.as_job_id()),
                    )
                    .await?;
                    for (index, index_obj) in index_objs {
                        let streaming_job =
                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id.as_source_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                let is_shared = source.is_shared();
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));

                // Only shared sources have internal (state) tables to move.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::OwnerId,
                        new_owner,
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                    .one(&txn)
                    .await?;
                objects.push(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
                ));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::OwnerId,
                    new_owner,
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj, None).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id.as_view_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id.as_connection_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
                objects.push(PbObjectInfo::Connection(
                    ObjectModel(connection, obj, None).into(),
                ));
            }
            ObjectType::Function => {
                let function = Function::find_by_id(object_id.as_function_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
                objects.push(PbObjectInfo::Function(
                    ObjectModel(function, obj, None).into(),
                ));
            }
            ObjectType::Secret => {
                let secret = Secret::find_by_id(object_id.as_secret_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
                objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
            }
            _ => unreachable!("not supported object type: {:?}", object_type),
        };

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|object| PbObject {
                            object_info: Some(object),
                        })
                        .collect(),
                    dependencies: vec![],
                }),
            )
            .await;
        Ok(version)
    }
624
625 pub async fn alter_schema(
626 &self,
627 object_type: ObjectType,
628 object_id: ObjectId,
629 new_schema: SchemaId,
630 ) -> MetaResult<NotificationVersion> {
631 let inner = self.inner.write().await;
632 let txn = inner.db.begin().await?;
633 ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
634
635 let obj = Object::find_by_id(object_id)
636 .one(&txn)
637 .await?
638 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
639 if obj.schema_id == Some(new_schema) {
640 return Ok(IGNORED_NOTIFICATION_VERSION);
641 }
642 let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
643 .one(&txn)
644 .await?;
645 let database_id = obj.database_id.unwrap();
646
647 let mut objects = vec![];
648 match object_type {
649 ObjectType::Table => {
650 let table = Table::find_by_id(object_id.as_table_id())
651 .one(&txn)
652 .await?
653 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
654 check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
655 let associated_src_id = table.optional_associated_source_id;
656
657 let mut obj = obj.into_active_model();
658 obj.schema_id = Set(Some(new_schema));
659 let obj = obj.update(&txn).await?;
660
661 objects.push(PbObjectInfo::Table(
662 ObjectModel(table, obj, streaming_job).into(),
663 ));
664
665 if let Some(associated_source_id) = associated_src_id {
667 let src_obj = object::ActiveModel {
668 oid: Set(associated_source_id.as_object_id()),
669 schema_id: Set(Some(new_schema)),
670 ..Default::default()
671 }
672 .update(&txn)
673 .await?;
674 let source = Source::find_by_id(associated_source_id)
675 .one(&txn)
676 .await?
677 .ok_or_else(|| {
678 MetaError::catalog_id_not_found("source", associated_source_id)
679 })?;
680 objects.push(PbObjectInfo::Source(
681 ObjectModel(source, src_obj, None).into(),
682 ));
683 }
684
685 let (index_ids, (index_names, mut table_ids)): (
687 Vec<IndexId>,
688 (Vec<String>, Vec<TableId>),
689 ) = Index::find()
690 .select_only()
691 .columns([
692 index::Column::IndexId,
693 index::Column::Name,
694 index::Column::IndexTableId,
695 ])
696 .filter(index::Column::PrimaryTableId.eq(object_id))
697 .into_tuple::<(IndexId, String, TableId)>()
698 .all(&txn)
699 .await?
700 .into_iter()
701 .map(|(id, name, t_id)| (id, (name, t_id)))
702 .unzip();
703
704 let internal_tables: Vec<TableId> = Table::find()
706 .select_only()
707 .column(table::Column::TableId)
708 .filter(
709 table::Column::BelongsToJobId.is_in(
710 table_ids
711 .iter()
712 .map(|table_id| table_id.as_job_id())
713 .chain(std::iter::once(object_id.as_job_id())),
714 ),
715 )
716 .into_tuple()
717 .all(&txn)
718 .await?;
719 table_ids.extend(internal_tables);
720
721 if !index_ids.is_empty() || !table_ids.is_empty() {
722 for index_name in index_names {
723 check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
724 .await?;
725 }
726
727 Object::update_many()
728 .col_expr(object::Column::SchemaId, new_schema.into())
729 .filter(
730 object::Column::Oid.is_in::<ObjectId, _>(
731 index_ids
732 .iter()
733 .copied()
734 .map_into()
735 .chain(table_ids.iter().copied().map_into()),
736 ),
737 )
738 .exec(&txn)
739 .await?;
740 }
741
742 if !table_ids.is_empty() {
743 let table_objs = Table::find()
744 .find_also_related(Object)
745 .filter(table::Column::TableId.is_in(table_ids))
746 .all(&txn)
747 .await?;
748 let streaming_jobs = load_streaming_jobs_by_ids(
749 &txn,
750 table_objs.iter().map(|(table, _)| table.job_id()),
751 )
752 .await?;
753 for (table, table_obj) in table_objs {
754 let job_id = table.job_id();
755 let streaming_job = streaming_jobs.get(&job_id).cloned();
756 objects.push(PbObjectInfo::Table(
757 ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
758 ));
759 }
760 }
761 if !index_ids.is_empty() {
762 let index_objs = Index::find()
763 .find_also_related(Object)
764 .filter(index::Column::IndexId.is_in(index_ids))
765 .all(&txn)
766 .await?;
767 let streaming_jobs = load_streaming_jobs_by_ids(
768 &txn,
769 index_objs
770 .iter()
771 .map(|(index, _)| index.index_id.as_job_id()),
772 )
773 .await?;
774 for (index, index_obj) in index_objs {
775 let streaming_job =
776 streaming_jobs.get(&index.index_id.as_job_id()).cloned();
777 objects.push(PbObjectInfo::Index(
778 ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
779 ));
780 }
781 }
782 }
783 ObjectType::Source => {
784 let source = Source::find_by_id(object_id.as_source_id())
785 .one(&txn)
786 .await?
787 .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
788 check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
789 let is_shared = source.is_shared();
790
791 let mut obj = obj.into_active_model();
792 obj.schema_id = Set(Some(new_schema));
793 let obj = obj.update(&txn).await?;
794 objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
795
796 if is_shared {
799 update_internal_tables(
800 &txn,
801 object_id,
802 object::Column::SchemaId,
803 new_schema,
804 &mut objects,
805 )
806 .await?;
807 }
808 }
809 ObjectType::Sink => {
810 let sink = Sink::find_by_id(object_id.as_sink_id())
811 .one(&txn)
812 .await?
813 .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
814 check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
815
816 let mut obj = obj.into_active_model();
817 obj.schema_id = Set(Some(new_schema));
818 let obj = obj.update(&txn).await?;
819 objects.push(PbObjectInfo::Sink(
820 ObjectModel(sink, obj, streaming_job).into(),
821 ));
822
823 update_internal_tables(
824 &txn,
825 object_id,
826 object::Column::SchemaId,
827 new_schema,
828 &mut objects,
829 )
830 .await?;
831 }
832 ObjectType::Subscription => {
833 let subscription = Subscription::find_by_id(object_id.as_subscription_id())
834 .one(&txn)
835 .await?
836 .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
837 check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
838 .await?;
839
840 let mut obj = obj.into_active_model();
841 obj.schema_id = Set(Some(new_schema));
842 let obj = obj.update(&txn).await?;
843 objects.push(PbObjectInfo::Subscription(
844 ObjectModel(subscription, obj, None).into(),
845 ));
846 }
847 ObjectType::View => {
848 let view = View::find_by_id(object_id.as_view_id())
849 .one(&txn)
850 .await?
851 .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
852 check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
853
854 let mut obj = obj.into_active_model();
855 obj.schema_id = Set(Some(new_schema));
856 let obj = obj.update(&txn).await?;
857 objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
858 }
859 ObjectType::Function => {
860 let function = Function::find_by_id(object_id.as_function_id())
861 .one(&txn)
862 .await?
863 .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
864
865 let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
866 pb_function.schema_id = new_schema;
867 check_function_signature_duplicate(&pb_function, &txn).await?;
868
869 Object::update(object::ActiveModel {
870 oid: Set(object_id),
871 schema_id: Set(Some(new_schema)),
872 ..Default::default()
873 })
874 .exec(&txn)
875 .await?;
876
877 txn.commit().await?;
878 let version = self
879 .notify_frontend(
880 NotificationOperation::Update,
881 NotificationInfo::Function(pb_function),
882 )
883 .await;
884 return Ok(version);
885 }
886 ObjectType::Connection => {
887 let connection = Connection::find_by_id(object_id.as_connection_id())
888 .one(&txn)
889 .await?
890 .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
891
892 let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
893 pb_connection.schema_id = new_schema;
894 check_connection_name_duplicate(&pb_connection, &txn).await?;
895
896 Object::update(object::ActiveModel {
897 oid: Set(object_id),
898 schema_id: Set(Some(new_schema)),
899 ..Default::default()
900 })
901 .exec(&txn)
902 .await?;
903
904 txn.commit().await?;
905 let version = self
906 .notify_frontend(
907 NotificationOperation::Update,
908 NotificationInfo::Connection(pb_connection),
909 )
910 .await;
911 return Ok(version);
912 }
913 _ => unreachable!("not supported object type: {:?}", object_type),
914 }
915
916 txn.commit().await?;
917 let version = self
918 .notify_frontend(
919 Operation::Update,
920 Info::ObjectGroup(PbObjectGroup {
921 objects: objects
922 .into_iter()
923 .map(|relation_info| PbObject {
924 object_info: Some(relation_info),
925 })
926 .collect_vec(),
927 dependencies: vec![],
928 }),
929 )
930 .await;
931 Ok(version)
932 }
933
934 pub async fn alter_secret(
935 &self,
936 pb_secret: PbSecret,
937 secret_plain_payload: Vec<u8>,
938 ) -> MetaResult<NotificationVersion> {
939 let inner = self.inner.write().await;
940 let owner_id = pb_secret.owner as _;
941 let txn = inner.db.begin().await?;
942 ensure_user_id(owner_id, &txn).await?;
943 ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
944 ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
945
946 ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
947 let secret: secret::ActiveModel = pb_secret.clone().into();
948 Secret::update(secret).exec(&txn).await?;
949
950 txn.commit().await?;
951
952 let mut secret_plain = pb_secret;
954 secret_plain.value.clone_from(&secret_plain_payload);
955
956 LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
957 self.env
958 .notification_manager()
959 .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
960
961 let version = self
962 .notify_frontend(
963 NotificationOperation::Update,
964 NotificationInfo::Secret(secret_plain),
965 )
966 .await;
967
968 Ok(version)
969 }
970
    /// Drops the source (and its state table) that was implicitly created
    /// for a connector-backed table, as part of removing the connector.
    ///
    /// Runs inside the caller-provided transaction. Returns the user infos
    /// whose privileges referenced the dropped objects (so the caller can
    /// re-notify them) together with the dropped objects themselves.
    pub async fn drop_table_associated_source(
        txn: &DatabaseTransaction,
        drop_table_connector_ctx: &DropTableConnectorContext,
    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
        let to_drop_source_objects: Vec<PartialObject> = Object::find()
            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
            .into_partial_model()
            .all(txn)
            .await?;
        // NOTE(review): this query adds `select_only()` while the one above
        // does not — presumably both resolve the same partial-model columns;
        // confirm and unify.
        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
            .select_only()
            .filter(
                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
            )
            .into_partial_model()
            .all(txn)
            .await?;
        let to_drop_objects = to_drop_source_objects
            .into_iter()
            .chain(to_drop_internal_table_objs.into_iter())
            .collect_vec();
        // Users holding privileges on the dropped objects need their info
        // refreshed after deletion (privileges cascade with the objects).
        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
            .into_tuple()
            .all(txn)
            .await?;

        tracing::debug!(
            "drop_table_associated_source: to_drop_objects: {:?}",
            to_drop_objects
        );

        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
            .exec(txn)
            .await?;
        // Zero rows deleted means the source was already gone.
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                ObjectType::Source.as_str(),
                drop_table_connector_ctx.to_remove_source_id,
            ));
        }
        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;

        Ok((user_infos, to_drop_objects))
    }
1023
    /// Overrides (or clears, with `None`) a per-database streaming parameter
    /// such as the barrier interval or checkpoint frequency.
    ///
    /// Values are validated via `OverrideValidate` before being persisted.
    /// Returns the notification version together with the updated database
    /// row so callers can act on the new settings.
    pub async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let mut database = database::ActiveModel {
            database_id: Set(database_id),
            ..Default::default()
        };
        match param {
            AlterDatabaseParam::BarrierIntervalMs(interval) => {
                if let Some(ref interval) = interval {
                    OverrideValidate::barrier_interval_ms(interval)
                        .map_err(|e| anyhow::anyhow!(e))?;
                }
                // NOTE(review): the `as i32` cast silently truncates values
                // out of i32 range — presumably the validation above bounds
                // them; confirm.
                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
            }
            AlterDatabaseParam::CheckpointFrequency(frequency) => {
                if let Some(ref frequency) = frequency {
                    OverrideValidate::checkpoint_frequency(frequency)
                        .map_err(|e| anyhow::anyhow!(e))?;
                }
                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
            }
        }
        let database = database.update(&txn).await?;

        let obj = Object::find_by_id(database_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
            )
            .await;
        Ok((version, database))
    }
1069
    /// Applies per-job streaming config overrides: upserts the TOML paths in
    /// `entries_to_add` and removes those in `keys_to_remove`.
    ///
    /// The resulting override text is validated by merging it onto a default
    /// `StreamingConfig`; unrecognized keys are rejected before anything is
    /// persisted. Frontends do not need to observe this change, so the
    /// ignored version sentinel is returned.
    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Outer Option: job existence; inner Option: whether an override is
        // currently stored (None is treated as an empty TOML document).
        let config_override: Option<String> = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
        let config_override = config_override.unwrap_or_default();

        let mut table: toml::Table =
            toml::from_str(&config_override).context("invalid streaming job config")?;

        // Each value string must itself parse as a TOML value.
        for (key, value) in entries_to_add {
            let value: toml::Value = value
                .parse()
                .with_context(|| format!("invalid config value for path {key}"))?;
            table
                .upsert(&key, value)
                .with_context(|| format!("failed to set config path {key}"))?;
        }
        for key in keys_to_remove {
            table
                .delete(&key)
                .with_context(|| format!("failed to reset config path {key}"))?;
        }

        let updated_config_override = table.to_string();

        // Validate the merged result before persisting: reject overrides
        // that contain keys unknown to `StreamingConfig`.
        {
            let merged = merge_streaming_config_section(
                &StreamingConfig::default(),
                &updated_config_override,
            )
            .context("invalid streaming job config override")?;

            if let Some(merged) = merged {
                let unrecognized_keys = merged.unrecognized_keys().collect_vec();
                if !unrecognized_keys.is_empty() {
                    bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
                }
            }
        }

        StreamingJob::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            config_override: Set(Some(updated_config_override)),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(IGNORED_NOTIFICATION_VERSION)
    }
1139
1140 pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1141 let inner = self.inner.read().await;
1142 let active = refresh_job::ActiveModel {
1143 table_id: Set(table_id),
1144 last_trigger_time: Set(None),
1145 trigger_interval_secs: Set(None),
1146 current_status: Set(RefreshState::Idle),
1147 last_success_time: Set(None),
1148 };
1149 match RefreshJob::insert(active)
1150 .on_conflict_do_nothing()
1151 .exec(&inner.db)
1152 .await
1153 {
1154 Ok(_) => Ok(()),
1155 Err(sea_orm::DbErr::RecordNotInserted) => {
1156 tracing::debug!("refresh job already exists for table_id={}", table_id);
1158 Ok(())
1159 }
1160 Err(e) => Err(e.into()),
1161 }
1162 }
1163
1164 pub async fn update_refresh_job_status(
1165 &self,
1166 table_id: TableId,
1167 status: RefreshState,
1168 trigger_time: Option<DateTime>,
1169 is_success: bool,
1170 ) -> MetaResult<()> {
1171 self.ensure_refresh_job(table_id).await?;
1172 let inner = self.inner.read().await;
1173
1174 assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1176 let active = refresh_job::ActiveModel {
1177 table_id: Set(table_id),
1178 current_status: Set(status),
1179 last_trigger_time: if trigger_time.is_some() {
1180 Set(trigger_time.map(datetime_to_timestamp_millis))
1181 } else {
1182 NotSet
1183 },
1184 last_success_time: if is_success {
1185 Set(Some(chrono::Utc::now().timestamp_millis()))
1186 } else {
1187 NotSet
1188 },
1189 ..Default::default()
1190 };
1191 RefreshJob::update(active).exec(&inner.db).await?;
1192 Ok(())
1193 }
1194
1195 pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1196 let inner = self.inner.read().await;
1197 RefreshJob::update_many()
1198 .col_expr(
1199 refresh_job::Column::CurrentStatus,
1200 Expr::value(RefreshState::Idle),
1201 )
1202 .exec(&inner.db)
1203 .await?;
1204 Ok(())
1205 }
1206
1207 pub async fn update_refresh_job_interval(
1208 &self,
1209 table_id: TableId,
1210 trigger_interval_secs: Option<i64>,
1211 ) -> MetaResult<()> {
1212 self.ensure_refresh_job(table_id).await?;
1213 let inner = self.inner.read().await;
1214 let active = refresh_job::ActiveModel {
1215 table_id: Set(table_id),
1216 trigger_interval_secs: Set(trigger_interval_secs),
1217 ..Default::default()
1218 };
1219 RefreshJob::update(active).exec(&inner.db).await?;
1220 Ok(())
1221 }
1222}