risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::AlterDatabaseParam;
16use sea_orm::DatabaseTransaction;
17
18use super::*;
19
20impl CatalogController {
21    async fn alter_database_name(
22        &self,
23        database_id: DatabaseId,
24        name: &str,
25    ) -> MetaResult<NotificationVersion> {
26        let inner = self.inner.write().await;
27        let txn = inner.db.begin().await?;
28        check_database_name_duplicate(name, &txn).await?;
29
30        let active_model = database::ActiveModel {
31            database_id: Set(database_id),
32            name: Set(name.to_owned()),
33            ..Default::default()
34        };
35        let database = active_model.update(&txn).await?;
36
37        let obj = Object::find_by_id(database_id)
38            .one(&txn)
39            .await?
40            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
41
42        txn.commit().await?;
43
44        let version = self
45            .notify_frontend(
46                NotificationOperation::Update,
47                NotificationInfo::Database(ObjectModel(database, obj).into()),
48            )
49            .await;
50        Ok(version)
51    }
52
53    async fn alter_schema_name(
54        &self,
55        schema_id: SchemaId,
56        name: &str,
57    ) -> MetaResult<NotificationVersion> {
58        let inner = self.inner.write().await;
59        let txn = inner.db.begin().await?;
60
61        let obj = Object::find_by_id(schema_id)
62            .one(&txn)
63            .await?
64            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
65        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
66
67        let active_model = schema::ActiveModel {
68            schema_id: Set(schema_id),
69            name: Set(name.to_owned()),
70        };
71        let schema = active_model.update(&txn).await?;
72
73        txn.commit().await?;
74
75        let version = self
76            .notify_frontend(
77                NotificationOperation::Update,
78                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
79            )
80            .await;
81        Ok(version)
82    }
83
84    pub async fn alter_name(
85        &self,
86        object_type: ObjectType,
87        object_id: ObjectId,
88        object_name: &str,
89    ) -> MetaResult<NotificationVersion> {
90        if object_type == ObjectType::Database {
91            return self.alter_database_name(object_id as _, object_name).await;
92        } else if object_type == ObjectType::Schema {
93            return self.alter_schema_name(object_id as _, object_name).await;
94        }
95
96        let inner = self.inner.write().await;
97        let txn = inner.db.begin().await?;
98        let obj: PartialObject = Object::find_by_id(object_id)
99            .into_partial_model()
100            .one(&txn)
101            .await?
102            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
103        assert_eq!(obj.obj_type, object_type);
104        check_relation_name_duplicate(
105            object_name,
106            obj.database_id.unwrap(),
107            obj.schema_id.unwrap(),
108            &txn,
109        )
110        .await?;
111
112        // rename relation.
113        let (mut to_update_relations, old_name) =
114            rename_relation(&txn, object_type, object_id, object_name).await?;
115        // rename referring relation name.
116        to_update_relations.extend(
117            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
118        );
119
120        txn.commit().await?;
121
122        let version = self
123            .notify_frontend(
124                NotificationOperation::Update,
125                NotificationInfo::ObjectGroup(PbObjectGroup {
126                    objects: to_update_relations,
127                }),
128            )
129            .await;
130
131        Ok(version)
132    }
133
134    pub async fn alter_swap_rename(
135        &self,
136        object_type: ObjectType,
137        object_id: ObjectId,
138        dst_object_id: ObjectId,
139    ) -> MetaResult<NotificationVersion> {
140        let inner = self.inner.write().await;
141        let txn = inner.db.begin().await?;
142        let dst_name: String = match object_type {
143            ObjectType::Table => Table::find_by_id(dst_object_id)
144                .select_only()
145                .column(table::Column::Name)
146                .into_tuple()
147                .one(&txn)
148                .await?
149                .ok_or_else(|| {
150                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
151                })?,
152            ObjectType::Source => Source::find_by_id(dst_object_id)
153                .select_only()
154                .column(source::Column::Name)
155                .into_tuple()
156                .one(&txn)
157                .await?
158                .ok_or_else(|| {
159                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
160                })?,
161            ObjectType::Sink => Sink::find_by_id(dst_object_id)
162                .select_only()
163                .column(sink::Column::Name)
164                .into_tuple()
165                .one(&txn)
166                .await?
167                .ok_or_else(|| {
168                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
169                })?,
170            ObjectType::View => View::find_by_id(dst_object_id)
171                .select_only()
172                .column(view::Column::Name)
173                .into_tuple()
174                .one(&txn)
175                .await?
176                .ok_or_else(|| {
177                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
178                })?,
179            ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
180                .select_only()
181                .column(subscription::Column::Name)
182                .into_tuple()
183                .one(&txn)
184                .await?
185                .ok_or_else(|| {
186                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
187                })?,
188            _ => {
189                return Err(MetaError::permission_denied(format!(
190                    "swap rename not supported for object type: {:?}",
191                    object_type
192                )));
193            }
194        };
195
196        // rename relations.
197        let (mut to_update_relations, src_name) =
198            rename_relation(&txn, object_type, object_id, &dst_name).await?;
199        let (to_update_relations2, _) =
200            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
201        to_update_relations.extend(to_update_relations2);
202        // rename referring relation name.
203        to_update_relations.extend(
204            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
205        );
206        to_update_relations.extend(
207            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
208        );
209
210        txn.commit().await?;
211
212        let version = self
213            .notify_frontend(
214                NotificationOperation::Update,
215                NotificationInfo::ObjectGroup(PbObjectGroup {
216                    objects: to_update_relations,
217                }),
218            )
219            .await;
220
221        Ok(version)
222    }
223
224    pub async fn alter_non_shared_source(
225        &self,
226        pb_source: PbSource,
227    ) -> MetaResult<NotificationVersion> {
228        let source_id = pb_source.id as SourceId;
229        let inner = self.inner.write().await;
230        let txn = inner.db.begin().await?;
231
232        let original_version: i64 = Source::find_by_id(source_id)
233            .select_only()
234            .column(source::Column::Version)
235            .into_tuple()
236            .one(&txn)
237            .await?
238            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
239        if original_version + 1 != pb_source.version as i64 {
240            return Err(MetaError::permission_denied(
241                "source version is stale".to_owned(),
242            ));
243        }
244
245        let source: source::ActiveModel = pb_source.clone().into();
246        source.update(&txn).await?;
247        txn.commit().await?;
248
249        let version = self
250            .notify_frontend_relation_info(
251                NotificationOperation::Update,
252                PbObjectInfo::Source(pb_source),
253            )
254            .await;
255        Ok(version)
256    }
257
258    pub async fn alter_owner(
259        &self,
260        object_type: ObjectType,
261        object_id: ObjectId,
262        new_owner: UserId,
263    ) -> MetaResult<NotificationVersion> {
264        let inner = self.inner.write().await;
265        let txn = inner.db.begin().await?;
266        ensure_user_id(new_owner, &txn).await?;
267
268        let obj = Object::find_by_id(object_id)
269            .one(&txn)
270            .await?
271            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
272        if obj.owner_id == new_owner {
273            return Ok(IGNORED_NOTIFICATION_VERSION);
274        }
275        let mut obj = obj.into_active_model();
276        obj.owner_id = Set(new_owner);
277        let obj = obj.update(&txn).await?;
278
279        let mut objects = vec![];
280        match object_type {
281            ObjectType::Database => {
282                let db = Database::find_by_id(object_id)
283                    .one(&txn)
284                    .await?
285                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
286
287                txn.commit().await?;
288
289                let version = self
290                    .notify_frontend(
291                        NotificationOperation::Update,
292                        NotificationInfo::Database(ObjectModel(db, obj).into()),
293                    )
294                    .await;
295                return Ok(version);
296            }
297            ObjectType::Schema => {
298                let schema = Schema::find_by_id(object_id)
299                    .one(&txn)
300                    .await?
301                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
302
303                txn.commit().await?;
304
305                let version = self
306                    .notify_frontend(
307                        NotificationOperation::Update,
308                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
309                    )
310                    .await;
311                return Ok(version);
312            }
313            ObjectType::Table => {
314                let table = Table::find_by_id(object_id)
315                    .one(&txn)
316                    .await?
317                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
318
319                // associated source.
320                if let Some(associated_source_id) = table.optional_associated_source_id {
321                    let src_obj = object::ActiveModel {
322                        oid: Set(associated_source_id as _),
323                        owner_id: Set(new_owner),
324                        ..Default::default()
325                    }
326                    .update(&txn)
327                    .await?;
328                    let source = Source::find_by_id(associated_source_id)
329                        .one(&txn)
330                        .await?
331                        .ok_or_else(|| {
332                            MetaError::catalog_id_not_found("source", associated_source_id)
333                        })?;
334                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
335                }
336
337                // indexes.
338                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
339                    .select_only()
340                    .columns([index::Column::IndexId, index::Column::IndexTableId])
341                    .filter(index::Column::PrimaryTableId.eq(object_id))
342                    .into_tuple::<(IndexId, TableId)>()
343                    .all(&txn)
344                    .await?
345                    .into_iter()
346                    .unzip();
347                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
348
349                // internal tables.
350                let internal_tables: Vec<TableId> = Table::find()
351                    .select_only()
352                    .column(table::Column::TableId)
353                    .filter(
354                        table::Column::BelongsToJobId
355                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
356                    )
357                    .into_tuple()
358                    .all(&txn)
359                    .await?;
360                table_ids.extend(internal_tables);
361
362                if !index_ids.is_empty() || !table_ids.is_empty() {
363                    Object::update_many()
364                        .col_expr(
365                            object::Column::OwnerId,
366                            SimpleExpr::Value(Value::Int(Some(new_owner))),
367                        )
368                        .filter(
369                            object::Column::Oid
370                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
371                        )
372                        .exec(&txn)
373                        .await?;
374                }
375
376                if !table_ids.is_empty() {
377                    let table_objs = Table::find()
378                        .find_also_related(Object)
379                        .filter(table::Column::TableId.is_in(table_ids))
380                        .all(&txn)
381                        .await?;
382                    for (table, table_obj) in table_objs {
383                        objects.push(PbObjectInfo::Table(
384                            ObjectModel(table, table_obj.unwrap()).into(),
385                        ));
386                    }
387                }
388                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
389                if !index_ids.is_empty() {
390                    let index_objs = Index::find()
391                        .find_also_related(Object)
392                        .filter(index::Column::IndexId.is_in(index_ids))
393                        .all(&txn)
394                        .await?;
395                    for (index, index_obj) in index_objs {
396                        objects.push(PbObjectInfo::Index(
397                            ObjectModel(index, index_obj.unwrap()).into(),
398                        ));
399                    }
400                }
401            }
402            ObjectType::Source => {
403                let source = Source::find_by_id(object_id)
404                    .one(&txn)
405                    .await?
406                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
407                let is_shared = source.is_shared();
408                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
409
410                // Note: For non-shared source, we don't update their state tables, which
411                // belongs to the MV.
412                if is_shared {
413                    update_internal_tables(
414                        &txn,
415                        object_id,
416                        object::Column::OwnerId,
417                        Value::Int(Some(new_owner)),
418                        &mut objects,
419                    )
420                    .await?;
421                }
422            }
423            ObjectType::Sink => {
424                let sink = Sink::find_by_id(object_id)
425                    .one(&txn)
426                    .await?
427                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
428                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
429
430                update_internal_tables(
431                    &txn,
432                    object_id,
433                    object::Column::OwnerId,
434                    Value::Int(Some(new_owner)),
435                    &mut objects,
436                )
437                .await?;
438            }
439            ObjectType::Subscription => {
440                let subscription = Subscription::find_by_id(object_id)
441                    .one(&txn)
442                    .await?
443                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
444                objects.push(PbObjectInfo::Subscription(
445                    ObjectModel(subscription, obj).into(),
446                ));
447            }
448            ObjectType::View => {
449                let view = View::find_by_id(object_id)
450                    .one(&txn)
451                    .await?
452                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
453                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
454            }
455            ObjectType::Connection => {
456                let connection = Connection::find_by_id(object_id)
457                    .one(&txn)
458                    .await?
459                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
460                objects.push(PbObjectInfo::Connection(
461                    ObjectModel(connection, obj).into(),
462                ));
463            }
464            _ => unreachable!("not supported object type: {:?}", object_type),
465        };
466
467        txn.commit().await?;
468
469        let version = self
470            .notify_frontend(
471                NotificationOperation::Update,
472                NotificationInfo::ObjectGroup(PbObjectGroup {
473                    objects: objects
474                        .into_iter()
475                        .map(|object| PbObject {
476                            object_info: Some(object),
477                        })
478                        .collect(),
479                }),
480            )
481            .await;
482        Ok(version)
483    }
484
485    pub async fn alter_schema(
486        &self,
487        object_type: ObjectType,
488        object_id: ObjectId,
489        new_schema: SchemaId,
490    ) -> MetaResult<NotificationVersion> {
491        let inner = self.inner.write().await;
492        let txn = inner.db.begin().await?;
493        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
494
495        let obj = Object::find_by_id(object_id)
496            .one(&txn)
497            .await?
498            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
499        if obj.schema_id == Some(new_schema) {
500            return Ok(IGNORED_NOTIFICATION_VERSION);
501        }
502        let database_id = obj.database_id.unwrap();
503
504        let mut objects = vec![];
505        match object_type {
506            ObjectType::Table => {
507                let table = Table::find_by_id(object_id)
508                    .one(&txn)
509                    .await?
510                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
511                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
512                let associated_src_id = table.optional_associated_source_id;
513
514                let mut obj = obj.into_active_model();
515                obj.schema_id = Set(Some(new_schema));
516                let obj = obj.update(&txn).await?;
517                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
518
519                // associated source.
520                if let Some(associated_source_id) = associated_src_id {
521                    let src_obj = object::ActiveModel {
522                        oid: Set(associated_source_id as _),
523                        schema_id: Set(Some(new_schema)),
524                        ..Default::default()
525                    }
526                    .update(&txn)
527                    .await?;
528                    let source = Source::find_by_id(associated_source_id)
529                        .one(&txn)
530                        .await?
531                        .ok_or_else(|| {
532                            MetaError::catalog_id_not_found("source", associated_source_id)
533                        })?;
534                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
535                }
536
537                // indexes.
538                let (index_ids, (index_names, mut table_ids)): (
539                    Vec<IndexId>,
540                    (Vec<String>, Vec<TableId>),
541                ) = Index::find()
542                    .select_only()
543                    .columns([
544                        index::Column::IndexId,
545                        index::Column::Name,
546                        index::Column::IndexTableId,
547                    ])
548                    .filter(index::Column::PrimaryTableId.eq(object_id))
549                    .into_tuple::<(IndexId, String, TableId)>()
550                    .all(&txn)
551                    .await?
552                    .into_iter()
553                    .map(|(id, name, t_id)| (id, (name, t_id)))
554                    .unzip();
555
556                // internal tables.
557                let internal_tables: Vec<TableId> = Table::find()
558                    .select_only()
559                    .column(table::Column::TableId)
560                    .filter(
561                        table::Column::BelongsToJobId
562                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
563                    )
564                    .into_tuple()
565                    .all(&txn)
566                    .await?;
567                table_ids.extend(internal_tables);
568
569                if !index_ids.is_empty() || !table_ids.is_empty() {
570                    for index_name in index_names {
571                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
572                            .await?;
573                    }
574
575                    Object::update_many()
576                        .col_expr(
577                            object::Column::SchemaId,
578                            SimpleExpr::Value(Value::Int(Some(new_schema))),
579                        )
580                        .filter(
581                            object::Column::Oid
582                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
583                        )
584                        .exec(&txn)
585                        .await?;
586                }
587
588                if !table_ids.is_empty() {
589                    let table_objs = Table::find()
590                        .find_also_related(Object)
591                        .filter(table::Column::TableId.is_in(table_ids))
592                        .all(&txn)
593                        .await?;
594                    for (table, table_obj) in table_objs {
595                        objects.push(PbObjectInfo::Table(
596                            ObjectModel(table, table_obj.unwrap()).into(),
597                        ));
598                    }
599                }
600                if !index_ids.is_empty() {
601                    let index_objs = Index::find()
602                        .find_also_related(Object)
603                        .filter(index::Column::IndexId.is_in(index_ids))
604                        .all(&txn)
605                        .await?;
606                    for (index, index_obj) in index_objs {
607                        objects.push(PbObjectInfo::Index(
608                            ObjectModel(index, index_obj.unwrap()).into(),
609                        ));
610                    }
611                }
612            }
613            ObjectType::Source => {
614                let source = Source::find_by_id(object_id)
615                    .one(&txn)
616                    .await?
617                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
618                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
619                let is_shared = source.is_shared();
620
621                let mut obj = obj.into_active_model();
622                obj.schema_id = Set(Some(new_schema));
623                let obj = obj.update(&txn).await?;
624                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
625
626                // Note: For non-shared source, we don't update their state tables, which
627                // belongs to the MV.
628                if is_shared {
629                    update_internal_tables(
630                        &txn,
631                        object_id,
632                        object::Column::SchemaId,
633                        Value::Int(Some(new_schema)),
634                        &mut objects,
635                    )
636                    .await?;
637                }
638            }
639            ObjectType::Sink => {
640                let sink = Sink::find_by_id(object_id)
641                    .one(&txn)
642                    .await?
643                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
644                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
645
646                let mut obj = obj.into_active_model();
647                obj.schema_id = Set(Some(new_schema));
648                let obj = obj.update(&txn).await?;
649                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
650
651                update_internal_tables(
652                    &txn,
653                    object_id,
654                    object::Column::SchemaId,
655                    Value::Int(Some(new_schema)),
656                    &mut objects,
657                )
658                .await?;
659            }
660            ObjectType::Subscription => {
661                let subscription = Subscription::find_by_id(object_id)
662                    .one(&txn)
663                    .await?
664                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
665                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
666                    .await?;
667
668                let mut obj = obj.into_active_model();
669                obj.schema_id = Set(Some(new_schema));
670                let obj = obj.update(&txn).await?;
671                objects.push(PbObjectInfo::Subscription(
672                    ObjectModel(subscription, obj).into(),
673                ));
674            }
675            ObjectType::View => {
676                let view = View::find_by_id(object_id)
677                    .one(&txn)
678                    .await?
679                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
680                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
681
682                let mut obj = obj.into_active_model();
683                obj.schema_id = Set(Some(new_schema));
684                let obj = obj.update(&txn).await?;
685                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
686            }
687            ObjectType::Function => {
688                let function = Function::find_by_id(object_id)
689                    .one(&txn)
690                    .await?
691                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
692
693                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
694                pb_function.schema_id = new_schema as _;
695                check_function_signature_duplicate(&pb_function, &txn).await?;
696
697                object::ActiveModel {
698                    oid: Set(object_id),
699                    schema_id: Set(Some(new_schema)),
700                    ..Default::default()
701                }
702                .update(&txn)
703                .await?;
704
705                txn.commit().await?;
706                let version = self
707                    .notify_frontend(
708                        NotificationOperation::Update,
709                        NotificationInfo::Function(pb_function),
710                    )
711                    .await;
712                return Ok(version);
713            }
714            ObjectType::Connection => {
715                let connection = Connection::find_by_id(object_id)
716                    .one(&txn)
717                    .await?
718                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
719
720                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
721                pb_connection.schema_id = new_schema as _;
722                check_connection_name_duplicate(&pb_connection, &txn).await?;
723
724                object::ActiveModel {
725                    oid: Set(object_id),
726                    schema_id: Set(Some(new_schema)),
727                    ..Default::default()
728                }
729                .update(&txn)
730                .await?;
731
732                txn.commit().await?;
733                let version = self
734                    .notify_frontend(
735                        NotificationOperation::Update,
736                        NotificationInfo::Connection(pb_connection),
737                    )
738                    .await;
739                return Ok(version);
740            }
741            _ => unreachable!("not supported object type: {:?}", object_type),
742        }
743
744        txn.commit().await?;
745        let version = self
746            .notify_frontend(
747                Operation::Update,
748                Info::ObjectGroup(PbObjectGroup {
749                    objects: objects
750                        .into_iter()
751                        .map(|relation_info| PbObject {
752                            object_info: Some(relation_info),
753                        })
754                        .collect_vec(),
755                }),
756            )
757            .await;
758        Ok(version)
759    }
760
761    pub async fn alter_secret(
762        &self,
763        pb_secret: PbSecret,
764        secret_plain_payload: Vec<u8>,
765    ) -> MetaResult<NotificationVersion> {
766        let inner = self.inner.write().await;
767        let owner_id = pb_secret.owner as _;
768        let txn = inner.db.begin().await?;
769        ensure_user_id(owner_id, &txn).await?;
770        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
771        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
772
773        ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
774        let secret: secret::ActiveModel = pb_secret.clone().into();
775        Secret::update(secret).exec(&txn).await?;
776
777        txn.commit().await?;
778
779        // Notify the compute and frontend node plain secret
780        let mut secret_plain = pb_secret;
781        secret_plain.value.clone_from(&secret_plain_payload);
782
783        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
784        self.env
785            .notification_manager()
786            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
787
788        let version = self
789            .notify_frontend(
790                NotificationOperation::Update,
791                NotificationInfo::Secret(secret_plain),
792            )
793            .await;
794
795        Ok(version)
796    }
797
798    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
799    pub async fn drop_table_associated_source(
800        txn: &DatabaseTransaction,
801        drop_table_connector_ctx: &DropTableConnectorContext,
802    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
803        let to_drop_source_objects: Vec<PartialObject> = Object::find()
804            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
805            .into_partial_model()
806            .all(txn)
807            .await?;
808        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
809            .select_only()
810            .filter(
811                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
812            )
813            .into_partial_model()
814            .all(txn)
815            .await?;
816        let to_drop_objects = to_drop_source_objects
817            .into_iter()
818            .chain(to_drop_internal_table_objs.into_iter())
819            .collect_vec();
820        // Find affect users with privileges on all this objects.
821        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
822            .select_only()
823            .distinct()
824            .column(user_privilege::Column::UserId)
825            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
826            .into_tuple()
827            .all(txn)
828            .await?;
829
830        tracing::debug!(
831            "drop_table_associated_source: to_drop_objects: {:?}",
832            to_drop_objects
833        );
834
835        // delete all in to_drop_objects.
836        let res = Object::delete_many()
837            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
838            .exec(txn)
839            .await?;
840        if res.rows_affected == 0 {
841            return Err(MetaError::catalog_id_not_found(
842                ObjectType::Source.as_str(),
843                drop_table_connector_ctx.to_remove_source_id,
844            ));
845        }
846        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
847
848        Ok((user_infos, to_drop_objects))
849    }
850
851    pub async fn alter_database_param(
852        &self,
853        database_id: DatabaseId,
854        param: AlterDatabaseParam,
855    ) -> MetaResult<NotificationVersion> {
856        let inner = self.inner.write().await;
857        let txn = inner.db.begin().await?;
858
859        let mut database = database::ActiveModel {
860            database_id: Set(database_id),
861            ..Default::default()
862        };
863        match param {
864            AlterDatabaseParam::BarrierIntervalMs(interval) => {
865                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
866            }
867            AlterDatabaseParam::CheckpointFrequency(frequency) => {
868                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
869            }
870        }
871        let database = database.update(&txn).await?;
872
873        let obj = Object::find_by_id(database_id)
874            .one(&txn)
875            .await?
876            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
877
878        txn.commit().await?;
879
880        let version = self
881            .notify_frontend(
882                NotificationOperation::Update,
883                NotificationInfo::Database(ObjectModel(database, obj).into()),
884            )
885            .await;
886        Ok(version)
887    }
888}