risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use sea_orm::DatabaseTransaction;
18
19use super::*;
20
21impl CatalogController {
22    async fn alter_database_name(
23        &self,
24        database_id: DatabaseId,
25        name: &str,
26    ) -> MetaResult<NotificationVersion> {
27        let inner = self.inner.write().await;
28        let txn = inner.db.begin().await?;
29        check_database_name_duplicate(name, &txn).await?;
30
31        let active_model = database::ActiveModel {
32            database_id: Set(database_id),
33            name: Set(name.to_owned()),
34            ..Default::default()
35        };
36        let database = active_model.update(&txn).await?;
37
38        let obj = Object::find_by_id(database_id)
39            .one(&txn)
40            .await?
41            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
42
43        txn.commit().await?;
44
45        let version = self
46            .notify_frontend(
47                NotificationOperation::Update,
48                NotificationInfo::Database(ObjectModel(database, obj).into()),
49            )
50            .await;
51        Ok(version)
52    }
53
54    async fn alter_schema_name(
55        &self,
56        schema_id: SchemaId,
57        name: &str,
58    ) -> MetaResult<NotificationVersion> {
59        let inner = self.inner.write().await;
60        let txn = inner.db.begin().await?;
61
62        let obj = Object::find_by_id(schema_id)
63            .one(&txn)
64            .await?
65            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
66        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
67
68        let active_model = schema::ActiveModel {
69            schema_id: Set(schema_id),
70            name: Set(name.to_owned()),
71        };
72        let schema = active_model.update(&txn).await?;
73
74        txn.commit().await?;
75
76        let version = self
77            .notify_frontend(
78                NotificationOperation::Update,
79                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
80            )
81            .await;
82        Ok(version)
83    }
84
85    pub async fn alter_name(
86        &self,
87        object_type: ObjectType,
88        object_id: ObjectId,
89        object_name: &str,
90    ) -> MetaResult<NotificationVersion> {
91        if object_type == ObjectType::Database {
92            return self.alter_database_name(object_id as _, object_name).await;
93        } else if object_type == ObjectType::Schema {
94            return self.alter_schema_name(object_id as _, object_name).await;
95        }
96
97        let inner = self.inner.write().await;
98        let txn = inner.db.begin().await?;
99        let obj: PartialObject = Object::find_by_id(object_id)
100            .into_partial_model()
101            .one(&txn)
102            .await?
103            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
104        assert_eq!(obj.obj_type, object_type);
105        check_relation_name_duplicate(
106            object_name,
107            obj.database_id.unwrap(),
108            obj.schema_id.unwrap(),
109            &txn,
110        )
111        .await?;
112
113        // rename relation.
114        let (mut to_update_relations, old_name) =
115            rename_relation(&txn, object_type, object_id, object_name).await?;
116        // rename referring relation name.
117        to_update_relations.extend(
118            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
119        );
120
121        txn.commit().await?;
122
123        let version = self
124            .notify_frontend(
125                NotificationOperation::Update,
126                NotificationInfo::ObjectGroup(PbObjectGroup {
127                    objects: to_update_relations,
128                }),
129            )
130            .await;
131
132        Ok(version)
133    }
134
135    pub async fn alter_swap_rename(
136        &self,
137        object_type: ObjectType,
138        object_id: ObjectId,
139        dst_object_id: ObjectId,
140    ) -> MetaResult<NotificationVersion> {
141        let inner = self.inner.write().await;
142        let txn = inner.db.begin().await?;
143        let dst_name: String = match object_type {
144            ObjectType::Table => Table::find_by_id(dst_object_id)
145                .select_only()
146                .column(table::Column::Name)
147                .into_tuple()
148                .one(&txn)
149                .await?
150                .ok_or_else(|| {
151                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
152                })?,
153            ObjectType::Source => Source::find_by_id(dst_object_id)
154                .select_only()
155                .column(source::Column::Name)
156                .into_tuple()
157                .one(&txn)
158                .await?
159                .ok_or_else(|| {
160                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
161                })?,
162            ObjectType::Sink => Sink::find_by_id(dst_object_id)
163                .select_only()
164                .column(sink::Column::Name)
165                .into_tuple()
166                .one(&txn)
167                .await?
168                .ok_or_else(|| {
169                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
170                })?,
171            ObjectType::View => View::find_by_id(dst_object_id)
172                .select_only()
173                .column(view::Column::Name)
174                .into_tuple()
175                .one(&txn)
176                .await?
177                .ok_or_else(|| {
178                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
179                })?,
180            ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
181                .select_only()
182                .column(subscription::Column::Name)
183                .into_tuple()
184                .one(&txn)
185                .await?
186                .ok_or_else(|| {
187                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
188                })?,
189            _ => {
190                return Err(MetaError::permission_denied(format!(
191                    "swap rename not supported for object type: {:?}",
192                    object_type
193                )));
194            }
195        };
196
197        // rename relations.
198        let (mut to_update_relations, src_name) =
199            rename_relation(&txn, object_type, object_id, &dst_name).await?;
200        let (to_update_relations2, _) =
201            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
202        to_update_relations.extend(to_update_relations2);
203        // rename referring relation name.
204        to_update_relations.extend(
205            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
206        );
207        to_update_relations.extend(
208            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
209        );
210
211        txn.commit().await?;
212
213        let version = self
214            .notify_frontend(
215                NotificationOperation::Update,
216                NotificationInfo::ObjectGroup(PbObjectGroup {
217                    objects: to_update_relations,
218                }),
219            )
220            .await;
221
222        Ok(version)
223    }
224
225    pub async fn alter_non_shared_source(
226        &self,
227        pb_source: PbSource,
228    ) -> MetaResult<NotificationVersion> {
229        let source_id = pb_source.id as SourceId;
230        let inner = self.inner.write().await;
231        let txn = inner.db.begin().await?;
232
233        let original_version: i64 = Source::find_by_id(source_id)
234            .select_only()
235            .column(source::Column::Version)
236            .into_tuple()
237            .one(&txn)
238            .await?
239            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
240        if original_version + 1 != pb_source.version as i64 {
241            return Err(MetaError::permission_denied(
242                "source version is stale".to_owned(),
243            ));
244        }
245
246        let source: source::ActiveModel = pb_source.clone().into();
247        source.update(&txn).await?;
248        txn.commit().await?;
249
250        let version = self
251            .notify_frontend_relation_info(
252                NotificationOperation::Update,
253                PbObjectInfo::Source(pb_source),
254            )
255            .await;
256        Ok(version)
257    }
258
    /// Changes the owner of the catalog object `object_id` to `new_owner`.
    ///
    /// The object row is updated first; depending on `object_type`, dependent
    /// objects are updated in the same transaction:
    /// - table: its associated source (if any), its indexes, the index tables,
    ///   and the internal tables belonging to the table or its index tables;
    /// - shared source and sink: handled via `update_internal_tables`;
    /// - subscription, view and connection: only the object itself.
    ///
    /// Databases and schemas commit and notify frontends with a dedicated
    /// notification; all other kinds are sent as a single object group.
    ///
    /// Returns `IGNORED_NOTIFICATION_VERSION` without writing anything when
    /// the object is already owned by `new_owner`.
    ///
    /// # Errors
    /// Returns an error if `ensure_user_id` rejects `new_owner`, a "catalog id
    /// not found" error when the object (or a required related row) is
    /// missing, or any database error.
    ///
    /// # Panics
    /// Panics (`unreachable!`) for object types not listed above.
    pub async fn alter_owner(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_owner: UserId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_user_id(new_owner, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // Nothing to do if the owner is unchanged; the transaction is dropped uncommitted.
        if obj.owner_id == new_owner {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let mut obj = obj.into_active_model();
        obj.owner_id = Set(new_owner);
        let obj = obj.update(&txn).await?;

        // Collects every object whose owner changed, for the frontend notification.
        let mut objects = vec![];
        match object_type {
            ObjectType::Database => {
                let db = Database::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;

                txn.commit().await?;

                // Databases use their own notification kind, so return early.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Database(ObjectModel(db, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Schema => {
                let schema = Schema::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;

                txn.commit().await?;

                // Schemas use their own notification kind, so return early.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Table => {
                let table = Table::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

                // associated source.
                if let Some(associated_source_id) = table.optional_associated_source_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id as _),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // indexes: collect the index ids and the ids of their index tables.
                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
                    .select_only()
                    .columns([index::Column::IndexId, index::Column::IndexTableId])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .unzip();
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // internal tables of the table itself and of its index tables.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId
                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                // Update the owner of all dependent indexes and tables in one statement.
                if !index_ids.is_empty() || !table_ids.is_empty() {
                    Object::update_many()
                        .col_expr(
                            object::Column::OwnerId,
                            SimpleExpr::Value(Value::Int(Some(new_owner))),
                        )
                        .filter(
                            object::Column::Oid
                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Re-read the dependent tables so the notification carries the new owner.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                let is_shared = source.is_shared();
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Note: For non-shared source, we don't update their state tables, which
                // belongs to the MV.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::OwnerId,
                        Value::Int(Some(new_owner)),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::OwnerId,
                    Value::Int(Some(new_owner)),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
                objects.push(PbObjectInfo::Connection(
                    ObjectModel(connection, obj).into(),
                ));
            }
            // Any other object type panics here.
            _ => unreachable!("not supported object type: {:?}", object_type),
        };

        txn.commit().await?;

        // Send all updated objects to frontends as one group.
        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|object| PbObject {
                            object_info: Some(object),
                        })
                        .collect(),
                }),
            )
            .await;
        Ok(version)
    }
485
    /// Moves the catalog object `object_id` of kind `object_type` into the
    /// schema `new_schema`.
    ///
    /// Before the move, the object's name (or function signature / connection
    /// name) is checked for duplicates in the target schema. Depending on
    /// `object_type`, dependent objects are moved in the same transaction:
    /// - table: its associated source (if any), its indexes, the index tables,
    ///   and the internal tables belonging to the table or its index tables;
    /// - shared source and sink: handled via `update_internal_tables`;
    /// - subscription and view: only the object itself.
    ///
    /// Functions and connections commit and notify frontends with a dedicated
    /// notification; all other kinds are sent as a single object group.
    ///
    /// Returns `IGNORED_NOTIFICATION_VERSION` without writing anything when
    /// the object already lives in `new_schema`.
    ///
    /// # Errors
    /// Returns an error if `ensure_object_id` rejects `new_schema`, a "catalog
    /// id not found" error when the object (or a required related row) is
    /// missing, any error from the duplicate checks, or any database error.
    ///
    /// # Panics
    /// Panics if the object row has no `database_id`, and (`unreachable!`) for
    /// object types not listed above.
    pub async fn alter_schema(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_schema: SchemaId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // Nothing to do if the schema is unchanged; the transaction is dropped uncommitted.
        if obj.schema_id == Some(new_schema) {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let database_id = obj.database_id.unwrap();

        // Collects every object that was moved, for the frontend notification.
        let mut objects = vec![];
        match object_type {
            ObjectType::Table => {
                let table = Table::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
                // Read before `table` is moved into the notification payload below.
                let associated_src_id = table.optional_associated_source_id;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));

                // associated source.
                if let Some(associated_source_id) = associated_src_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id as _),
                        schema_id: Set(Some(new_schema)),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
                }

                // indexes: collect index ids, index names and the ids of their index tables.
                let (index_ids, (index_names, mut table_ids)): (
                    Vec<IndexId>,
                    (Vec<String>, Vec<TableId>),
                ) = Index::find()
                    .select_only()
                    .columns([
                        index::Column::IndexId,
                        index::Column::Name,
                        index::Column::IndexTableId,
                    ])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, String, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .map(|(id, name, t_id)| (id, (name, t_id)))
                    .unzip();

                // internal tables of the table itself and of its index tables.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId
                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                if !index_ids.is_empty() || !table_ids.is_empty() {
                    // Index names must not collide with anything in the target schema either.
                    for index_name in index_names {
                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
                            .await?;
                    }

                    // Move all dependent indexes and tables in one statement.
                    Object::update_many()
                        .col_expr(
                            object::Column::SchemaId,
                            SimpleExpr::Value(Value::Int(Some(new_schema))),
                        )
                        .filter(
                            object::Column::Oid
                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
                        )
                        .exec(&txn)
                        .await?;
                }

                // Re-read the dependent tables so the notification carries the new schema.
                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    for (table, table_obj) in table_objs {
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        ));
                    }
                }
                // Indexes are appended after the tables.
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    for (index, index_obj) in index_objs {
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap()).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
                let is_shared = source.is_shared();

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));

                // Note: For non-shared source, we don't update their state tables, which
                // belongs to the MV.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::SchemaId,
                        Value::Int(Some(new_schema)),
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let sink = Sink::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));

                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::SchemaId,
                    Value::Int(Some(new_schema)),
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
                    .await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;

                let mut obj = obj.into_active_model();
                obj.schema_id = Set(Some(new_schema));
                let obj = obj.update(&txn).await?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
            }
            ObjectType::Function => {
                let function = Function::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;

                // Build the protobuf with the new schema first so the signature
                // check runs against the target schema.
                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
                pb_function.schema_id = new_schema as _;
                check_function_signature_duplicate(&pb_function, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                // Functions use their own notification kind, so return early.
                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Function(pb_function),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;

                // Build the protobuf with the new schema first so the name check
                // runs against the target schema.
                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
                pb_connection.schema_id = new_schema as _;
                check_connection_name_duplicate(&pb_connection, &txn).await?;

                object::ActiveModel {
                    oid: Set(object_id),
                    schema_id: Set(Some(new_schema)),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                // Connections use their own notification kind, so return early.
                txn.commit().await?;
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Connection(pb_connection),
                    )
                    .await;
                return Ok(version);
            }
            // Any other object type panics here.
            _ => unreachable!("not supported object type: {:?}", object_type),
        }

        txn.commit().await?;
        // Send all moved objects to frontends as one group.
        let version = self
            .notify_frontend(
                Operation::Update,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|relation_info| PbObject {
                            object_info: Some(relation_info),
                        })
                        .collect_vec(),
                }),
            )
            .await;
        Ok(version)
    }
761
762    pub async fn alter_secret(
763        &self,
764        pb_secret: PbSecret,
765        secret_plain_payload: Vec<u8>,
766    ) -> MetaResult<NotificationVersion> {
767        let inner = self.inner.write().await;
768        let owner_id = pb_secret.owner as _;
769        let txn = inner.db.begin().await?;
770        ensure_user_id(owner_id, &txn).await?;
771        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
772        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
773
774        ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
775        let secret: secret::ActiveModel = pb_secret.clone().into();
776        Secret::update(secret).exec(&txn).await?;
777
778        txn.commit().await?;
779
780        // Notify the compute and frontend node plain secret
781        let mut secret_plain = pb_secret;
782        secret_plain.value.clone_from(&secret_plain_payload);
783
784        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
785        self.env
786            .notification_manager()
787            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
788
789        let version = self
790            .notify_frontend(
791                NotificationOperation::Update,
792                NotificationInfo::Secret(secret_plain),
793            )
794            .await;
795
796        Ok(version)
797    }
798
799    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
800    pub async fn drop_table_associated_source(
801        txn: &DatabaseTransaction,
802        drop_table_connector_ctx: &DropTableConnectorContext,
803    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
804        let to_drop_source_objects: Vec<PartialObject> = Object::find()
805            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
806            .into_partial_model()
807            .all(txn)
808            .await?;
809        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
810            .select_only()
811            .filter(
812                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
813            )
814            .into_partial_model()
815            .all(txn)
816            .await?;
817        let to_drop_objects = to_drop_source_objects
818            .into_iter()
819            .chain(to_drop_internal_table_objs.into_iter())
820            .collect_vec();
821        // Find affect users with privileges on all this objects.
822        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
823            .select_only()
824            .distinct()
825            .column(user_privilege::Column::UserId)
826            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
827            .into_tuple()
828            .all(txn)
829            .await?;
830
831        tracing::debug!(
832            "drop_table_associated_source: to_drop_objects: {:?}",
833            to_drop_objects
834        );
835
836        // delete all in to_drop_objects.
837        let res = Object::delete_many()
838            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
839            .exec(txn)
840            .await?;
841        if res.rows_affected == 0 {
842            return Err(MetaError::catalog_id_not_found(
843                ObjectType::Source.as_str(),
844                drop_table_connector_ctx.to_remove_source_id,
845            ));
846        }
847        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
848
849        Ok((user_infos, to_drop_objects))
850    }
851
852    pub async fn alter_database_param(
853        &self,
854        database_id: DatabaseId,
855        param: AlterDatabaseParam,
856    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
857        let inner = self.inner.write().await;
858        let txn = inner.db.begin().await?;
859
860        let mut database = database::ActiveModel {
861            database_id: Set(database_id),
862            ..Default::default()
863        };
864        match param {
865            AlterDatabaseParam::BarrierIntervalMs(interval) => {
866                if let Some(ref interval) = interval {
867                    OverrideValidate::barrier_interval_ms(interval)
868                        .map_err(|e| anyhow::anyhow!(e))?;
869                }
870                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
871            }
872            AlterDatabaseParam::CheckpointFrequency(frequency) => {
873                if let Some(ref frequency) = frequency {
874                    OverrideValidate::checkpoint_frequency(frequency)
875                        .map_err(|e| anyhow::anyhow!(e))?;
876                }
877                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
878            }
879        }
880        let database = database.update(&txn).await?;
881
882        let obj = Object::find_by_id(database_id)
883            .one(&txn)
884            .await?
885            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
886
887        txn.commit().await?;
888
889        let version = self
890            .notify_frontend(
891                NotificationOperation::Update,
892                NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
893            )
894            .await;
895        Ok((version, database))
896    }
897}