risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::AlterDatabaseParam;
16use risingwave_common::system_param::{OverrideValidate, Validate};
17use risingwave_meta_model::table::RefreshState;
18use sea_orm::DatabaseTransaction;
19use thiserror_ext::AsReport;
20
21use super::*;
22
23impl CatalogController {
24    async fn alter_database_name(
25        &self,
26        database_id: DatabaseId,
27        name: &str,
28    ) -> MetaResult<NotificationVersion> {
29        let inner = self.inner.write().await;
30        let txn = inner.db.begin().await?;
31        check_database_name_duplicate(name, &txn).await?;
32
33        let active_model = database::ActiveModel {
34            database_id: Set(database_id),
35            name: Set(name.to_owned()),
36            ..Default::default()
37        };
38        let database = active_model.update(&txn).await?;
39
40        let obj = Object::find_by_id(database_id.as_raw_id() as ObjectId)
41            .one(&txn)
42            .await?
43            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
44
45        txn.commit().await?;
46
47        let version = self
48            .notify_frontend(
49                NotificationOperation::Update,
50                NotificationInfo::Database(ObjectModel(database, obj).into()),
51            )
52            .await;
53        Ok(version)
54    }
55
56    async fn alter_schema_name(
57        &self,
58        schema_id: SchemaId,
59        name: &str,
60    ) -> MetaResult<NotificationVersion> {
61        let inner = self.inner.write().await;
62        let txn = inner.db.begin().await?;
63
64        let obj = Object::find_by_id(schema_id.as_raw_id() as ObjectId)
65            .one(&txn)
66            .await?
67            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
68        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
69
70        let active_model = schema::ActiveModel {
71            schema_id: Set(schema_id),
72            name: Set(name.to_owned()),
73        };
74        let schema = active_model.update(&txn).await?;
75
76        txn.commit().await?;
77
78        let version = self
79            .notify_frontend(
80                NotificationOperation::Update,
81                NotificationInfo::Schema(ObjectModel(schema, obj).into()),
82            )
83            .await;
84        Ok(version)
85    }
86
87    pub async fn alter_name(
88        &self,
89        object_type: ObjectType,
90        object_id: ObjectId,
91        object_name: &str,
92    ) -> MetaResult<NotificationVersion> {
93        if object_type == ObjectType::Database {
94            return self
95                .alter_database_name(DatabaseId::new(object_id as _), object_name)
96                .await;
97        } else if object_type == ObjectType::Schema {
98            return self
99                .alter_schema_name(SchemaId::new(object_id as _), object_name)
100                .await;
101        }
102
103        let inner = self.inner.write().await;
104        let txn = inner.db.begin().await?;
105        let obj: PartialObject = Object::find_by_id(object_id)
106            .into_partial_model()
107            .one(&txn)
108            .await?
109            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
110        assert_eq!(obj.obj_type, object_type);
111        check_relation_name_duplicate(
112            object_name,
113            obj.database_id.unwrap(),
114            obj.schema_id.unwrap(),
115            &txn,
116        )
117        .await?;
118
119        // rename relation.
120        let (mut to_update_relations, old_name) =
121            rename_relation(&txn, object_type, object_id, object_name).await?;
122        // rename referring relation name.
123        to_update_relations.extend(
124            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
125        );
126
127        txn.commit().await?;
128
129        let version = self
130            .notify_frontend(
131                NotificationOperation::Update,
132                NotificationInfo::ObjectGroup(PbObjectGroup {
133                    objects: to_update_relations,
134                }),
135            )
136            .await;
137
138        Ok(version)
139    }
140
141    pub async fn alter_swap_rename(
142        &self,
143        object_type: ObjectType,
144        object_id: ObjectId,
145        dst_object_id: ObjectId,
146    ) -> MetaResult<NotificationVersion> {
147        let inner = self.inner.write().await;
148        let txn = inner.db.begin().await?;
149        let dst_name: String = match object_type {
150            ObjectType::Table => Table::find_by_id(TableId::new(dst_object_id as _))
151                .select_only()
152                .column(table::Column::Name)
153                .into_tuple()
154                .one(&txn)
155                .await?
156                .ok_or_else(|| {
157                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
158                })?,
159            ObjectType::Source => Source::find_by_id(dst_object_id)
160                .select_only()
161                .column(source::Column::Name)
162                .into_tuple()
163                .one(&txn)
164                .await?
165                .ok_or_else(|| {
166                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
167                })?,
168            ObjectType::Sink => Sink::find_by_id(dst_object_id)
169                .select_only()
170                .column(sink::Column::Name)
171                .into_tuple()
172                .one(&txn)
173                .await?
174                .ok_or_else(|| {
175                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
176                })?,
177            ObjectType::View => View::find_by_id(dst_object_id)
178                .select_only()
179                .column(view::Column::Name)
180                .into_tuple()
181                .one(&txn)
182                .await?
183                .ok_or_else(|| {
184                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
185                })?,
186            ObjectType::Subscription => Subscription::find_by_id(dst_object_id)
187                .select_only()
188                .column(subscription::Column::Name)
189                .into_tuple()
190                .one(&txn)
191                .await?
192                .ok_or_else(|| {
193                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
194                })?,
195            _ => {
196                return Err(MetaError::permission_denied(format!(
197                    "swap rename not supported for object type: {:?}",
198                    object_type
199                )));
200            }
201        };
202
203        // rename relations.
204        let (mut to_update_relations, src_name) =
205            rename_relation(&txn, object_type, object_id, &dst_name).await?;
206        let (to_update_relations2, _) =
207            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
208        to_update_relations.extend(to_update_relations2);
209        // rename referring relation name.
210        to_update_relations.extend(
211            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
212        );
213        to_update_relations.extend(
214            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
215        );
216
217        txn.commit().await?;
218
219        let version = self
220            .notify_frontend(
221                NotificationOperation::Update,
222                NotificationInfo::ObjectGroup(PbObjectGroup {
223                    objects: to_update_relations,
224                }),
225            )
226            .await;
227
228        Ok(version)
229    }
230
231    pub async fn alter_non_shared_source(
232        &self,
233        pb_source: PbSource,
234    ) -> MetaResult<NotificationVersion> {
235        let source_id = pb_source.id as SourceId;
236        let inner = self.inner.write().await;
237        let txn = inner.db.begin().await?;
238
239        let original_version: i64 = Source::find_by_id(source_id)
240            .select_only()
241            .column(source::Column::Version)
242            .into_tuple()
243            .one(&txn)
244            .await?
245            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
246        if original_version + 1 != pb_source.version as i64 {
247            return Err(MetaError::permission_denied(
248                "source version is stale".to_owned(),
249            ));
250        }
251
252        let source: source::ActiveModel = pb_source.clone().into();
253        source.update(&txn).await?;
254        txn.commit().await?;
255
256        let version = self
257            .notify_frontend_relation_info(
258                NotificationOperation::Update,
259                PbObjectInfo::Source(pb_source),
260            )
261            .await;
262        Ok(version)
263    }
264
265    pub async fn alter_owner(
266        &self,
267        object_type: ObjectType,
268        object_id: ObjectId,
269        new_owner: UserId,
270    ) -> MetaResult<NotificationVersion> {
271        let inner = self.inner.write().await;
272        let txn = inner.db.begin().await?;
273        ensure_user_id(new_owner, &txn).await?;
274
275        let obj = Object::find_by_id(object_id)
276            .one(&txn)
277            .await?
278            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
279        if obj.owner_id == new_owner {
280            return Ok(IGNORED_NOTIFICATION_VERSION);
281        }
282        let mut obj = obj.into_active_model();
283        obj.owner_id = Set(new_owner);
284        let obj = obj.update(&txn).await?;
285
286        let mut objects = vec![];
287        match object_type {
288            ObjectType::Database => {
289                let db = Database::find_by_id(DatabaseId::new(object_id as _))
290                    .one(&txn)
291                    .await?
292                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
293
294                txn.commit().await?;
295
296                let version = self
297                    .notify_frontend(
298                        NotificationOperation::Update,
299                        NotificationInfo::Database(ObjectModel(db, obj).into()),
300                    )
301                    .await;
302                return Ok(version);
303            }
304            ObjectType::Schema => {
305                let schema = Schema::find_by_id(SchemaId::new(object_id as _))
306                    .one(&txn)
307                    .await?
308                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
309
310                txn.commit().await?;
311
312                let version = self
313                    .notify_frontend(
314                        NotificationOperation::Update,
315                        NotificationInfo::Schema(ObjectModel(schema, obj).into()),
316                    )
317                    .await;
318                return Ok(version);
319            }
320            ObjectType::Table => {
321                let table = Table::find_by_id(TableId::new(object_id as _))
322                    .one(&txn)
323                    .await?
324                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
325
326                // associated source.
327                if let Some(associated_source_id) = table.optional_associated_source_id {
328                    let src_obj = object::ActiveModel {
329                        oid: Set(associated_source_id as _),
330                        owner_id: Set(new_owner),
331                        ..Default::default()
332                    }
333                    .update(&txn)
334                    .await?;
335                    let source = Source::find_by_id(associated_source_id)
336                        .one(&txn)
337                        .await?
338                        .ok_or_else(|| {
339                            MetaError::catalog_id_not_found("source", associated_source_id)
340                        })?;
341                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
342                }
343
344                // indexes.
345                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
346                    .select_only()
347                    .columns([index::Column::IndexId, index::Column::IndexTableId])
348                    .filter(index::Column::PrimaryTableId.eq(object_id))
349                    .into_tuple::<(IndexId, TableId)>()
350                    .all(&txn)
351                    .await?
352                    .into_iter()
353                    .unzip();
354                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
355
356                // internal tables.
357                let internal_tables: Vec<TableId> = Table::find()
358                    .select_only()
359                    .column(table::Column::TableId)
360                    .filter(
361                        table::Column::BelongsToJobId.is_in(
362                            table_ids
363                                .iter()
364                                .cloned()
365                                .chain(std::iter::once(TableId::new(object_id as _))),
366                        ),
367                    )
368                    .into_tuple()
369                    .all(&txn)
370                    .await?;
371                table_ids.extend(internal_tables);
372
373                if !index_ids.is_empty() || !table_ids.is_empty() {
374                    Object::update_many()
375                        .col_expr(
376                            object::Column::OwnerId,
377                            SimpleExpr::Value(Value::Int(Some(new_owner))),
378                        )
379                        .filter(
380                            object::Column::Oid.is_in(
381                                index_ids.iter().cloned().chain(
382                                    table_ids.iter().map(|table_id| table_id.as_raw_id() as _),
383                                ),
384                            ),
385                        )
386                        .exec(&txn)
387                        .await?;
388                }
389
390                if !table_ids.is_empty() {
391                    let table_objs = Table::find()
392                        .find_also_related(Object)
393                        .filter(table::Column::TableId.is_in(table_ids))
394                        .all(&txn)
395                        .await?;
396                    for (table, table_obj) in table_objs {
397                        objects.push(PbObjectInfo::Table(
398                            ObjectModel(table, table_obj.unwrap()).into(),
399                        ));
400                    }
401                }
402                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
403                if !index_ids.is_empty() {
404                    let index_objs = Index::find()
405                        .find_also_related(Object)
406                        .filter(index::Column::IndexId.is_in(index_ids))
407                        .all(&txn)
408                        .await?;
409                    for (index, index_obj) in index_objs {
410                        objects.push(PbObjectInfo::Index(
411                            ObjectModel(index, index_obj.unwrap()).into(),
412                        ));
413                    }
414                }
415            }
416            ObjectType::Source => {
417                let source = Source::find_by_id(object_id)
418                    .one(&txn)
419                    .await?
420                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
421                let is_shared = source.is_shared();
422                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
423
424                // Note: For non-shared source, we don't update their state tables, which
425                // belongs to the MV.
426                if is_shared {
427                    update_internal_tables(
428                        &txn,
429                        object_id,
430                        object::Column::OwnerId,
431                        Value::Int(Some(new_owner)),
432                        &mut objects,
433                    )
434                    .await?;
435                }
436            }
437            ObjectType::Sink => {
438                let sink = Sink::find_by_id(object_id)
439                    .one(&txn)
440                    .await?
441                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
442                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
443
444                update_internal_tables(
445                    &txn,
446                    object_id,
447                    object::Column::OwnerId,
448                    Value::Int(Some(new_owner)),
449                    &mut objects,
450                )
451                .await?;
452            }
453            ObjectType::Subscription => {
454                let subscription = Subscription::find_by_id(object_id)
455                    .one(&txn)
456                    .await?
457                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
458                objects.push(PbObjectInfo::Subscription(
459                    ObjectModel(subscription, obj).into(),
460                ));
461            }
462            ObjectType::View => {
463                let view = View::find_by_id(object_id)
464                    .one(&txn)
465                    .await?
466                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
467                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
468            }
469            ObjectType::Connection => {
470                let connection = Connection::find_by_id(object_id)
471                    .one(&txn)
472                    .await?
473                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
474                objects.push(PbObjectInfo::Connection(
475                    ObjectModel(connection, obj).into(),
476                ));
477            }
478            _ => unreachable!("not supported object type: {:?}", object_type),
479        };
480
481        txn.commit().await?;
482
483        let version = self
484            .notify_frontend(
485                NotificationOperation::Update,
486                NotificationInfo::ObjectGroup(PbObjectGroup {
487                    objects: objects
488                        .into_iter()
489                        .map(|object| PbObject {
490                            object_info: Some(object),
491                        })
492                        .collect(),
493                }),
494            )
495            .await;
496        Ok(version)
497    }
498
499    pub async fn alter_schema(
500        &self,
501        object_type: ObjectType,
502        object_id: ObjectId,
503        new_schema: SchemaId,
504    ) -> MetaResult<NotificationVersion> {
505        let inner = self.inner.write().await;
506        let txn = inner.db.begin().await?;
507        ensure_object_id(ObjectType::Schema, new_schema.as_raw_id() as ObjectId, &txn).await?;
508
509        let obj = Object::find_by_id(object_id)
510            .one(&txn)
511            .await?
512            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
513        if obj.schema_id == Some(new_schema) {
514            return Ok(IGNORED_NOTIFICATION_VERSION);
515        }
516        let database_id = obj.database_id.unwrap();
517
518        let mut objects = vec![];
519        match object_type {
520            ObjectType::Table => {
521                let table = Table::find_by_id(TableId::new(object_id as _))
522                    .one(&txn)
523                    .await?
524                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
525                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
526                let associated_src_id = table.optional_associated_source_id;
527
528                let mut obj = obj.into_active_model();
529                obj.schema_id = Set(Some(new_schema));
530                let obj = obj.update(&txn).await?;
531                objects.push(PbObjectInfo::Table(ObjectModel(table, obj).into()));
532
533                // associated source.
534                if let Some(associated_source_id) = associated_src_id {
535                    let src_obj = object::ActiveModel {
536                        oid: Set(associated_source_id as _),
537                        schema_id: Set(Some(new_schema)),
538                        ..Default::default()
539                    }
540                    .update(&txn)
541                    .await?;
542                    let source = Source::find_by_id(associated_source_id)
543                        .one(&txn)
544                        .await?
545                        .ok_or_else(|| {
546                            MetaError::catalog_id_not_found("source", associated_source_id)
547                        })?;
548                    objects.push(PbObjectInfo::Source(ObjectModel(source, src_obj).into()));
549                }
550
551                // indexes.
552                let (index_ids, (index_names, mut table_ids)): (
553                    Vec<IndexId>,
554                    (Vec<String>, Vec<TableId>),
555                ) = Index::find()
556                    .select_only()
557                    .columns([
558                        index::Column::IndexId,
559                        index::Column::Name,
560                        index::Column::IndexTableId,
561                    ])
562                    .filter(index::Column::PrimaryTableId.eq(object_id))
563                    .into_tuple::<(IndexId, String, TableId)>()
564                    .all(&txn)
565                    .await?
566                    .into_iter()
567                    .map(|(id, name, t_id)| (id, (name, t_id)))
568                    .unzip();
569
570                // internal tables.
571                let internal_tables: Vec<TableId> = Table::find()
572                    .select_only()
573                    .column(table::Column::TableId)
574                    .filter(
575                        table::Column::BelongsToJobId.is_in(
576                            table_ids
577                                .iter()
578                                .map(|table_id| table_id.as_job_id())
579                                .chain(std::iter::once(JobId::new(object_id as _))),
580                        ),
581                    )
582                    .into_tuple()
583                    .all(&txn)
584                    .await?;
585                table_ids.extend(internal_tables);
586
587                if !index_ids.is_empty() || !table_ids.is_empty() {
588                    for index_name in index_names {
589                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
590                            .await?;
591                    }
592
593                    Object::update_many()
594                        .col_expr(object::Column::SchemaId, new_schema.into())
595                        .filter(
596                            object::Column::Oid.is_in(
597                                index_ids.iter().cloned().chain(
598                                    table_ids.iter().map(|table_id| table_id.as_raw_id() as _),
599                                ),
600                            ),
601                        )
602                        .exec(&txn)
603                        .await?;
604                }
605
606                if !table_ids.is_empty() {
607                    let table_objs = Table::find()
608                        .find_also_related(Object)
609                        .filter(table::Column::TableId.is_in(table_ids))
610                        .all(&txn)
611                        .await?;
612                    for (table, table_obj) in table_objs {
613                        objects.push(PbObjectInfo::Table(
614                            ObjectModel(table, table_obj.unwrap()).into(),
615                        ));
616                    }
617                }
618                if !index_ids.is_empty() {
619                    let index_objs = Index::find()
620                        .find_also_related(Object)
621                        .filter(index::Column::IndexId.is_in(index_ids))
622                        .all(&txn)
623                        .await?;
624                    for (index, index_obj) in index_objs {
625                        objects.push(PbObjectInfo::Index(
626                            ObjectModel(index, index_obj.unwrap()).into(),
627                        ));
628                    }
629                }
630            }
631            ObjectType::Source => {
632                let source = Source::find_by_id(object_id)
633                    .one(&txn)
634                    .await?
635                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
636                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
637                let is_shared = source.is_shared();
638
639                let mut obj = obj.into_active_model();
640                obj.schema_id = Set(Some(new_schema));
641                let obj = obj.update(&txn).await?;
642                objects.push(PbObjectInfo::Source(ObjectModel(source, obj).into()));
643
644                // Note: For non-shared source, we don't update their state tables, which
645                // belongs to the MV.
646                if is_shared {
647                    update_internal_tables(
648                        &txn,
649                        object_id,
650                        object::Column::SchemaId,
651                        new_schema.into(),
652                        &mut objects,
653                    )
654                    .await?;
655                }
656            }
657            ObjectType::Sink => {
658                let sink = Sink::find_by_id(object_id)
659                    .one(&txn)
660                    .await?
661                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
662                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
663
664                let mut obj = obj.into_active_model();
665                obj.schema_id = Set(Some(new_schema));
666                let obj = obj.update(&txn).await?;
667                objects.push(PbObjectInfo::Sink(ObjectModel(sink, obj).into()));
668
669                update_internal_tables(
670                    &txn,
671                    object_id,
672                    object::Column::SchemaId,
673                    new_schema.into(),
674                    &mut objects,
675                )
676                .await?;
677            }
678            ObjectType::Subscription => {
679                let subscription = Subscription::find_by_id(object_id)
680                    .one(&txn)
681                    .await?
682                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
683                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
684                    .await?;
685
686                let mut obj = obj.into_active_model();
687                obj.schema_id = Set(Some(new_schema));
688                let obj = obj.update(&txn).await?;
689                objects.push(PbObjectInfo::Subscription(
690                    ObjectModel(subscription, obj).into(),
691                ));
692            }
693            ObjectType::View => {
694                let view = View::find_by_id(object_id)
695                    .one(&txn)
696                    .await?
697                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
698                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
699
700                let mut obj = obj.into_active_model();
701                obj.schema_id = Set(Some(new_schema));
702                let obj = obj.update(&txn).await?;
703                objects.push(PbObjectInfo::View(ObjectModel(view, obj).into()));
704            }
705            ObjectType::Function => {
706                let function = Function::find_by_id(object_id)
707                    .one(&txn)
708                    .await?
709                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
710
711                let mut pb_function: PbFunction = ObjectModel(function, obj).into();
712                pb_function.schema_id = new_schema;
713                check_function_signature_duplicate(&pb_function, &txn).await?;
714
715                object::ActiveModel {
716                    oid: Set(object_id),
717                    schema_id: Set(Some(new_schema)),
718                    ..Default::default()
719                }
720                .update(&txn)
721                .await?;
722
723                txn.commit().await?;
724                let version = self
725                    .notify_frontend(
726                        NotificationOperation::Update,
727                        NotificationInfo::Function(pb_function),
728                    )
729                    .await;
730                return Ok(version);
731            }
732            ObjectType::Connection => {
733                let connection = Connection::find_by_id(object_id)
734                    .one(&txn)
735                    .await?
736                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
737
738                let mut pb_connection: PbConnection = ObjectModel(connection, obj).into();
739                pb_connection.schema_id = new_schema;
740                check_connection_name_duplicate(&pb_connection, &txn).await?;
741
742                object::ActiveModel {
743                    oid: Set(object_id),
744                    schema_id: Set(Some(new_schema)),
745                    ..Default::default()
746                }
747                .update(&txn)
748                .await?;
749
750                txn.commit().await?;
751                let version = self
752                    .notify_frontend(
753                        NotificationOperation::Update,
754                        NotificationInfo::Connection(pb_connection),
755                    )
756                    .await;
757                return Ok(version);
758            }
759            _ => unreachable!("not supported object type: {:?}", object_type),
760        }
761
762        txn.commit().await?;
763        let version = self
764            .notify_frontend(
765                Operation::Update,
766                Info::ObjectGroup(PbObjectGroup {
767                    objects: objects
768                        .into_iter()
769                        .map(|relation_info| PbObject {
770                            object_info: Some(relation_info),
771                        })
772                        .collect_vec(),
773                }),
774            )
775            .await;
776        Ok(version)
777    }
778
779    pub async fn alter_secret(
780        &self,
781        pb_secret: PbSecret,
782        secret_plain_payload: Vec<u8>,
783    ) -> MetaResult<NotificationVersion> {
784        let inner = self.inner.write().await;
785        let owner_id = pb_secret.owner as _;
786        let txn = inner.db.begin().await?;
787        ensure_user_id(owner_id, &txn).await?;
788        ensure_object_id(
789            ObjectType::Database,
790            pb_secret.database_id.as_raw_id() as _,
791            &txn,
792        )
793        .await?;
794        ensure_object_id(
795            ObjectType::Schema,
796            pb_secret.schema_id.as_raw_id() as _,
797            &txn,
798        )
799        .await?;
800
801        ensure_object_id(ObjectType::Secret, pb_secret.id as _, &txn).await?;
802        let secret: secret::ActiveModel = pb_secret.clone().into();
803        Secret::update(secret).exec(&txn).await?;
804
805        txn.commit().await?;
806
807        // Notify the compute and frontend node plain secret
808        let mut secret_plain = pb_secret;
809        secret_plain.value.clone_from(&secret_plain_payload);
810
811        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
812        self.env
813            .notification_manager()
814            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
815
816        let version = self
817            .notify_frontend(
818                NotificationOperation::Update,
819                NotificationInfo::Secret(secret_plain),
820            )
821            .await;
822
823        Ok(version)
824    }
825
826    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
827    pub async fn drop_table_associated_source(
828        txn: &DatabaseTransaction,
829        drop_table_connector_ctx: &DropTableConnectorContext,
830    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
831        let to_drop_source_objects: Vec<PartialObject> = Object::find()
832            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
833            .into_partial_model()
834            .all(txn)
835            .await?;
836        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
837            .select_only()
838            .filter(
839                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
840            )
841            .into_partial_model()
842            .all(txn)
843            .await?;
844        let to_drop_objects = to_drop_source_objects
845            .into_iter()
846            .chain(to_drop_internal_table_objs.into_iter())
847            .collect_vec();
848        // Find affect users with privileges on all this objects.
849        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
850            .select_only()
851            .distinct()
852            .column(user_privilege::Column::UserId)
853            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
854            .into_tuple()
855            .all(txn)
856            .await?;
857
858        tracing::debug!(
859            "drop_table_associated_source: to_drop_objects: {:?}",
860            to_drop_objects
861        );
862
863        // delete all in to_drop_objects.
864        let res = Object::delete_many()
865            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
866            .exec(txn)
867            .await?;
868        if res.rows_affected == 0 {
869            return Err(MetaError::catalog_id_not_found(
870                ObjectType::Source.as_str(),
871                drop_table_connector_ctx.to_remove_source_id,
872            ));
873        }
874        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
875
876        Ok((user_infos, to_drop_objects))
877    }
878
879    pub async fn alter_database_param(
880        &self,
881        database_id: DatabaseId,
882        param: AlterDatabaseParam,
883    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
884        let inner = self.inner.write().await;
885        let txn = inner.db.begin().await?;
886
887        let mut database = database::ActiveModel {
888            database_id: Set(database_id),
889            ..Default::default()
890        };
891        match param {
892            AlterDatabaseParam::BarrierIntervalMs(interval) => {
893                if let Some(ref interval) = interval {
894                    OverrideValidate::barrier_interval_ms(interval)
895                        .map_err(|e| anyhow::anyhow!(e))?;
896                }
897                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
898            }
899            AlterDatabaseParam::CheckpointFrequency(frequency) => {
900                if let Some(ref frequency) = frequency {
901                    OverrideValidate::checkpoint_frequency(frequency)
902                        .map_err(|e| anyhow::anyhow!(e))?;
903                }
904                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
905            }
906        }
907        let database = database.update(&txn).await?;
908
909        let obj = Object::find_by_id(database_id.as_raw_id() as ObjectId)
910            .one(&txn)
911            .await?
912            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
913
914        txn.commit().await?;
915
916        let version = self
917            .notify_frontend(
918                NotificationOperation::Update,
919                NotificationInfo::Database(ObjectModel(database.clone(), obj).into()),
920            )
921            .await;
922        Ok((version, database))
923    }
924
925    /// Set the refresh state of a table
926    pub async fn set_table_refresh_state(
927        &self,
928        table_id: TableId,
929        new_state: RefreshState,
930    ) -> MetaResult<bool> {
931        let inner = self.inner.write().await;
932        let txn = inner.db.begin().await?;
933
934        // It is okay to update refresh state unconditionally because the check is done in `validate_refreshable_table` inside `RefreshManager`.
935        let active_model = table::ActiveModel {
936            table_id: Set(table_id),
937            refresh_state: Set(Some(new_state)),
938            ..Default::default()
939        };
940        if let Err(e) = active_model.update(&txn).await {
941            tracing::warn!(
942                "Failed to update table refresh state for table {}: {}",
943                table_id,
944                e.as_report()
945            );
946            let t = Table::find_by_id(table_id).all(&txn).await;
947            tracing::info!(table = ?t, "Table found");
948        }
949        txn.commit().await?;
950
951        tracing::debug!(
952            table_id = %table_id,
953            new_state = ?new_state,
954            "Updated table refresh state"
955        );
956
957        Ok(true)
958    }
959}