risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use sea_orm::DatabaseTransaction;
16
17use super::*;
18
19impl CatalogController {
20    async fn alter_database_name(
21        &self,
22        database_id: DatabaseId,
23        name: &str,
24    ) -> MetaResult<NotificationVersion> {
25        let inner = self.inner.write().await;
26        let txn = inner.db.begin().await?;
27        check_database_name_duplicate(name, &txn).await?;
28
29        let active_model = database::ActiveModel {
30            database_id: Set(database_id),
31            name: Set(name.to_owned()),
32            ..Default::default()
33        };
34        let database = active_model.update(&txn).await?;
35
36        let obj = Object::find_by_id(database_id)
37            .one(&txn)
38            .await?
39            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
40
41        txn.commit().await?;
42
43        let version = self
44            .notify_frontend(
45                NotificationOperation::Update,
46                NotificationInfo::Database(ObjectModel(database, obj).into()),
47            )
48            .await;
49        Ok(version)
50    }
51
52    async fn alter_schema_name(
53        &self,
54        schema_id: SchemaId,
55        name: &str,
56    ) -> MetaResult<NotificationVersion> {
57        let inner = self.inner.write().await;
58        let txn = inner.db.begin().await?;
59
60        let obj = Object::find_by_id(schema_id)
61            .one(&txn)
62            .await?
63            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
64        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
65
66        let active_model = schema::ActiveModel {
67            schema_id: Set(schema_id),
68            name: Set(name.to_owned()),
69        };
70        let schema = active_model.update(&txn).await?;
71
72        txn.commit().await?;
73
74        let version = self
75            .notify_frontend(
76                NotificationOperation::Update,
77                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
78            )
79            .await;
80        Ok(version)
81    }
82
83    pub async fn alter_name(
84        &self,
85        object_type: ObjectType,
86        object_id: ObjectId,
87        object_name: &str,
88    ) -> MetaResult<NotificationVersion> {
89        if object_type == ObjectType::Database {
90            return self.alter_database_name(object_id as _, object_name).await;
91        } else if object_type == ObjectType::Schema {
92            return self.alter_schema_name(object_id as _, object_name).await;
93        }
94
95        let inner = self.inner.write().await;
96        let txn = inner.db.begin().await?;
97        let obj: PartialObject = Object::find_by_id(object_id)
98            .into_partial_model()
99            .one(&txn)
100            .await?
101            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
102        assert_eq!(obj.obj_type, object_type);
103        check_relation_name_duplicate(
104            object_name,
105            obj.database_id.unwrap(),
106            obj.schema_id.unwrap(),
107            &txn,
108        )
109        .await?;
110
111        // rename relation.
112        let (mut to_update_relations, old_name) =
113            rename_relation(&txn, object_type, object_id, object_name).await?;
114        // rename referring relation name.
115        to_update_relations.extend(
116            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
117        );
118
119        txn.commit().await?;
120
121        let version = self
122            .notify_frontend(
123                NotificationOperation::Update,
124                NotificationInfo::ObjectGroup(PbObjectGroup {
125                    objects: to_update_relations,
126                }),
127            )
128            .await;
129
130        Ok(version)
131    }
132
133    pub async fn alter_swap_rename(
134        &self,
135        object_type: ObjectType,
136        object_id: ObjectId,
137        dst_object_id: ObjectId,
138    ) -> MetaResult<NotificationVersion> {
139        let inner = self.inner.write().await;
140        let txn = inner.db.begin().await?;
141        let dst_name: String = match object_type {
142            ObjectType::Table => Table::find_by_id(dst_object_id)
143                .select_only()
144                .column(table::Column::Name)
145                .into_tuple()
146                .one(&txn)
147                .await?
148                .ok_or_else(|| {
149                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
150                })?,
151            ObjectType::Source => Source::find_by_id(dst_object_id)
152                .select_only()
153                .column(source::Column::Name)
154                .into_tuple()
155                .one(&txn)
156                .await?
157                .ok_or_else(|| {
158                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
159                })?,
160            ObjectType::Sink => Sink::find_by_id(dst_object_id)
161                .select_only()
162                .column(sink::Column::Name)
163                .into_tuple()
164                .one(&txn)
165                .await?
166                .ok_or_else(|| {
167                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
168                })?,
169            ObjectType::View => View::find_by_id(dst_object_id)
170                .select_only()
171                .column(view::Column::Name)
172                .into_tuple()
173                .one(&txn)
174                .await?
175                .ok_or_else(|| {
176                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
177                })?,
178            ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
179                .select_only()
180                .column(subscription::Column::Name)
181                .into_tuple()
182                .one(&txn)
183                .await?
184                .ok_or_else(|| {
185                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
186                })?,
187            _ => {
188                return Err(MetaError::permission_denied(format!(
189                    "swap rename not supported for object type: {:?}",
190                    object_type
191                )));
192            }
193        };
194
195        // rename relations.
196        let (mut to_update_relations, src_name) =
197            rename_relation(&txn, object_type, object_id, &dst_name).await?;
198        let (to_update_relations2, _) =
199            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
200        to_update_relations.extend(to_update_relations2);
201        // rename referring relation name.
202        to_update_relations.extend(
203            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
204        );
205        to_update_relations.extend(
206            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
207        );
208
209        txn.commit().await?;
210
211        let version = self
212            .notify_frontend(
213                NotificationOperation::Update,
214                NotificationInfo::ObjectGroup(PbObjectGroup {
215                    objects: to_update_relations,
216                }),
217            )
218            .await;
219
220        Ok(version)
221    }
222
223    pub async fn alter_non_shared_source(
224        &self,
225        pb_source: PbSource,
226    ) -> MetaResult<NotificationVersion> {
227        let source_id = pb_source.id as SourceId;
228        let inner = self.inner.write().await;
229        let txn = inner.db.begin().await?;
230
231        let original_version: i64 = Source::find_by_id(source_id)
232            .select_only()
233            .column(source::Column::Version)
234            .into_tuple()
235            .one(&txn)
236            .await?
237            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
238        if original_version + 1 != pb_source.version as i64 {
239            return Err(MetaError::permission_denied(
240                "source version is stale".to_owned(),
241            ));
242        }
243
244        let source: source::ActiveModel = pb_source.clone().into();
245        source.update(&txn).await?;
246        txn.commit().await?;
247
248        let version = self
249            .notify_frontend_relation_info(
250                NotificationOperation::Update,
251                PbObjectInfo::Source(pb_source),
252            )
253            .await;
254        Ok(version)
255    }
256
257    pub async fn alter_owner(
258        &self,
259        object_type: ObjectType,
260        object_id: ObjectId,
261        new_owner: UserId,
262    ) -> MetaResult<NotificationVersion> {
263        let inner = self.inner.write().await;
264        let txn = inner.db.begin().await?;
265        ensure_user_id(new_owner, &txn).await?;
266
267        let obj = Object::find_by_id(object_id)
268            .one(&txn)
269            .await?
270            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
271        if obj.owner_id == new_owner {
272            return Ok(IGNORED_NOTIFICATION_VERSION);
273        }
274        let mut obj = obj.into_active_model();
275        obj.owner_id = Set(new_owner);
276        let obj = obj.update(&txn).await?;
277
278        let mut objects = vec![];
279        match object_type {
280            ObjectType::Database => {
281                let db = Database::find_by_id(object_id)
282                    .one(&txn)
283                    .await?
284                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
285
286                txn.commit().await?;
287
288                let version = self
289                    .notify_frontend(
290                        NotificationOperation::Update,
291                        NotificationInfo::Database(ObjectModel(db, obj).into()),
292                    )
293                    .await;
294                return Ok(version);
295            }
296            ObjectType::Schema => {
297                let schema = Schema::find_by_id(object_id)
298                    .one(&txn)
299                    .await?
300                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
301
302                txn.commit().await?;
303
304                let version = self
305                    .notify_frontend(
306                        NotificationOperation::Update,
307                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
308                    )
309                    .await;
310                return Ok(version);
311            }
312            ObjectType::Table => {
313                let table = Table::find_by_id(object_id)
314                    .one(&txn)
315                    .await?
316                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
317
318                // associated source.
319                if let Some(associated_source_id) = table.optional_associated_source_id {
320                    let src_obj = object::ActiveModel {
321                        oid: Set(associated_source_id as _),
322                        owner_id: Set(new_owner),
323                        ..Default::default()
324                    }
325                    .update(&txn)
326                    .await?;
327                    let source = Source::find_by_id(associated_source_id)
328                        .one(&txn)
329                        .await?
330                        .ok_or_else(|| {
331                            MetaError::catalog_id_not_found("source", associated_source_id)
332                        })?;
333                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
334                }
335
336                // indexes.
337                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
338                    .select_only()
339                    .columns([index::Column::IndexId, index::Column::IndexTableId])
340                    .filter(index::Column::PrimaryTableId.eq(object_id))
341                    .into_tuple::<(IndexId, TableId)>()
342                    .all(&txn)
343                    .await?
344                    .into_iter()
345                    .unzip();
346                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
347
348                // internal tables.
349                let internal_tables: Vec<TableId> = Table::find()
350                    .select_only()
351                    .column(table::Column::TableId)
352                    .filter(
353                        table::Column::BelongsToJobId
354                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
355                    )
356                    .into_tuple()
357                    .all(&txn)
358                    .await?;
359                table_ids.extend(internal_tables);
360
361                if !index_ids.is_empty() || !table_ids.is_empty() {
362                    Object::update_many()
363                        .col_expr(
364                            object::Column::OwnerId,
365                            SimpleExpr::Value(Value::Int(Some(new_owner))),
366                        )
367                        .filter(
368                            object::Column::Oid
369                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
370                        )
371                        .exec(&txn)
372                        .await?;
373                }
374
375                if !table_ids.is_empty() {
376                    let table_objs = Table::find()
377                        .find_also_related(Object)
378                        .filter(table::Column::TableId.is_in(table_ids))
379                        .all(&txn)
380                        .await?;
381                    for (table, table_obj) in table_objs {
382                        objects.push(PbObjectInfo::Table(
383                            ObjectModel(table, table_obj.unwrap()).into(),
384                        ));
385                    }
386                }
387                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
388                if !index_ids.is_empty() {
389                    let index_objs = Index::find()
390                        .find_also_related(Object)
391                        .filter(index::Column::IndexId.is_in(index_ids))
392                        .all(&txn)
393                        .await?;
394                    for (index, index_obj) in index_objs {
395                        objects.push(PbObjectInfo::Index(
396                            ObjectModel(index, index_obj.unwrap()).into(),
397                        ));
398                    }
399                }
400            }
401            ObjectType::Source => {
402                let source = Source::find_by_id(object_id)
403                    .one(&txn)
404                    .await?
405                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
406                let is_shared = source.is_shared();
407                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
408
409                // Note: For non-shared source, we don't update their state tables, which
410                // belongs to the MV.
411                if is_shared {
412                    update_internal_tables(
413                        &txn,
414                        object_id,
415                        object::Column::OwnerId,
416                        Value::Int(Some(new_owner)),
417                        &mut objects,
418                    )
419                    .await?;
420                }
421            }
422            ObjectType::Sink => {
423                let sink = Sink::find_by_id(object_id)
424                    .one(&txn)
425                    .await?
426                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
427                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
428
429                update_internal_tables(
430                    &txn,
431                    object_id,
432                    object::Column::OwnerId,
433                    Value::Int(Some(new_owner)),
434                    &mut objects,
435                )
436                .await?;
437            }
438            ObjectType::Subscription => {
439                let subscription = Subscription::find_by_id(object_id)
440                    .one(&txn)
441                    .await?
442                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
443                objects.push(PbObjectInfo::Subscription(
444                    ObjectModel(subscription, obj).into(),
445                ));
446            }
447            ObjectType::View => {
448                let view = View::find_by_id(object_id)
449                    .one(&txn)
450                    .await?
451                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
452                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
453            }
454            ObjectType::Connection => {
455                let connection = Connection::find_by_id(object_id)
456                    .one(&txn)
457                    .await?
458                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
459                objects.push(PbObjectInfo::Connection(
460                    ObjectModel(connection, obj).into(),
461                ));
462            }
463            _ => unreachable!("not supported object type: {:?}", object_type),
464        };
465
466        txn.commit().await?;
467
468        let version = self
469            .notify_frontend(
470                NotificationOperation::Update,
471                NotificationInfo::ObjectGroup(PbObjectGroup {
472                    objects: objects
473                        .into_iter()
474                        .map(|object| PbObject {
475                            object_info: Some(object),
476                        })
477                        .collect(),
478                }),
479            )
480            .await;
481        Ok(version)
482    }
483
484    pub async fn alter_schema(
485        &self,
486        object_type: ObjectType,
487        object_id: ObjectId,
488        new_schema: SchemaId,
489    ) -> MetaResult<NotificationVersion> {
490        let inner = self.inner.write().await;
491        let txn = inner.db.begin().await?;
492        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
493
494        let obj = Object::find_by_id(object_id)
495            .one(&txn)
496            .await?
497            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
498        if obj.schema_id == Some(new_schema) {
499            return Ok(IGNORED_NOTIFICATION_VERSION);
500        }
501        let database_id = obj.database_id.unwrap();
502
503        let mut objects = vec![];
504        match object_type {
505            ObjectType::Table => {
506                let table = Table::find_by_id(object_id)
507                    .one(&txn)
508                    .await?
509                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
510                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
511                let associated_src_id = table.optional_associated_source_id;
512
513                let mut obj = obj.into_active_model();
514                obj.schema_id = Set(Some(new_schema));
515                let obj = obj.update(&txn).await?;
516                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
517
518                // associated source.
519                if let Some(associated_source_id) = associated_src_id {
520                    let src_obj = object::ActiveModel {
521                        oid: Set(associated_source_id as _),
522                        schema_id: Set(Some(new_schema)),
523                        ..Default::default()
524                    }
525                    .update(&txn)
526                    .await?;
527                    let source = Source::find_by_id(associated_source_id)
528                        .one(&txn)
529                        .await?
530                        .ok_or_else(|| {
531                            MetaError::catalog_id_not_found("source", associated_source_id)
532                        })?;
533                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
534                }
535
536                // indexes.
537                let (index_ids, (index_names, mut table_ids)): (
538                    Vec<IndexId>,
539                    (Vec<String>, Vec<TableId>),
540                ) = Index::find()
541                    .select_only()
542                    .columns([
543                        index::Column::IndexId,
544                        index::Column::Name,
545                        index::Column::IndexTableId,
546                    ])
547                    .filter(index::Column::PrimaryTableId.eq(object_id))
548                    .into_tuple::<(IndexId, String, TableId)>()
549                    .all(&txn)
550                    .await?
551                    .into_iter()
552                    .map(|(id, name, t_id)| (id, (name, t_id)))
553                    .unzip();
554
555                // internal tables.
556                let internal_tables: Vec<TableId> = Table::find()
557                    .select_only()
558                    .column(table::Column::TableId)
559                    .filter(
560                        table::Column::BelongsToJobId
561                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
562                    )
563                    .into_tuple()
564                    .all(&txn)
565                    .await?;
566                table_ids.extend(internal_tables);
567
568                if !index_ids.is_empty() || !table_ids.is_empty() {
569                    for index_name in index_names {
570                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
571                            .await?;
572                    }
573
574                    Object::update_many()
575                        .col_expr(
576                            object::Column::SchemaId,
577                            SimpleExpr::Value(Value::Int(Some(new_schema))),
578                        )
579                        .filter(
580                            object::Column::Oid
581                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
582                        )
583                        .exec(&txn)
584                        .await?;
585                }
586
587                if !table_ids.is_empty() {
588                    let table_objs = Table::find()
589                        .find_also_related(Object)
590                        .filter(table::Column::TableId.is_in(table_ids))
591                        .all(&txn)
592                        .await?;
593                    for (table, table_obj) in table_objs {
594                        objects.push(PbObjectInfo::Table(
595                            ObjectModel(table, table_obj.unwrap()).into(),
596                        ));
597                    }
598                }
599                if !index_ids.is_empty() {
600                    let index_objs = Index::find()
601                        .find_also_related(Object)
602                        .filter(index::Column::IndexId.is_in(index_ids))
603                        .all(&txn)
604                        .await?;
605                    for (index, index_obj) in index_objs {
606                        objects.push(PbObjectInfo::Index(
607                            ObjectModel(index, index_obj.unwrap()).into(),
608                        ));
609                    }
610                }
611            }
612            ObjectType::Source => {
613                let source = Source::find_by_id(object_id)
614                    .one(&txn)
615                    .await?
616                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
617                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
618                let is_shared = source.is_shared();
619
620                let mut obj = obj.into_active_model();
621                obj.schema_id = Set(Some(new_schema));
622                let obj = obj.update(&txn).await?;
623                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
624
625                // Note: For non-shared source, we don't update their state tables, which
626                // belongs to the MV.
627                if is_shared {
628                    update_internal_tables(
629                        &txn,
630                        object_id,
631                        object::Column::SchemaId,
632                        Value::Int(Some(new_schema)),
633                        &mut objects,
634                    )
635                    .await?;
636                }
637            }
638            ObjectType::Sink => {
639                let sink = Sink::find_by_id(object_id)
640                    .one(&txn)
641                    .await?
642                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
643                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
644
645                let mut obj = obj.into_active_model();
646                obj.schema_id = Set(Some(new_schema));
647                let obj = obj.update(&txn).await?;
648                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
649
650                update_internal_tables(
651                    &txn,
652                    object_id,
653                    object::Column::SchemaId,
654                    Value::Int(Some(new_schema)),
655                    &mut objects,
656                )
657                .await?;
658            }
659            ObjectType::Subscription => {
660                let subscription = Subscription::find_by_id(object_id)
661                    .one(&txn)
662                    .await?
663                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
664                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
665                    .await?;
666
667                let mut obj = obj.into_active_model();
668                obj.schema_id = Set(Some(new_schema));
669                let obj = obj.update(&txn).await?;
670                objects.push(PbObjectInfo::Subscription(
671                    ObjectModel(subscription, obj).into(),
672                ));
673            }
674            ObjectType::View => {
675                let view = View::find_by_id(object_id)
676                    .one(&txn)
677                    .await?
678                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
679                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
680
681                let mut obj = obj.into_active_model();
682                obj.schema_id = Set(Some(new_schema));
683                let obj = obj.update(&txn).await?;
684                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
685            }
686            ObjectType::Function => {
687                let function = Function::find_by_id(object_id)
688                    .one(&txn)
689                    .await?
690                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
691
692                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
693                pb_function.schema_id = new_schema as _;
694                check_function_signature_duplicate(&pb_function, &txn).await?;
695
696                object::ActiveModel {
697                    oid: Set(object_id),
698                    schema_id: Set(Some(new_schema)),
699                    ..Default::default()
700                }
701                .update(&txn)
702                .await?;
703
704                txn.commit().await?;
705                let version = self
706                    .notify_frontend(
707                        NotificationOperation::Update,
708                        NotificationInfo::Function(pb_function),
709                    )
710                    .await;
711                return Ok(version);
712            }
713            ObjectType::Connection => {
714                let connection = Connection::find_by_id(object_id)
715                    .one(&txn)
716                    .await?
717                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
718
719                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
720                pb_connection.schema_id = new_schema as _;
721                check_connection_name_duplicate(&pb_connection, &txn).await?;
722
723                object::ActiveModel {
724                    oid: Set(object_id),
725                    schema_id: Set(Some(new_schema)),
726                    ..Default::default()
727                }
728                .update(&txn)
729                .await?;
730
731                txn.commit().await?;
732                let version = self
733                    .notify_frontend(
734                        NotificationOperation::Update,
735                        NotificationInfo::Connection(pb_connection),
736                    )
737                    .await;
738                return Ok(version);
739            }
740            _ => unreachable!("not supported object type: {:?}", object_type),
741        }
742
743        txn.commit().await?;
744        let version = self
745            .notify_frontend(
746                Operation::Update,
747                Info::ObjectGroup(PbObjectGroup {
748                    objects: objects
749                        .into_iter()
750                        .map(|relation_info| PbObject {
751                            object_info: Some(relation_info),
752                        })
753                        .collect_vec(),
754                }),
755            )
756            .await;
757        Ok(version)
758    }
759
760    pub async fn alter_secret(
761        &self,
762        pb_secret: PbSecret,
763        secret_plain_payload: Vec<u8>,
764    ) -> MetaResult<NotificationVersion> {
765        let inner = self.inner.write().await;
766        let owner_id = pb_secret.owner as _;
767        let txn = inner.db.begin().await?;
768        ensure_user_id(owner_id, &txn).await?;
769        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
770        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
771
772        ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
773        let secret: secret::ActiveModel = pb_secret.clone().into();
774        Secret::update(secret).exec(&txn).await?;
775
776        txn.commit().await?;
777
778        // Notify the compute and frontend node plain secret
779        let mut secret_plain = pb_secret;
780        secret_plain.value.clone_from(&secret_plain_payload);
781
782        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
783        self.env
784            .notification_manager()
785            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
786
787        let version = self
788            .notify_frontend(
789                NotificationOperation::Update,
790                NotificationInfo::Secret(secret_plain),
791            )
792            .await;
793
794        Ok(version)
795    }
796
797    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
798    pub async fn drop_table_associated_source(
799        txn: &DatabaseTransaction,
800        drop_table_connector_ctx: &DropTableConnectorContext,
801    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
802        let to_drop_source_objects: Vec<PartialObject> = Object::find()
803            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
804            .into_partial_model()
805            .all(txn)
806            .await?;
807        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
808            .select_only()
809            .filter(
810                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
811            )
812            .into_partial_model()
813            .all(txn)
814            .await?;
815        let to_drop_objects = to_drop_source_objects
816            .into_iter()
817            .chain(to_drop_internal_table_objs.into_iter())
818            .collect_vec();
819        // Find affect users with privileges on all this objects.
820        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
821            .select_only()
822            .distinct()
823            .column(user_privilege::Column::UserId)
824            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
825            .into_tuple()
826            .all(txn)
827            .await?;
828
829        tracing::debug!(
830            "drop_table_associated_source: to_drop_objects: {:?}",
831            to_drop_objects
832        );
833
834        // delete all in to_drop_objects.
835        let res = Object::delete_many()
836            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
837            .exec(txn)
838            .await?;
839        if res.rows_affected == 0 {
840            return Err(MetaError::catalog_id_not_found(
841                ObjectType::Source.as_str(),
842                drop_table_connector_ctx.to_remove_source_id,
843            ));
844        }
845        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
846
847        Ok((user_infos, to_drop_objects))
848    }
849}