risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
17use risingwave_common::config::mutate::TomlTableMutateExt as _;
18use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
19use risingwave_common::id::JobId;
20use risingwave_common::system_param::{OverrideValidate, Validate};
21use risingwave_meta_model::refresh_job::{self, RefreshState};
22use sea_orm::ActiveValue::{NotSet, Set};
23use sea_orm::prelude::DateTime;
24use sea_orm::sea_query::Expr;
25use sea_orm::{ActiveModelTrait, DatabaseTransaction};
26
27use super::*;
28use crate::error::bail_invalid_parameter;
29
30impl CatalogController {
31    async fn alter_database_name(
32        &self,
33        database_id: DatabaseId,
34        name: &str,
35    ) -> MetaResult<NotificationVersion> {
36        let inner = self.inner.write().await;
37        let txn = inner.db.begin().await?;
38        check_database_name_duplicate(name, &txn).await?;
39
40        let active_model = database::ActiveModel {
41            database_id: Set(database_id),
42            name: Set(name.to_owned()),
43            ..Default::default()
44        };
45        let database = active_model.update(&txn).await?;
46
47        let obj = Object::find_by_id(database_id)
48            .one(&txn)
49            .await?
50            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
51
52        txn.commit().await?;
53
54        let version = self
55            .notify_frontend(
56                NotificationOperation::Update,
57                NotificationInfo::Database(ObjectModel(database, obj).into()),
58            )
59            .await;
60        Ok(version)
61    }
62
63    async fn alter_schema_name(
64        &self,
65        schema_id: SchemaId,
66        name: &str,
67    ) -> MetaResult<NotificationVersion> {
68        let inner = self.inner.write().await;
69        let txn = inner.db.begin().await?;
70
71        let obj = Object::find_by_id(schema_id)
72            .one(&txn)
73            .await?
74            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
75        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
76
77        let active_model = schema::ActiveModel {
78            schema_id: Set(schema_id),
79            name: Set(name.to_owned()),
80        };
81        let schema = active_model.update(&txn).await?;
82
83        txn.commit().await?;
84
85        let version = self
86            .notify_frontend(
87                NotificationOperation::Update,
88                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
89            )
90            .await;
91        Ok(version)
92    }
93
94    pub async fn alter_name(
95        &self,
96        object_type: ObjectType,
97        object_id: impl Into<ObjectId>,
98        object_name: &str,
99    ) -> MetaResult<NotificationVersion> {
100        let object_id = object_id.into();
101        if object_type == ObjectType::Database {
102            return self
103                .alter_database_name(object_id.as_database_id(), object_name)
104                .await;
105        } else if object_type == ObjectType::Schema {
106            return self
107                .alter_schema_name(object_id.as_schema_id(), object_name)
108                .await;
109        }
110
111        let inner = self.inner.write().await;
112        let txn = inner.db.begin().await?;
113        let obj: PartialObject = Object::find_by_id(object_id)
114            .into_partial_model()
115            .one(&txn)
116            .await?
117            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
118        assert_eq!(obj.obj_type, object_type);
119        check_relation_name_duplicate(
120            object_name,
121            obj.database_id.unwrap(),
122            obj.schema_id.unwrap(),
123            &txn,
124        )
125        .await?;
126
127        // rename relation.
128        let (mut to_update_relations, old_name) =
129            rename_relation(&txn, object_type, object_id, object_name).await?;
130        // rename referring relation name.
131        to_update_relations.extend(
132            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
133        );
134
135        txn.commit().await?;
136
137        let version = self
138            .notify_frontend(
139                NotificationOperation::Update,
140                NotificationInfo::ObjectGroup(PbObjectGroup {
141                    objects: to_update_relations,
142                }),
143            )
144            .await;
145
146        Ok(version)
147    }
148
149    pub async fn alter_swap_rename(
150        &self,
151        object_type: ObjectType,
152        object_id: ObjectId,
153        dst_object_id: ObjectId,
154    ) -> MetaResult<NotificationVersion> {
155        let inner = self.inner.write().await;
156        let txn = inner.db.begin().await?;
157        let dst_name: String = match object_type {
158            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
159                .select_only()
160                .column(table::Column::Name)
161                .into_tuple()
162                .one(&txn)
163                .await?
164                .ok_or_else(|| {
165                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
166                })?,
167            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
168                .select_only()
169                .column(source::Column::Name)
170                .into_tuple()
171                .one(&txn)
172                .await?
173                .ok_or_else(|| {
174                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
175                })?,
176            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
177                .select_only()
178                .column(sink::Column::Name)
179                .into_tuple()
180                .one(&txn)
181                .await?
182                .ok_or_else(|| {
183                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
184                })?,
185            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
186                .select_only()
187                .column(view::Column::Name)
188                .into_tuple()
189                .one(&txn)
190                .await?
191                .ok_or_else(|| {
192                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
193                })?,
194            ObjectType::Subscription => {
195                Subscription::find_by_id(dst_object_id.as_subscription_id())
196                    .select_only()
197                    .column(subscription::Column::Name)
198                    .into_tuple()
199                    .one(&txn)
200                    .await?
201                    .ok_or_else(|| {
202                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
203                    })?
204            }
205            _ => {
206                return Err(MetaError::permission_denied(format!(
207                    "swap rename not supported for object type: {:?}",
208                    object_type
209                )));
210            }
211        };
212
213        // rename relations.
214        let (mut to_update_relations, src_name) =
215            rename_relation(&txn, object_type, object_id, &dst_name).await?;
216        let (to_update_relations2, _) =
217            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
218        to_update_relations.extend(to_update_relations2);
219        // rename referring relation name.
220        to_update_relations.extend(
221            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
222        );
223        to_update_relations.extend(
224            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
225        );
226
227        txn.commit().await?;
228
229        let version = self
230            .notify_frontend(
231                NotificationOperation::Update,
232                NotificationInfo::ObjectGroup(PbObjectGroup {
233                    objects: to_update_relations,
234                }),
235            )
236            .await;
237
238        Ok(version)
239    }
240
241    pub async fn alter_non_shared_source(
242        &self,
243        pb_source: PbSource,
244    ) -> MetaResult<NotificationVersion> {
245        let source_id: SourceId = pb_source.id;
246        let inner = self.inner.write().await;
247        let txn = inner.db.begin().await?;
248
249        let original_version: i64 = Source::find_by_id(source_id)
250            .select_only()
251            .column(source::Column::Version)
252            .into_tuple()
253            .one(&txn)
254            .await?
255            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
256        if original_version + 1 != pb_source.version as i64 {
257            return Err(MetaError::permission_denied(
258                "source version is stale".to_owned(),
259            ));
260        }
261
262        let source: source::ActiveModel = pb_source.clone().into();
263        source.update(&txn).await?;
264        txn.commit().await?;
265
266        let version = self
267            .notify_frontend_relation_info(
268                NotificationOperation::Update,
269                PbObjectInfo::Source(pb_source),
270            )
271            .await;
272        Ok(version)
273    }
274
275    pub async fn alter_owner(
276        &self,
277        object_type: ObjectType,
278        object_id: ObjectId,
279        new_owner: UserId,
280    ) -> MetaResult<NotificationVersion> {
281        let inner = self.inner.write().await;
282        let txn = inner.db.begin().await?;
283        ensure_user_id(new_owner, &txn).await?;
284
285        let obj = Object::find_by_id(object_id)
286            .one(&txn)
287            .await?
288            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
289        if obj.owner_id == new_owner {
290            return Ok(IGNORED_NOTIFICATION_VERSION);
291        }
292        let mut obj = obj.into_active_model();
293        obj.owner_id = Set(new_owner);
294        let obj = obj.update(&txn).await?;
295
296        let mut objects = vec![];
297        match object_type {
298            ObjectType::Database => {
299                let db = Database::find_by_id(object_id.as_database_id())
300                    .one(&txn)
301                    .await?
302                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
303
304                txn.commit().await?;
305
306                let version = self
307                    .notify_frontend(
308                        NotificationOperation::Update,
309                        NotificationInfo::Database(ObjectModel(db, obj).into()),
310                    )
311                    .await;
312                return Ok(version);
313            }
314            ObjectType::Schema => {
315                let schema = Schema::find_by_id(object_id.as_schema_id())
316                    .one(&txn)
317                    .await?
318                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
319
320                txn.commit().await?;
321
322                let version = self
323                    .notify_frontend(
324                        NotificationOperation::Update,
325                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
326                    )
327                    .await;
328                return Ok(version);
329            }
330            ObjectType::Table => {
331                let table = Table::find_by_id(object_id.as_table_id())
332                    .one(&txn)
333                    .await?
334                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
335
336                // associated source.
337                if let Some(associated_source_id) = table.optional_associated_source_id {
338                    let src_obj = object::ActiveModel {
339                        oid: Set(associated_source_id.as_object_id()),
340                        owner_id: Set(new_owner),
341                        ..Default::default()
342                    }
343                    .update(&txn)
344                    .await?;
345                    let source = Source::find_by_id(associated_source_id)
346                        .one(&txn)
347                        .await?
348                        .ok_or_else(|| {
349                            MetaError::catalog_id_not_found("source", associated_source_id)
350                        })?;
351                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
352                }
353
354                // associated sink and source for iceberg table.
355                if matches!(table.engine, Some(table::Engine::Iceberg)) {
356                    let iceberg_sink = Sink::find()
357                        .inner_join(Object)
358                        .select_only()
359                        .column(sink::Column::SinkId)
360                        .filter(
361                            object::Column::DatabaseId
362                                .eq(obj.database_id)
363                                .and(object::Column::SchemaId.eq(obj.schema_id))
364                                .and(
365                                    sink::Column::Name
366                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
367                                ),
368                        )
369                        .into_tuple::<SinkId>()
370                        .one(&txn)
371                        .await?
372                        .expect("iceberg sink must exist");
373                    let sink_obj = object::ActiveModel {
374                        oid: Set(iceberg_sink.as_object_id()),
375                        owner_id: Set(new_owner),
376                        ..Default::default()
377                    }
378                    .update(&txn)
379                    .await?;
380                    let sink = Sink::find_by_id(iceberg_sink)
381                        .one(&txn)
382                        .await?
383                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
384                    objects.push(PbObjectInfo::Sink(ObjectModel(sink, sink_obj).into()));
385
386                    let iceberg_source = Source::find()
387                        .inner_join(Object)
388                        .select_only()
389                        .column(source::Column::SourceId)
390                        .filter(
391                            object::Column::DatabaseId
392                                .eq(obj.database_id)
393                                .and(object::Column::SchemaId.eq(obj.schema_id))
394                                .and(
395                                    source::Column::Name
396                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
397                                ),
398                        )
399                        .into_tuple::<SourceId>()
400                        .one(&txn)
401                        .await?
402                        .expect("iceberg source must exist");
403                    let source_obj = object::ActiveModel {
404                        oid: Set(iceberg_source.as_object_id()),
405                        owner_id: Set(new_owner),
406                        ..Default::default()
407                    }
408                    .update(&txn)
409                    .await?;
410                    let source = Source::find_by_id(iceberg_source)
411                        .one(&txn)
412                        .await?
413                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
414                    objects.push(PbObjectInfo::Source(ObjectModel(source, source_obj).into()));
415                }
416
417                // indexes.
418                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
419                    .select_only()
420                    .columns([index::Column::IndexId, index::Column::IndexTableId])
421                    .filter(index::Column::PrimaryTableId.eq(object_id))
422                    .into_tuple::<(IndexId, TableId)>()
423                    .all(&txn)
424                    .await?
425                    .into_iter()
426                    .unzip();
427                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
428
429                // internal tables.
430                let internal_tables: Vec<TableId> = Table::find()
431                    .select_only()
432                    .column(table::Column::TableId)
433                    .filter(
434                        table::Column::BelongsToJobId.is_in(
435                            table_ids
436                                .iter()
437                                .cloned()
438                                .chain(std::iter::once(object_id.as_table_id())),
439                        ),
440                    )
441                    .into_tuple()
442                    .all(&txn)
443                    .await?;
444                table_ids.extend(internal_tables);
445
446                if !index_ids.is_empty() || !table_ids.is_empty() {
447                    Object::update_many()
448                        .col_expr(
449                            object::Column::OwnerId,
450                            SimpleExpr::Value(Value::Int(Some(new_owner))),
451                        )
452                        .filter(
453                            object::Column::Oid.is_in::<ObjectId, _>(
454                                index_ids
455                                    .iter()
456                                    .copied()
457                                    .map_into()
458                                    .chain(table_ids.iter().copied().map_into()),
459                            ),
460                        )
461                        .exec(&txn)
462                        .await?;
463                }
464
465                if !table_ids.is_empty() {
466                    let table_objs = Table::find()
467                        .find_also_related(Object)
468                        .filter(table::Column::TableId.is_in(table_ids))
469                        .all(&txn)
470                        .await?;
471                    for (table, table_obj) in table_objs {
472                        objects.push(PbObjectInfo::Table(
473                            ObjectModel(table, table_obj.unwrap()).into(),
474                        ));
475                    }
476                }
477                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
478                if !index_ids.is_empty() {
479                    let index_objs = Index::find()
480                        .find_also_related(Object)
481                        .filter(index::Column::IndexId.is_in(index_ids))
482                        .all(&txn)
483                        .await?;
484                    for (index, index_obj) in index_objs {
485                        objects.push(PbObjectInfo::Index(
486                            ObjectModel(index, index_obj.unwrap()).into(),
487                        ));
488                    }
489                }
490            }
491            ObjectType::Source => {
492                let source = Source::find_by_id(object_id.as_source_id())
493                    .one(&txn)
494                    .await?
495                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
496                let is_shared = source.is_shared();
497                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
498
499                // Note: For non-shared source, we don't update their state tables, which
500                // belongs to the MV.
501                if is_shared {
502                    update_internal_tables(
503                        &txn,
504                        object_id,
505                        object::Column::OwnerId,
506                        Value::Int(Some(new_owner)),
507                        &mut objects,
508                    )
509                    .await?;
510                }
511            }
512            ObjectType::Sink => {
513                let sink = Sink::find_by_id(object_id.as_sink_id())
514                    .one(&txn)
515                    .await?
516                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
517                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
518
519                update_internal_tables(
520                    &txn,
521                    object_id,
522                    object::Column::OwnerId,
523                    Value::Int(Some(new_owner)),
524                    &mut objects,
525                )
526                .await?;
527            }
528            ObjectType::Subscription => {
529                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
530                    .one(&txn)
531                    .await?
532                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
533                objects.push(PbObjectInfo::Subscription(
534                    ObjectModel(subscription, obj).into(),
535                ));
536            }
537            ObjectType::View => {
538                let view = View::find_by_id(object_id.as_view_id())
539                    .one(&txn)
540                    .await?
541                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
542                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
543            }
544            ObjectType::Connection => {
545                let connection = Connection::find_by_id(object_id.as_connection_id())
546                    .one(&txn)
547                    .await?
548                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
549                objects.push(PbObjectInfo::Connection(
550                    ObjectModel(connection, obj).into(),
551                ));
552            }
553            _ => unreachable!("not supported object type: {:?}", object_type),
554        };
555
556        txn.commit().await?;
557
558        let version = self
559            .notify_frontend(
560                NotificationOperation::Update,
561                NotificationInfo::ObjectGroup(PbObjectGroup {
562                    objects: objects
563                        .into_iter()
564                        .map(|object| PbObject {
565                            object_info: Some(object),
566                        })
567                        .collect(),
568                }),
569            )
570            .await;
571        Ok(version)
572    }
573
574    pub async fn alter_schema(
575        &self,
576        object_type: ObjectType,
577        object_id: ObjectId,
578        new_schema: SchemaId,
579    ) -> MetaResult<NotificationVersion> {
580        let inner = self.inner.write().await;
581        let txn = inner.db.begin().await?;
582        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
583
584        let obj = Object::find_by_id(object_id)
585            .one(&txn)
586            .await?
587            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
588        if obj.schema_id == Some(new_schema) {
589            return Ok(IGNORED_NOTIFICATION_VERSION);
590        }
591        let database_id = obj.database_id.unwrap();
592
593        let mut objects = vec![];
594        match object_type {
595            ObjectType::Table => {
596                let table = Table::find_by_id(object_id.as_table_id())
597                    .one(&txn)
598                    .await?
599                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
600                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
601                let associated_src_id = table.optional_associated_source_id;
602
603                let mut obj = obj.into_active_model();
604                obj.schema_id = Set(Some(new_schema));
605                let obj = obj.update(&txn).await?;
606                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
607
608                // associated source.
609                if let Some(associated_source_id) = associated_src_id {
610                    let src_obj = object::ActiveModel {
611                        oid: Set(associated_source_id.as_object_id()),
612                        schema_id: Set(Some(new_schema)),
613                        ..Default::default()
614                    }
615                    .update(&txn)
616                    .await?;
617                    let source = Source::find_by_id(associated_source_id)
618                        .one(&txn)
619                        .await?
620                        .ok_or_else(|| {
621                            MetaError::catalog_id_not_found("source", associated_source_id)
622                        })?;
623                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
624                }
625
626                // indexes.
627                let (index_ids, (index_names, mut table_ids)): (
628                    Vec<IndexId>,
629                    (Vec<String>, Vec<TableId>),
630                ) = Index::find()
631                    .select_only()
632                    .columns([
633                        index::Column::IndexId,
634                        index::Column::Name,
635                        index::Column::IndexTableId,
636                    ])
637                    .filter(index::Column::PrimaryTableId.eq(object_id))
638                    .into_tuple::<(IndexId, String, TableId)>()
639                    .all(&txn)
640                    .await?
641                    .into_iter()
642                    .map(|(id, name, t_id)| (id, (name, t_id)))
643                    .unzip();
644
645                // internal tables.
646                let internal_tables: Vec<TableId> = Table::find()
647                    .select_only()
648                    .column(table::Column::TableId)
649                    .filter(
650                        table::Column::BelongsToJobId.is_in(
651                            table_ids
652                                .iter()
653                                .map(|table_id| table_id.as_job_id())
654                                .chain(std::iter::once(object_id.as_job_id())),
655                        ),
656                    )
657                    .into_tuple()
658                    .all(&txn)
659                    .await?;
660                table_ids.extend(internal_tables);
661
662                if !index_ids.is_empty() || !table_ids.is_empty() {
663                    for index_name in index_names {
664                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
665                            .await?;
666                    }
667
668                    Object::update_many()
669                        .col_expr(object::Column::SchemaId, new_schema.into())
670                        .filter(
671                            object::Column::Oid.is_in::<ObjectId, _>(
672                                index_ids
673                                    .iter()
674                                    .copied()
675                                    .map_into()
676                                    .chain(table_ids.iter().copied().map_into()),
677                            ),
678                        )
679                        .exec(&txn)
680                        .await?;
681                }
682
683                if !table_ids.is_empty() {
684                    let table_objs = Table::find()
685                        .find_also_related(Object)
686                        .filter(table::Column::TableId.is_in(table_ids))
687                        .all(&txn)
688                        .await?;
689                    for (table, table_obj) in table_objs {
690                        objects.push(PbObjectInfo::Table(
691                            ObjectModel(table, table_obj.unwrap()).into(),
692                        ));
693                    }
694                }
695                if !index_ids.is_empty() {
696                    let index_objs = Index::find()
697                        .find_also_related(Object)
698                        .filter(index::Column::IndexId.is_in(index_ids))
699                        .all(&txn)
700                        .await?;
701                    for (index, index_obj) in index_objs {
702                        objects.push(PbObjectInfo::Index(
703                            ObjectModel(index, index_obj.unwrap()).into(),
704                        ));
705                    }
706                }
707            }
708            ObjectType::Source => {
709                let source = Source::find_by_id(object_id.as_source_id())
710                    .one(&txn)
711                    .await?
712                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
713                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
714                let is_shared = source.is_shared();
715
716                let mut obj = obj.into_active_model();
717                obj.schema_id = Set(Some(new_schema));
718                let obj = obj.update(&txn).await?;
719                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
720
721                // Note: For non-shared source, we don't update their state tables, which
722                // belongs to the MV.
723                if is_shared {
724                    update_internal_tables(
725                        &txn,
726                        object_id,
727                        object::Column::SchemaId,
728                        new_schema.into(),
729                        &mut objects,
730                    )
731                    .await?;
732                }
733            }
734            ObjectType::Sink => {
735                let sink = Sink::find_by_id(object_id.as_sink_id())
736                    .one(&txn)
737                    .await?
738                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
739                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
740
741                let mut obj = obj.into_active_model();
742                obj.schema_id = Set(Some(new_schema));
743                let obj = obj.update(&txn).await?;
744                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
745
746                update_internal_tables(
747                    &txn,
748                    object_id,
749                    object::Column::SchemaId,
750                    new_schema.into(),
751                    &mut objects,
752                )
753                .await?;
754            }
755            ObjectType::Subscription => {
756                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
757                    .one(&txn)
758                    .await?
759                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
760                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
761                    .await?;
762
763                let mut obj = obj.into_active_model();
764                obj.schema_id = Set(Some(new_schema));
765                let obj = obj.update(&txn).await?;
766                objects.push(PbObjectInfo::Subscription(
767                    ObjectModel(subscription, obj).into(),
768                ));
769            }
770            ObjectType::View => {
771                let view = View::find_by_id(object_id.as_view_id())
772                    .one(&txn)
773                    .await?
774                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
775                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
776
777                let mut obj = obj.into_active_model();
778                obj.schema_id = Set(Some(new_schema));
779                let obj = obj.update(&txn).await?;
780                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
781            }
782            ObjectType::Function => {
783                let function = Function::find_by_id(object_id.as_function_id())
784                    .one(&txn)
785                    .await?
786                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
787
788                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
789                pb_function.schema_id = new_schema;
790                check_function_signature_duplicate(&pb_function, &txn).await?;
791
792                object::ActiveModel {
793                    oid: Set(object_id),
794                    schema_id: Set(Some(new_schema)),
795                    ..Default::default()
796                }
797                .update(&txn)
798                .await?;
799
800                txn.commit().await?;
801                let version = self
802                    .notify_frontend(
803                        NotificationOperation::Update,
804                        NotificationInfo::Function(pb_function),
805                    )
806                    .await;
807                return Ok(version);
808            }
809            ObjectType::Connection => {
810                let connection = Connection::find_by_id(object_id.as_connection_id())
811                    .one(&txn)
812                    .await?
813                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
814
815                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
816                pb_connection.schema_id = new_schema;
817                check_connection_name_duplicate(&pb_connection, &txn).await?;
818
819                object::ActiveModel {
820                    oid: Set(object_id),
821                    schema_id: Set(Some(new_schema)),
822                    ..Default::default()
823                }
824                .update(&txn)
825                .await?;
826
827                txn.commit().await?;
828                let version = self
829                    .notify_frontend(
830                        NotificationOperation::Update,
831                        NotificationInfo::Connection(pb_connection),
832                    )
833                    .await;
834                return Ok(version);
835            }
836            _ => unreachable!("not supported object type: {:?}", object_type),
837        }
838
839        txn.commit().await?;
840        let version = self
841            .notify_frontend(
842                Operation::Update,
843                Info::ObjectGroup(PbObjectGroup {
844                    objects: objects
845                        .into_iter()
846                        .map(|relation_info| PbObject {
847                            object_info: Some(relation_info),
848                        })
849                        .collect_vec(),
850                }),
851            )
852            .await;
853        Ok(version)
854    }
855
856    pub async fn alter_secret(
857        &self,
858        pb_secret: PbSecret,
859        secret_plain_payload: Vec<u8>,
860    ) -> MetaResult<NotificationVersion> {
861        let inner = self.inner.write().await;
862        let owner_id = pb_secret.owner as _;
863        let txn = inner.db.begin().await?;
864        ensure_user_id(owner_id, &txn).await?;
865        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
866        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
867
868        ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
869        let secret: secret::ActiveModel = pb_secret.clone().into();
870        Secret::update(secret).exec(&txn).await?;
871
872        txn.commit().await?;
873
874        // Notify the compute and frontend node plain secret
875        let mut secret_plain = pb_secret;
876        secret_plain.value.clone_from(&secret_plain_payload);
877
878        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
879        self.env
880            .notification_manager()
881            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
882
883        let version = self
884            .notify_frontend(
885                NotificationOperation::Update,
886                NotificationInfo::Secret(secret_plain),
887            )
888            .await;
889
890        Ok(version)
891    }
892
893    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
894    pub async fn drop_table_associated_source(
895        txn: &DatabaseTransaction,
896        drop_table_connector_ctx: &DropTableConnectorContext,
897    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
898        let to_drop_source_objects: Vec<PartialObject> = Object::find()
899            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
900            .into_partial_model()
901            .all(txn)
902            .await?;
903        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
904            .select_only()
905            .filter(
906                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
907            )
908            .into_partial_model()
909            .all(txn)
910            .await?;
911        let to_drop_objects = to_drop_source_objects
912            .into_iter()
913            .chain(to_drop_internal_table_objs.into_iter())
914            .collect_vec();
915        // Find affect users with privileges on all this objects.
916        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
917            .select_only()
918            .distinct()
919            .column(user_privilege::Column::UserId)
920            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
921            .into_tuple()
922            .all(txn)
923            .await?;
924
925        tracing::debug!(
926            "drop_table_associated_source: to_drop_objects: {:?}",
927            to_drop_objects
928        );
929
930        // delete all in to_drop_objects.
931        let res = Object::delete_many()
932            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
933            .exec(txn)
934            .await?;
935        if res.rows_affected == 0 {
936            return Err(MetaError::catalog_id_not_found(
937                ObjectType::Source.as_str(),
938                drop_table_connector_ctx.to_remove_source_id,
939            ));
940        }
941        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
942
943        Ok((user_infos, to_drop_objects))
944    }
945
946    pub async fn alter_database_param(
947        &self,
948        database_id: DatabaseId,
949        param: AlterDatabaseParam,
950    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
951        let inner = self.inner.write().await;
952        let txn = inner.db.begin().await?;
953
954        let mut database = database::ActiveModel {
955            database_id: Set(database_id),
956            ..Default::default()
957        };
958        match param {
959            AlterDatabaseParam::BarrierIntervalMs(interval) => {
960                if let Some(ref interval) = interval {
961                    OverrideValidate::barrier_interval_ms(interval)
962                        .map_err(|e| anyhow::anyhow!(e))?;
963                }
964                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
965            }
966            AlterDatabaseParam::CheckpointFrequency(frequency) => {
967                if let Some(ref frequency) = frequency {
968                    OverrideValidate::checkpoint_frequency(frequency)
969                        .map_err(|e| anyhow::anyhow!(e))?;
970                }
971                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
972            }
973        }
974        let database = database.update(&txn).await?;
975
976        let obj = Object::find_by_id(database_id)
977            .one(&txn)
978            .await?
979            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
980
981        txn.commit().await?;
982
983        let version = self
984            .notify_frontend(
985                NotificationOperation::Update,
986                NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
987            )
988            .await;
989        Ok((version, database))
990    }
991
992    pub async fn alter_streaming_job_config(
993        &self,
994        job_id: JobId,
995        entries_to_add: HashMap<String, String>,
996        keys_to_remove: Vec<String>,
997    ) -> MetaResult<NotificationVersion> {
998        let inner = self.inner.write().await;
999        let txn = inner.db.begin().await?;
1000
1001        let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1002            .select_only()
1003            .column(streaming_job::Column::ConfigOverride)
1004            .into_tuple()
1005            .one(&txn)
1006            .await?
1007            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1008        let config_override = config_override.unwrap_or_default();
1009
1010        let mut table: toml::Table =
1011            toml::from_str(&config_override).context("invalid streaming job config")?;
1012
1013        // The frontend guarantees that there's no duplicated keys in `to_add` and `to_remove`.
1014        for (key, value) in entries_to_add {
1015            let value: toml::Value = value
1016                .parse()
1017                .with_context(|| format!("invalid config value for path {key}"))?;
1018            table
1019                .upsert(&key, value)
1020                .with_context(|| format!("failed to set config path {key}"))?;
1021        }
1022        for key in keys_to_remove {
1023            table
1024                .delete(&key)
1025                .with_context(|| format!("failed to reset config path {key}"))?;
1026        }
1027
1028        let updated_config_override = table.to_string();
1029
1030        // Validate the config override by trying to merge it to the default config.
1031        {
1032            let merged = merge_streaming_config_section(
1033                &StreamingConfig::default(),
1034                &updated_config_override,
1035            )
1036            .context("invalid streaming job config override")?;
1037
1038            // Reject unrecognized entries.
1039            // Note: If these unrecognized entries are pre-existing, we also reject them here.
1040            // Users are able to fix them by issuing a `RESET` first.
1041            if let Some(merged) = merged {
1042                let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1043                if !unrecognized_keys.is_empty() {
1044                    bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1045                }
1046            }
1047        }
1048
1049        streaming_job::ActiveModel {
1050            job_id: Set(job_id),
1051            config_override: Set(Some(updated_config_override)),
1052            ..Default::default()
1053        }
1054        .update(&txn)
1055        .await?;
1056
1057        txn.commit().await?;
1058
1059        Ok(IGNORED_NOTIFICATION_VERSION)
1060    }
1061
1062    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1063        let inner = self.inner.read().await;
1064        let active = refresh_job::ActiveModel {
1065            table_id: Set(table_id),
1066            last_trigger_time: Set(None),
1067            trigger_interval_secs: Set(None),
1068            current_status: Set(RefreshState::Idle),
1069            last_success_time: Set(None),
1070        };
1071        match RefreshJob::insert(active)
1072            .on_conflict_do_nothing()
1073            .exec(&inner.db)
1074            .await
1075        {
1076            Ok(_) => Ok(()),
1077            Err(sea_orm::DbErr::RecordNotInserted) => {
1078                // This is expected when the refresh job already exists due to ON CONFLICT DO NOTHING
1079                tracing::debug!("refresh job already exists for table_id={}", table_id);
1080                Ok(())
1081            }
1082            Err(e) => Err(e.into()),
1083        }
1084    }
1085
1086    pub async fn update_refresh_job_status(
1087        &self,
1088        table_id: TableId,
1089        status: RefreshState,
1090        trigger_time: Option<DateTime>,
1091        is_success: bool,
1092    ) -> MetaResult<()> {
1093        self.ensure_refresh_job(table_id).await?;
1094        let inner = self.inner.read().await;
1095
1096        // expect only update trigger_time when the status changes to Refreshing
1097        assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1098        let active = refresh_job::ActiveModel {
1099            table_id: Set(table_id),
1100            current_status: Set(status),
1101            last_trigger_time: if trigger_time.is_some() {
1102                Set(trigger_time.map(|t| t.and_utc().timestamp_millis()))
1103            } else {
1104                NotSet
1105            },
1106            last_success_time: if is_success {
1107                Set(Some(chrono::Utc::now().timestamp_millis()))
1108            } else {
1109                NotSet
1110            },
1111            ..Default::default()
1112        };
1113        active.update(&inner.db).await?;
1114        Ok(())
1115    }
1116
1117    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1118        let inner = self.inner.read().await;
1119        RefreshJob::update_many()
1120            .col_expr(
1121                refresh_job::Column::CurrentStatus,
1122                Expr::value(RefreshState::Idle),
1123            )
1124            .exec(&inner.db)
1125            .await?;
1126        Ok(())
1127    }
1128
1129    pub async fn update_refresh_job_interval(
1130        &self,
1131        table_id: TableId,
1132        trigger_interval_secs: Option<i64>,
1133    ) -> MetaResult<()> {
1134        self.ensure_refresh_job(table_id).await?;
1135        let inner = self.inner.read().await;
1136        let active = refresh_job::ActiveModel {
1137            table_id: Set(table_id),
1138            trigger_interval_secs: Set(trigger_interval_secs),
1139            ..Default::default()
1140        };
1141        active.update(&inner.db).await?;
1142        Ok(())
1143    }
1144}