risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_meta_model::table::RefreshState;
18use sea_orm::DatabaseTransaction;
19use thiserror_ext::AsReport;
20
21use super::*;
22
23impl CatalogController {
24    async fn alter_database_name(
25        &self,
26        database_id: DatabaseId,
27        name: &str,
28    ) -> MetaResult<NotificationVersion> {
29        let inner = self.inner.write().await;
30        let txn = inner.db.begin().await?;
31        check_database_name_duplicate(name, &txn).await?;
32
33        let active_model = database::ActiveModel {
34            database_id: Set(database_id),
35            name: Set(name.to_owned()),
36            ..Default::default()
37        };
38        let database = active_model.update(&txn).await?;
39
40        let obj = Object::find_by_id(database_id)
41            .one(&txn)
42            .await?
43            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
44
45        txn.commit().await?;
46
47        let version = self
48            .notify_frontend(
49                NotificationOperation::Update,
50                NotificationInfo::Database(ObjectModel(database, obj).into()),
51            )
52            .await;
53        Ok(version)
54    }
55
56    async fn alter_schema_name(
57        &self,
58        schema_id: SchemaId,
59        name: &str,
60    ) -> MetaResult<NotificationVersion> {
61        let inner = self.inner.write().await;
62        let txn = inner.db.begin().await?;
63
64        let obj = Object::find_by_id(schema_id)
65            .one(&txn)
66            .await?
67            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
68        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
69
70        let active_model = schema::ActiveModel {
71            schema_id: Set(schema_id),
72            name: Set(name.to_owned()),
73        };
74        let schema = active_model.update(&txn).await?;
75
76        txn.commit().await?;
77
78        let version = self
79            .notify_frontend(
80                NotificationOperation::Update,
81                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
82            )
83            .await;
84        Ok(version)
85    }
86
87    pub async fn alter_name(
88        &self,
89        object_type: ObjectType,
90        object_id: ObjectId,
91        object_name: &str,
92    ) -> MetaResult<NotificationVersion> {
93        if object_type == ObjectType::Database {
94            return self.alter_database_name(object_id as _, object_name).await;
95        } else if object_type == ObjectType::Schema {
96            return self.alter_schema_name(object_id as _, object_name).await;
97        }
98
99        let inner = self.inner.write().await;
100        let txn = inner.db.begin().await?;
101        let obj: PartialObject = Object::find_by_id(object_id)
102            .into_partial_model()
103            .one(&txn)
104            .await?
105            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
106        assert_eq!(obj.obj_type, object_type);
107        check_relation_name_duplicate(
108            object_name,
109            obj.database_id.unwrap(),
110            obj.schema_id.unwrap(),
111            &txn,
112        )
113        .await?;
114
115        // rename relation.
116        let (mut to_update_relations, old_name) =
117            rename_relation(&txn, object_type, object_id, object_name).await?;
118        // rename referring relation name.
119        to_update_relations.extend(
120            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
121        );
122
123        txn.commit().await?;
124
125        let version = self
126            .notify_frontend(
127                NotificationOperation::Update,
128                NotificationInfo::ObjectGroup(PbObjectGroup {
129                    objects: to_update_relations,
130                }),
131            )
132            .await;
133
134        Ok(version)
135    }
136
137    pub async fn alter_swap_rename(
138        &self,
139        object_type: ObjectType,
140        object_id: ObjectId,
141        dst_object_id: ObjectId,
142    ) -> MetaResult<NotificationVersion> {
143        let inner = self.inner.write().await;
144        let txn = inner.db.begin().await?;
145        let dst_name: String = match object_type {
146            ObjectType::Table => Table::find_by_id(dst_object_id)
147                .select_only()
148                .column(table::Column::Name)
149                .into_tuple()
150                .one(&txn)
151                .await?
152                .ok_or_else(|| {
153                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
154                })?,
155            ObjectType::Source => Source::find_by_id(dst_object_id)
156                .select_only()
157                .column(source::Column::Name)
158                .into_tuple()
159                .one(&txn)
160                .await?
161                .ok_or_else(|| {
162                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
163                })?,
164            ObjectType::Sink => Sink::find_by_id(dst_object_id)
165                .select_only()
166                .column(sink::Column::Name)
167                .into_tuple()
168                .one(&txn)
169                .await?
170                .ok_or_else(|| {
171                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
172                })?,
173            ObjectType::View => View::find_by_id(dst_object_id)
174                .select_only()
175                .column(view::Column::Name)
176                .into_tuple()
177                .one(&txn)
178                .await?
179                .ok_or_else(|| {
180                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
181                })?,
182            ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
183                .select_only()
184                .column(subscription::Column::Name)
185                .into_tuple()
186                .one(&txn)
187                .await?
188                .ok_or_else(|| {
189                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
190                })?,
191            _ => {
192                return Err(MetaError::permission_denied(format!(
193                    "swap rename not supported for object type: {:?}",
194                    object_type
195                )));
196            }
197        };
198
199        // rename relations.
200        let (mut to_update_relations, src_name) =
201            rename_relation(&txn, object_type, object_id, &dst_name).await?;
202        let (to_update_relations2, _) =
203            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
204        to_update_relations.extend(to_update_relations2);
205        // rename referring relation name.
206        to_update_relations.extend(
207            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
208        );
209        to_update_relations.extend(
210            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
211        );
212
213        txn.commit().await?;
214
215        let version = self
216            .notify_frontend(
217                NotificationOperation::Update,
218                NotificationInfo::ObjectGroup(PbObjectGroup {
219                    objects: to_update_relations,
220                }),
221            )
222            .await;
223
224        Ok(version)
225    }
226
227    pub async fn alter_non_shared_source(
228        &self,
229        pb_source: PbSource,
230    ) -> MetaResult<NotificationVersion> {
231        let source_id = pb_source.id as SourceId;
232        let inner = self.inner.write().await;
233        let txn = inner.db.begin().await?;
234
235        let original_version: i64 = Source::find_by_id(source_id)
236            .select_only()
237            .column(source::Column::Version)
238            .into_tuple()
239            .one(&txn)
240            .await?
241            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
242        if original_version + 1 != pb_source.version as i64 {
243            return Err(MetaError::permission_denied(
244                "source version is stale".to_owned(),
245            ));
246        }
247
248        let source: source::ActiveModel = pb_source.clone().into();
249        source.update(&txn).await?;
250        txn.commit().await?;
251
252        let version = self
253            .notify_frontend_relation_info(
254                NotificationOperation::Update,
255                PbObjectInfo::Source(pb_source),
256            )
257            .await;
258        Ok(version)
259    }
260
261    pub async fn alter_owner(
262        &self,
263        object_type: ObjectType,
264        object_id: ObjectId,
265        new_owner: UserId,
266    ) -> MetaResult<NotificationVersion> {
267        let inner = self.inner.write().await;
268        let txn = inner.db.begin().await?;
269        ensure_user_id(new_owner, &txn).await?;
270
271        let obj = Object::find_by_id(object_id)
272            .one(&txn)
273            .await?
274            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
275        if obj.owner_id == new_owner {
276            return Ok(IGNORED_NOTIFICATION_VERSION);
277        }
278        let mut obj = obj.into_active_model();
279        obj.owner_id = Set(new_owner);
280        let obj = obj.update(&txn).await?;
281
282        let mut objects = vec![];
283        match object_type {
284            ObjectType::Database => {
285                let db = Database::find_by_id(object_id)
286                    .one(&txn)
287                    .await?
288                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
289
290                txn.commit().await?;
291
292                let version = self
293                    .notify_frontend(
294                        NotificationOperation::Update,
295                        NotificationInfo::Database(ObjectModel(db, obj).into()),
296                    )
297                    .await;
298                return Ok(version);
299            }
300            ObjectType::Schema => {
301                let schema = Schema::find_by_id(object_id)
302                    .one(&txn)
303                    .await?
304                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
305
306                txn.commit().await?;
307
308                let version = self
309                    .notify_frontend(
310                        NotificationOperation::Update,
311                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
312                    )
313                    .await;
314                return Ok(version);
315            }
316            ObjectType::Table => {
317                let table = Table::find_by_id(object_id)
318                    .one(&txn)
319                    .await?
320                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
321
322                // associated source.
323                if let Some(associated_source_id) = table.optional_associated_source_id {
324                    let src_obj = object::ActiveModel {
325                        oid: Set(associated_source_id as _),
326                        owner_id: Set(new_owner),
327                        ..Default::default()
328                    }
329                    .update(&txn)
330                    .await?;
331                    let source = Source::find_by_id(associated_source_id)
332                        .one(&txn)
333                        .await?
334                        .ok_or_else(|| {
335                            MetaError::catalog_id_not_found("source", associated_source_id)
336                        })?;
337                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
338                }
339
340                // indexes.
341                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
342                    .select_only()
343                    .columns([index::Column::IndexId, index::Column::IndexTableId])
344                    .filter(index::Column::PrimaryTableId.eq(object_id))
345                    .into_tuple::<(IndexId, TableId)>()
346                    .all(&txn)
347                    .await?
348                    .into_iter()
349                    .unzip();
350                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
351
352                // internal tables.
353                let internal_tables: Vec<TableId> = Table::find()
354                    .select_only()
355                    .column(table::Column::TableId)
356                    .filter(
357                        table::Column::BelongsToJobId
358                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
359                    )
360                    .into_tuple()
361                    .all(&txn)
362                    .await?;
363                table_ids.extend(internal_tables);
364
365                if !index_ids.is_empty() || !table_ids.is_empty() {
366                    Object::update_many()
367                        .col_expr(
368                            object::Column::OwnerId,
369                            SimpleExpr::Value(Value::Int(Some(new_owner))),
370                        )
371                        .filter(
372                            object::Column::Oid
373                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
374                        )
375                        .exec(&txn)
376                        .await?;
377                }
378
379                if !table_ids.is_empty() {
380                    let table_objs = Table::find()
381                        .find_also_related(Object)
382                        .filter(table::Column::TableId.is_in(table_ids))
383                        .all(&txn)
384                        .await?;
385                    for (table, table_obj) in table_objs {
386                        objects.push(PbObjectInfo::Table(
387                            ObjectModel(table, table_obj.unwrap()).into(),
388                        ));
389                    }
390                }
391                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
392                if !index_ids.is_empty() {
393                    let index_objs = Index::find()
394                        .find_also_related(Object)
395                        .filter(index::Column::IndexId.is_in(index_ids))
396                        .all(&txn)
397                        .await?;
398                    for (index, index_obj) in index_objs {
399                        objects.push(PbObjectInfo::Index(
400                            ObjectModel(index, index_obj.unwrap()).into(),
401                        ));
402                    }
403                }
404            }
405            ObjectType::Source => {
406                let source = Source::find_by_id(object_id)
407                    .one(&txn)
408                    .await?
409                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
410                let is_shared = source.is_shared();
411                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
412
413                // Note: For non-shared source, we don't update their state tables, which
414                // belongs to the MV.
415                if is_shared {
416                    update_internal_tables(
417                        &txn,
418                        object_id,
419                        object::Column::OwnerId,
420                        Value::Int(Some(new_owner)),
421                        &mut objects,
422                    )
423                    .await?;
424                }
425            }
426            ObjectType::Sink => {
427                let sink = Sink::find_by_id(object_id)
428                    .one(&txn)
429                    .await?
430                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
431                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
432
433                update_internal_tables(
434                    &txn,
435                    object_id,
436                    object::Column::OwnerId,
437                    Value::Int(Some(new_owner)),
438                    &mut objects,
439                )
440                .await?;
441            }
442            ObjectType::Subscription => {
443                let subscription = Subscription::find_by_id(object_id)
444                    .one(&txn)
445                    .await?
446                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
447                objects.push(PbObjectInfo::Subscription(
448                    ObjectModel(subscription, obj).into(),
449                ));
450            }
451            ObjectType::View => {
452                let view = View::find_by_id(object_id)
453                    .one(&txn)
454                    .await?
455                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
456                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
457            }
458            ObjectType::Connection => {
459                let connection = Connection::find_by_id(object_id)
460                    .one(&txn)
461                    .await?
462                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
463                objects.push(PbObjectInfo::Connection(
464                    ObjectModel(connection, obj).into(),
465                ));
466            }
467            _ => unreachable!("not supported object type: {:?}", object_type),
468        };
469
470        txn.commit().await?;
471
472        let version = self
473            .notify_frontend(
474                NotificationOperation::Update,
475                NotificationInfo::ObjectGroup(PbObjectGroup {
476                    objects: objects
477                        .into_iter()
478                        .map(|object| PbObject {
479                            object_info: Some(object),
480                        })
481                        .collect(),
482                }),
483            )
484            .await;
485        Ok(version)
486    }
487
488    pub async fn alter_schema(
489        &self,
490        object_type: ObjectType,
491        object_id: ObjectId,
492        new_schema: SchemaId,
493    ) -> MetaResult<NotificationVersion> {
494        let inner = self.inner.write().await;
495        let txn = inner.db.begin().await?;
496        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
497
498        let obj = Object::find_by_id(object_id)
499            .one(&txn)
500            .await?
501            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
502        if obj.schema_id == Some(new_schema) {
503            return Ok(IGNORED_NOTIFICATION_VERSION);
504        }
505        let database_id = obj.database_id.unwrap();
506
507        let mut objects = vec![];
508        match object_type {
509            ObjectType::Table => {
510                let table = Table::find_by_id(object_id)
511                    .one(&txn)
512                    .await?
513                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
514                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
515                let associated_src_id = table.optional_associated_source_id;
516
517                let mut obj = obj.into_active_model();
518                obj.schema_id = Set(Some(new_schema));
519                let obj = obj.update(&txn).await?;
520                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
521
522                // associated source.
523                if let Some(associated_source_id) = associated_src_id {
524                    let src_obj = object::ActiveModel {
525                        oid: Set(associated_source_id as _),
526                        schema_id: Set(Some(new_schema)),
527                        ..Default::default()
528                    }
529                    .update(&txn)
530                    .await?;
531                    let source = Source::find_by_id(associated_source_id)
532                        .one(&txn)
533                        .await?
534                        .ok_or_else(|| {
535                            MetaError::catalog_id_not_found("source", associated_source_id)
536                        })?;
537                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
538                }
539
540                // indexes.
541                let (index_ids, (index_names, mut table_ids)): (
542                    Vec<IndexId>,
543                    (Vec<String>, Vec<TableId>),
544                ) = Index::find()
545                    .select_only()
546                    .columns([
547                        index::Column::IndexId,
548                        index::Column::Name,
549                        index::Column::IndexTableId,
550                    ])
551                    .filter(index::Column::PrimaryTableId.eq(object_id))
552                    .into_tuple::<(IndexId, String, TableId)>()
553                    .all(&txn)
554                    .await?
555                    .into_iter()
556                    .map(|(id, name, t_id)| (id, (name, t_id)))
557                    .unzip();
558
559                // internal tables.
560                let internal_tables: Vec<TableId> = Table::find()
561                    .select_only()
562                    .column(table::Column::TableId)
563                    .filter(
564                        table::Column::BelongsToJobId
565                            .is_in(table_ids.iter().cloned().chain(std::iter::once(object_id))),
566                    )
567                    .into_tuple()
568                    .all(&txn)
569                    .await?;
570                table_ids.extend(internal_tables);
571
572                if !index_ids.is_empty() || !table_ids.is_empty() {
573                    for index_name in index_names {
574                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
575                            .await?;
576                    }
577
578                    Object::update_many()
579                        .col_expr(
580                            object::Column::SchemaId,
581                            SimpleExpr::Value(Value::Int(Some(new_schema))),
582                        )
583                        .filter(
584                            object::Column::Oid
585                                .is_in(index_ids.iter().cloned().chain(table_ids.iter().cloned())),
586                        )
587                        .exec(&txn)
588                        .await?;
589                }
590
591                if !table_ids.is_empty() {
592                    let table_objs = Table::find()
593                        .find_also_related(Object)
594                        .filter(table::Column::TableId.is_in(table_ids))
595                        .all(&txn)
596                        .await?;
597                    for (table, table_obj) in table_objs {
598                        objects.push(PbObjectInfo::Table(
599                            ObjectModel(table, table_obj.unwrap()).into(),
600                        ));
601                    }
602                }
603                if !index_ids.is_empty() {
604                    let index_objs = Index::find()
605                        .find_also_related(Object)
606                        .filter(index::Column::IndexId.is_in(index_ids))
607                        .all(&txn)
608                        .await?;
609                    for (index, index_obj) in index_objs {
610                        objects.push(PbObjectInfo::Index(
611                            ObjectModel(index, index_obj.unwrap()).into(),
612                        ));
613                    }
614                }
615            }
616            ObjectType::Source => {
617                let source = Source::find_by_id(object_id)
618                    .one(&txn)
619                    .await?
620                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
621                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
622                let is_shared = source.is_shared();
623
624                let mut obj = obj.into_active_model();
625                obj.schema_id = Set(Some(new_schema));
626                let obj = obj.update(&txn).await?;
627                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
628
629                // Note: For non-shared source, we don't update their state tables, which
630                // belongs to the MV.
631                if is_shared {
632                    update_internal_tables(
633                        &txn,
634                        object_id,
635                        object::Column::SchemaId,
636                        Value::Int(Some(new_schema)),
637                        &mut objects,
638                    )
639                    .await?;
640                }
641            }
642            ObjectType::Sink => {
643                let sink = Sink::find_by_id(object_id)
644                    .one(&txn)
645                    .await?
646                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
647                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
648
649                let mut obj = obj.into_active_model();
650                obj.schema_id = Set(Some(new_schema));
651                let obj = obj.update(&txn).await?;
652                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
653
654                update_internal_tables(
655                    &txn,
656                    object_id,
657                    object::Column::SchemaId,
658                    Value::Int(Some(new_schema)),
659                    &mut objects,
660                )
661                .await?;
662            }
663            ObjectType::Subscription => {
664                let subscription = Subscription::find_by_id(object_id)
665                    .one(&txn)
666                    .await?
667                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
668                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
669                    .await?;
670
671                let mut obj = obj.into_active_model();
672                obj.schema_id = Set(Some(new_schema));
673                let obj = obj.update(&txn).await?;
674                objects.push(PbObjectInfo::Subscription(
675                    ObjectModel(subscription, obj).into(),
676                ));
677            }
678            ObjectType::View => {
679                let view = View::find_by_id(object_id)
680                    .one(&txn)
681                    .await?
682                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
683                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
684
685                let mut obj = obj.into_active_model();
686                obj.schema_id = Set(Some(new_schema));
687                let obj = obj.update(&txn).await?;
688                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
689            }
690            ObjectType::Function => {
691                let function = Function::find_by_id(object_id)
692                    .one(&txn)
693                    .await?
694                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
695
696                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
697                pb_function.schema_id = new_schema as _;
698                check_function_signature_duplicate(&pb_function, &txn).await?;
699
700                object::ActiveModel {
701                    oid: Set(object_id),
702                    schema_id: Set(Some(new_schema)),
703                    ..Default::default()
704                }
705                .update(&txn)
706                .await?;
707
708                txn.commit().await?;
709                let version = self
710                    .notify_frontend(
711                        NotificationOperation::Update,
712                        NotificationInfo::Function(pb_function),
713                    )
714                    .await;
715                return Ok(version);
716            }
717            ObjectType::Connection => {
718                let connection = Connection::find_by_id(object_id)
719                    .one(&txn)
720                    .await?
721                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
722
723                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
724                pb_connection.schema_id = new_schema as _;
725                check_connection_name_duplicate(&pb_connection, &txn).await?;
726
727                object::ActiveModel {
728                    oid: Set(object_id),
729                    schema_id: Set(Some(new_schema)),
730                    ..Default::default()
731                }
732                .update(&txn)
733                .await?;
734
735                txn.commit().await?;
736                let version = self
737                    .notify_frontend(
738                        NotificationOperation::Update,
739                        NotificationInfo::Connection(pb_connection),
740                    )
741                    .await;
742                return Ok(version);
743            }
744            _ => unreachable!("not supported object type: {:?}", object_type),
745        }
746
747        txn.commit().await?;
748        let version = self
749            .notify_frontend(
750                Operation::Update,
751                Info::ObjectGroup(PbObjectGroup {
752                    objects: objects
753                        .into_iter()
754                        .map(|relation_info| PbObject {
755                            object_info: Some(relation_info),
756                        })
757                        .collect_vec(),
758                }),
759            )
760            .await;
761        Ok(version)
762    }
763
764    pub async fn alter_secret(
765        &self,
766        pb_secret: PbSecret,
767        secret_plain_payload: Vec<u8>,
768    ) -> MetaResult<NotificationVersion> {
769        let inner = self.inner.write().await;
770        let owner_id = pb_secret.owner as _;
771        let txn = inner.db.begin().await?;
772        ensure_user_id(owner_id, &txn).await?;
773        ensure_object_id(ObjectType::Database, pb_secret.database_id as _, &txn).await?;
774        ensure_object_id(ObjectType::Schema, pb_secret.schema_id as _, &txn).await?;
775
776        ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
777        let secret: secret::ActiveModel = pb_secret.clone().into();
778        Secret::update(secret).exec(&txn).await?;
779
780        txn.commit().await?;
781
782        // Notify the compute and frontend node plain secret
783        let mut secret_plain = pb_secret;
784        secret_plain.value.clone_from(&secret_plain_payload);
785
786        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
787        self.env
788            .notification_manager()
789            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
790
791        let version = self
792            .notify_frontend(
793                NotificationOperation::Update,
794                NotificationInfo::Secret(secret_plain),
795            )
796            .await;
797
798        Ok(version)
799    }
800
801    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
802    pub async fn drop_table_associated_source(
803        txn: &DatabaseTransaction,
804        drop_table_connector_ctx: &DropTableConnectorContext,
805    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
806        let to_drop_source_objects: Vec<PartialObject> = Object::find()
807            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
808            .into_partial_model()
809            .all(txn)
810            .await?;
811        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
812            .select_only()
813            .filter(
814                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
815            )
816            .into_partial_model()
817            .all(txn)
818            .await?;
819        let to_drop_objects = to_drop_source_objects
820            .into_iter()
821            .chain(to_drop_internal_table_objs.into_iter())
822            .collect_vec();
823        // Find affect users with privileges on all this objects.
824        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
825            .select_only()
826            .distinct()
827            .column(user_privilege::Column::UserId)
828            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
829            .into_tuple()
830            .all(txn)
831            .await?;
832
833        tracing::debug!(
834            "drop_table_associated_source: to_drop_objects: {:?}",
835            to_drop_objects
836        );
837
838        // delete all in to_drop_objects.
839        let res = Object::delete_many()
840            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
841            .exec(txn)
842            .await?;
843        if res.rows_affected == 0 {
844            return Err(MetaError::catalog_id_not_found(
845                ObjectType::Source.as_str(),
846                drop_table_connector_ctx.to_remove_source_id,
847            ));
848        }
849        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
850
851        Ok((user_infos, to_drop_objects))
852    }
853
854    pub async fn alter_database_param(
855        &self,
856        database_id: DatabaseId,
857        param: AlterDatabaseParam,
858    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
859        let inner = self.inner.write().await;
860        let txn = inner.db.begin().await?;
861
862        let mut database = database::ActiveModel {
863            database_id: Set(database_id),
864            ..Default::default()
865        };
866        match param {
867            AlterDatabaseParam::BarrierIntervalMs(interval) => {
868                if let Some(ref interval) = interval {
869                    OverrideValidate::barrier_interval_ms(interval)
870                        .map_err(|e| anyhow::anyhow!(e))?;
871                }
872                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
873            }
874            AlterDatabaseParam::CheckpointFrequency(frequency) => {
875                if let Some(ref frequency) = frequency {
876                    OverrideValidate::checkpoint_frequency(frequency)
877                        .map_err(|e| anyhow::anyhow!(e))?;
878                }
879                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
880            }
881        }
882        let database = database.update(&txn).await?;
883
884        let obj = Object::find_by_id(database_id)
885            .one(&txn)
886            .await?
887            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
888
889        txn.commit().await?;
890
891        let version = self
892            .notify_frontend(
893                NotificationOperation::Update,
894                NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
895            )
896            .await;
897        Ok((version, database))
898    }
899
900    /// Set the refresh state of a table
901    pub async fn set_table_refresh_state(
902        &self,
903        table_id: TableId,
904        new_state: RefreshState,
905    ) -> MetaResult<bool> {
906        let inner = self.inner.write().await;
907        let txn = inner.db.begin().await?;
908
909        // It is okay to update refresh state unconditionally because the check is done in `validate_refreshable_table` inside `RefreshManager`.
910        let active_model = table::ActiveModel {
911            table_id: Set(table_id),
912            refresh_state: Set(Some(new_state)),
913            ..Default::default()
914        };
915        if let Err(e) = active_model.update(&txn).await {
916            tracing::warn!(
917                "Failed to update table refresh state for table {}: {}",
918                table_id,
919                e.as_report()
920            );
921            let t = Table::find_by_id(table_id).all(&txn).await;
922            tracing::info!(table = ?t, "Table found");
923        }
924        txn.commit().await?;
925
926        tracing::debug!(
927            table_id = %table_id,
928            new_state = ?new_state,
929            "Updated table refresh state"
930        );
931
932        Ok(true)
933    }
934}