risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use sea_orm::ActiveValue::{NotSet, Set};
24use sea_orm::prelude::DateTime;
25use sea_orm::sea_query::Expr;
26use sea_orm::{ActiveModelTrait, DatabaseTransaction};
27
28use super::*;
29use crate::controller::utils::load_streaming_jobs_by_ids;
30use crate::error::bail_invalid_parameter;
31
32impl CatalogController {
33    async fn alter_database_name(
34        &self,
35        database_id: DatabaseId,
36        name: &str,
37    ) -> MetaResult<NotificationVersion> {
38        let inner = self.inner.write().await;
39        let txn = inner.db.begin().await?;
40        check_database_name_duplicate(name, &txn).await?;
41
42        let active_model = database::ActiveModel {
43            database_id: Set(database_id),
44            name: Set(name.to_owned()),
45            ..Default::default()
46        };
47        let database = active_model.update(&txn).await?;
48
49        let obj = Object::find_by_id(database_id)
50            .one(&txn)
51            .await?
52            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
53
54        txn.commit().await?;
55
56        let version = self
57            .notify_frontend(
58                NotificationOperation::Update,
59                NotificationInfo::Database(ObjectModel(database, obj, None).into()),
60            )
61            .await;
62        Ok(version)
63    }
64
65    async fn alter_schema_name(
66        &self,
67        schema_id: SchemaId,
68        name: &str,
69    ) -> MetaResult<NotificationVersion> {
70        let inner = self.inner.write().await;
71        let txn = inner.db.begin().await?;
72
73        let obj = Object::find_by_id(schema_id)
74            .one(&txn)
75            .await?
76            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
77        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
78
79        let active_model = schema::ActiveModel {
80            schema_id: Set(schema_id),
81            name: Set(name.to_owned()),
82        };
83        let schema = active_model.update(&txn).await?;
84
85        txn.commit().await?;
86
87        let version = self
88            .notify_frontend(
89                NotificationOperation::Update,
90                NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
91            )
92            .await;
93        Ok(version)
94    }
95
96    pub async fn alter_name(
97        &self,
98        object_type: ObjectType,
99        object_id: impl Into<ObjectId>,
100        object_name: &str,
101    ) -> MetaResult<NotificationVersion> {
102        let object_id = object_id.into();
103        if object_type == ObjectType::Database {
104            return self
105                .alter_database_name(object_id.as_database_id(), object_name)
106                .await;
107        } else if object_type == ObjectType::Schema {
108            return self
109                .alter_schema_name(object_id.as_schema_id(), object_name)
110                .await;
111        }
112
113        let inner = self.inner.write().await;
114        let txn = inner.db.begin().await?;
115        let obj: PartialObject = Object::find_by_id(object_id)
116            .into_partial_model()
117            .one(&txn)
118            .await?
119            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
120        assert_eq!(obj.obj_type, object_type);
121        check_relation_name_duplicate(
122            object_name,
123            obj.database_id.unwrap(),
124            obj.schema_id.unwrap(),
125            &txn,
126        )
127        .await?;
128
129        // rename relation.
130        let (mut to_update_relations, old_name) =
131            rename_relation(&txn, object_type, object_id, object_name).await?;
132        // rename referring relation name.
133        to_update_relations.extend(
134            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
135        );
136
137        txn.commit().await?;
138
139        let version = self
140            .notify_frontend(
141                NotificationOperation::Update,
142                NotificationInfo::ObjectGroup(PbObjectGroup {
143                    objects: to_update_relations,
144                    dependencies: vec![],
145                }),
146            )
147            .await;
148
149        Ok(version)
150    }
151
152    pub async fn alter_swap_rename(
153        &self,
154        object_type: ObjectType,
155        object_id: ObjectId,
156        dst_object_id: ObjectId,
157    ) -> MetaResult<NotificationVersion> {
158        let inner = self.inner.write().await;
159        let txn = inner.db.begin().await?;
160        let dst_name: String = match object_type {
161            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
162                .select_only()
163                .column(table::Column::Name)
164                .into_tuple()
165                .one(&txn)
166                .await?
167                .ok_or_else(|| {
168                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
169                })?,
170            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
171                .select_only()
172                .column(source::Column::Name)
173                .into_tuple()
174                .one(&txn)
175                .await?
176                .ok_or_else(|| {
177                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
178                })?,
179            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
180                .select_only()
181                .column(sink::Column::Name)
182                .into_tuple()
183                .one(&txn)
184                .await?
185                .ok_or_else(|| {
186                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
187                })?,
188            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
189                .select_only()
190                .column(view::Column::Name)
191                .into_tuple()
192                .one(&txn)
193                .await?
194                .ok_or_else(|| {
195                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
196                })?,
197            ObjectType::Subscription => {
198                Subscription::find_by_id(dst_object_id.as_subscription_id())
199                    .select_only()
200                    .column(subscription::Column::Name)
201                    .into_tuple()
202                    .one(&txn)
203                    .await?
204                    .ok_or_else(|| {
205                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
206                    })?
207            }
208            _ => {
209                return Err(MetaError::permission_denied(format!(
210                    "swap rename not supported for object type: {:?}",
211                    object_type
212                )));
213            }
214        };
215
216        // rename relations.
217        let (mut to_update_relations, src_name) =
218            rename_relation(&txn, object_type, object_id, &dst_name).await?;
219        let (to_update_relations2, _) =
220            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
221        to_update_relations.extend(to_update_relations2);
222        // rename referring relation name.
223        to_update_relations.extend(
224            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
225        );
226        to_update_relations.extend(
227            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
228        );
229
230        txn.commit().await?;
231
232        let version = self
233            .notify_frontend(
234                NotificationOperation::Update,
235                NotificationInfo::ObjectGroup(PbObjectGroup {
236                    objects: to_update_relations,
237                    dependencies: vec![],
238                }),
239            )
240            .await;
241
242        Ok(version)
243    }
244
245    pub async fn alter_non_shared_source(
246        &self,
247        pb_source: PbSource,
248    ) -> MetaResult<NotificationVersion> {
249        let source_id: SourceId = pb_source.id;
250        let inner = self.inner.write().await;
251        let txn = inner.db.begin().await?;
252
253        let original_version: i64 = Source::find_by_id(source_id)
254            .select_only()
255            .column(source::Column::Version)
256            .into_tuple()
257            .one(&txn)
258            .await?
259            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
260        if original_version + 1 != pb_source.version as i64 {
261            return Err(MetaError::permission_denied(
262                "source version is stale".to_owned(),
263            ));
264        }
265
266        let source: source::ActiveModel = pb_source.clone().into();
267        Source::update(source).exec(&txn).await?;
268        txn.commit().await?;
269
270        let version = self
271            .notify_frontend_relation_info(
272                NotificationOperation::Update,
273                PbObjectInfo::Source(pb_source),
274            )
275            .await;
276        Ok(version)
277    }
278
279    pub async fn alter_owner(
280        &self,
281        object_type: ObjectType,
282        object_id: ObjectId,
283        new_owner: UserId,
284    ) -> MetaResult<NotificationVersion> {
285        let inner = self.inner.write().await;
286        let txn = inner.db.begin().await?;
287        ensure_user_id(new_owner, &txn).await?;
288
289        let obj = Object::find_by_id(object_id)
290            .one(&txn)
291            .await?
292            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
293        if obj.owner_id == new_owner {
294            return Ok(IGNORED_NOTIFICATION_VERSION);
295        }
296        let mut obj = obj.into_active_model();
297        obj.owner_id = Set(new_owner);
298        let obj = obj.update(&txn).await?;
299
300        let mut objects = vec![];
301        match object_type {
302            ObjectType::Database => {
303                let db = Database::find_by_id(object_id.as_database_id())
304                    .one(&txn)
305                    .await?
306                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
307
308                txn.commit().await?;
309
310                let version = self
311                    .notify_frontend(
312                        NotificationOperation::Update,
313                        NotificationInfo::Database(ObjectModel(db, obj, None).into()),
314                    )
315                    .await;
316                return Ok(version);
317            }
318            ObjectType::Schema => {
319                let schema = Schema::find_by_id(object_id.as_schema_id())
320                    .one(&txn)
321                    .await?
322                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
323
324                txn.commit().await?;
325
326                let version = self
327                    .notify_frontend(
328                        NotificationOperation::Update,
329                        NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
330                    )
331                    .await;
332                return Ok(version);
333            }
334            ObjectType::Table => {
335                let table = Table::find_by_id(object_id.as_table_id())
336                    .one(&txn)
337                    .await?
338                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
339                let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
340                    .one(&txn)
341                    .await?;
342
343                // associated source.
344                if let Some(associated_source_id) = table.optional_associated_source_id {
345                    let src_obj = object::ActiveModel {
346                        oid: Set(associated_source_id.as_object_id()),
347                        owner_id: Set(new_owner),
348                        ..Default::default()
349                    }
350                    .update(&txn)
351                    .await?;
352                    let source = Source::find_by_id(associated_source_id)
353                        .one(&txn)
354                        .await?
355                        .ok_or_else(|| {
356                            MetaError::catalog_id_not_found("source", associated_source_id)
357                        })?;
358                    objects.push(PbObjectInfo::Source(
359                        ObjectModel(source, src_obj, None).into(),
360                    ));
361                }
362
363                // associated sink and source for iceberg table.
364                if matches!(table.engine, Some(table::Engine::Iceberg)) {
365                    let iceberg_sink = Sink::find()
366                        .inner_join(Object)
367                        .select_only()
368                        .column(sink::Column::SinkId)
369                        .filter(
370                            object::Column::DatabaseId
371                                .eq(obj.database_id)
372                                .and(object::Column::SchemaId.eq(obj.schema_id))
373                                .and(
374                                    sink::Column::Name
375                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
376                                ),
377                        )
378                        .into_tuple::<SinkId>()
379                        .one(&txn)
380                        .await?
381                        .expect("iceberg sink must exist");
382                    let sink_obj = object::ActiveModel {
383                        oid: Set(iceberg_sink.as_object_id()),
384                        owner_id: Set(new_owner),
385                        ..Default::default()
386                    }
387                    .update(&txn)
388                    .await?;
389                    let sink = Sink::find_by_id(iceberg_sink)
390                        .one(&txn)
391                        .await?
392                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
393                    objects.push(PbObjectInfo::Sink(
394                        ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
395                    ));
396
397                    let iceberg_source = Source::find()
398                        .inner_join(Object)
399                        .select_only()
400                        .column(source::Column::SourceId)
401                        .filter(
402                            object::Column::DatabaseId
403                                .eq(obj.database_id)
404                                .and(object::Column::SchemaId.eq(obj.schema_id))
405                                .and(
406                                    source::Column::Name
407                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
408                                ),
409                        )
410                        .into_tuple::<SourceId>()
411                        .one(&txn)
412                        .await?
413                        .expect("iceberg source must exist");
414                    let source_obj = object::ActiveModel {
415                        oid: Set(iceberg_source.as_object_id()),
416                        owner_id: Set(new_owner),
417                        ..Default::default()
418                    }
419                    .update(&txn)
420                    .await?;
421                    let source = Source::find_by_id(iceberg_source)
422                        .one(&txn)
423                        .await?
424                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
425                    objects.push(PbObjectInfo::Source(
426                        ObjectModel(source, source_obj, None).into(),
427                    ));
428                }
429
430                // indexes.
431                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
432                    .select_only()
433                    .columns([index::Column::IndexId, index::Column::IndexTableId])
434                    .filter(index::Column::PrimaryTableId.eq(object_id))
435                    .into_tuple::<(IndexId, TableId)>()
436                    .all(&txn)
437                    .await?
438                    .into_iter()
439                    .unzip();
440                objects.push(PbObjectInfo::Table(
441                    ObjectModel(table, obj, streaming_job).into(),
442                ));
443
444                // internal tables.
445                let internal_tables: Vec<TableId> = Table::find()
446                    .select_only()
447                    .column(table::Column::TableId)
448                    .filter(
449                        table::Column::BelongsToJobId.is_in(
450                            table_ids
451                                .iter()
452                                .cloned()
453                                .chain(std::iter::once(object_id.as_table_id())),
454                        ),
455                    )
456                    .into_tuple()
457                    .all(&txn)
458                    .await?;
459                table_ids.extend(internal_tables);
460
461                if !index_ids.is_empty() || !table_ids.is_empty() {
462                    Object::update_many()
463                        .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
464                        .filter(
465                            object::Column::Oid.is_in::<ObjectId, _>(
466                                index_ids
467                                    .iter()
468                                    .copied()
469                                    .map_into()
470                                    .chain(table_ids.iter().copied().map_into()),
471                            ),
472                        )
473                        .exec(&txn)
474                        .await?;
475                }
476
477                if !table_ids.is_empty() {
478                    let table_objs = Table::find()
479                        .find_also_related(Object)
480                        .filter(table::Column::TableId.is_in(table_ids))
481                        .all(&txn)
482                        .await?;
483                    let streaming_jobs = load_streaming_jobs_by_ids(
484                        &txn,
485                        table_objs.iter().map(|(table, _)| table.job_id()),
486                    )
487                    .await?;
488                    for (table, table_obj) in table_objs {
489                        let job_id = table.job_id();
490                        let streaming_job = streaming_jobs.get(&job_id).cloned();
491                        objects.push(PbObjectInfo::Table(
492                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
493                        ));
494                    }
495                }
496                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
497                if !index_ids.is_empty() {
498                    let index_objs = Index::find()
499                        .find_also_related(Object)
500                        .filter(index::Column::IndexId.is_in(index_ids))
501                        .all(&txn)
502                        .await?;
503                    let streaming_jobs = load_streaming_jobs_by_ids(
504                        &txn,
505                        index_objs
506                            .iter()
507                            .map(|(index, _)| index.index_id.as_job_id()),
508                    )
509                    .await?;
510                    for (index, index_obj) in index_objs {
511                        let streaming_job =
512                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
513                        objects.push(PbObjectInfo::Index(
514                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
515                        ));
516                    }
517                }
518            }
519            ObjectType::Source => {
520                let source = Source::find_by_id(object_id.as_source_id())
521                    .one(&txn)
522                    .await?
523                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
524                let is_shared = source.is_shared();
525                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
526
527                // Note: For non-shared source, we don't update their state tables, which
528                // belongs to the MV.
529                if is_shared {
530                    update_internal_tables(
531                        &txn,
532                        object_id,
533                        object::Column::OwnerId,
534                        new_owner,
535                        &mut objects,
536                    )
537                    .await?;
538                }
539            }
540            ObjectType::Sink => {
541                let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
542                    .find_also_related(Object)
543                    .one(&txn)
544                    .await?
545                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
546                let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
547                    .one(&txn)
548                    .await?;
549                objects.push(PbObjectInfo::Sink(
550                    ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
551                ));
552
553                update_internal_tables(
554                    &txn,
555                    object_id,
556                    object::Column::OwnerId,
557                    new_owner,
558                    &mut objects,
559                )
560                .await?;
561            }
562            ObjectType::Subscription => {
563                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
564                    .one(&txn)
565                    .await?
566                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
567                objects.push(PbObjectInfo::Subscription(
568                    ObjectModel(subscription, obj, None).into(),
569                ));
570            }
571            ObjectType::View => {
572                let view = View::find_by_id(object_id.as_view_id())
573                    .one(&txn)
574                    .await?
575                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
576                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
577            }
578            ObjectType::Connection => {
579                let connection = Connection::find_by_id(object_id.as_connection_id())
580                    .one(&txn)
581                    .await?
582                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
583                objects.push(PbObjectInfo::Connection(
584                    ObjectModel(connection, obj, None).into(),
585                ));
586            }
587            ObjectType::Function => {
588                let function = Function::find_by_id(object_id.as_function_id())
589                    .one(&txn)
590                    .await?
591                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
592                objects.push(PbObjectInfo::Function(
593                    ObjectModel(function, obj, None).into(),
594                ));
595            }
596            ObjectType::Secret => {
597                let secret = Secret::find_by_id(object_id.as_secret_id())
598                    .one(&txn)
599                    .await?
600                    .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
601                objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
602            }
603            _ => unreachable!("not supported object type: {:?}", object_type),
604        };
605
606        txn.commit().await?;
607
608        let version = self
609            .notify_frontend(
610                NotificationOperation::Update,
611                NotificationInfo::ObjectGroup(PbObjectGroup {
612                    objects: objects
613                        .into_iter()
614                        .map(|object| PbObject {
615                            object_info: Some(object),
616                        })
617                        .collect(),
618                    dependencies: vec![],
619                }),
620            )
621            .await;
622        Ok(version)
623    }
624
625    pub async fn alter_schema(
626        &self,
627        object_type: ObjectType,
628        object_id: ObjectId,
629        new_schema: SchemaId,
630    ) -> MetaResult<NotificationVersion> {
631        let inner = self.inner.write().await;
632        let txn = inner.db.begin().await?;
633        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
634
635        let obj = Object::find_by_id(object_id)
636            .one(&txn)
637            .await?
638            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
639        if obj.schema_id == Some(new_schema) {
640            return Ok(IGNORED_NOTIFICATION_VERSION);
641        }
642        let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
643            .one(&txn)
644            .await?;
645        let database_id = obj.database_id.unwrap();
646
647        let mut objects = vec![];
648        match object_type {
649            ObjectType::Table => {
650                let table = Table::find_by_id(object_id.as_table_id())
651                    .one(&txn)
652                    .await?
653                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
654                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
655                let associated_src_id = table.optional_associated_source_id;
656
657                let mut obj = obj.into_active_model();
658                obj.schema_id = Set(Some(new_schema));
659                let obj = obj.update(&txn).await?;
660
661                objects.push(PbObjectInfo::Table(
662                    ObjectModel(table, obj, streaming_job).into(),
663                ));
664
665                // associated source.
666                if let Some(associated_source_id) = associated_src_id {
667                    let src_obj = object::ActiveModel {
668                        oid: Set(associated_source_id.as_object_id()),
669                        schema_id: Set(Some(new_schema)),
670                        ..Default::default()
671                    }
672                    .update(&txn)
673                    .await?;
674                    let source = Source::find_by_id(associated_source_id)
675                        .one(&txn)
676                        .await?
677                        .ok_or_else(|| {
678                            MetaError::catalog_id_not_found("source", associated_source_id)
679                        })?;
680                    objects.push(PbObjectInfo::Source(
681                        ObjectModel(source, src_obj, None).into(),
682                    ));
683                }
684
685                // indexes.
686                let (index_ids, (index_names, mut table_ids)): (
687                    Vec<IndexId>,
688                    (Vec<String>, Vec<TableId>),
689                ) = Index::find()
690                    .select_only()
691                    .columns([
692                        index::Column::IndexId,
693                        index::Column::Name,
694                        index::Column::IndexTableId,
695                    ])
696                    .filter(index::Column::PrimaryTableId.eq(object_id))
697                    .into_tuple::<(IndexId, String, TableId)>()
698                    .all(&txn)
699                    .await?
700                    .into_iter()
701                    .map(|(id, name, t_id)| (id, (name, t_id)))
702                    .unzip();
703
704                // internal tables.
705                let internal_tables: Vec<TableId> = Table::find()
706                    .select_only()
707                    .column(table::Column::TableId)
708                    .filter(
709                        table::Column::BelongsToJobId.is_in(
710                            table_ids
711                                .iter()
712                                .map(|table_id| table_id.as_job_id())
713                                .chain(std::iter::once(object_id.as_job_id())),
714                        ),
715                    )
716                    .into_tuple()
717                    .all(&txn)
718                    .await?;
719                table_ids.extend(internal_tables);
720
721                if !index_ids.is_empty() || !table_ids.is_empty() {
722                    for index_name in index_names {
723                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
724                            .await?;
725                    }
726
727                    Object::update_many()
728                        .col_expr(object::Column::SchemaId, new_schema.into())
729                        .filter(
730                            object::Column::Oid.is_in::<ObjectId, _>(
731                                index_ids
732                                    .iter()
733                                    .copied()
734                                    .map_into()
735                                    .chain(table_ids.iter().copied().map_into()),
736                            ),
737                        )
738                        .exec(&txn)
739                        .await?;
740                }
741
742                if !table_ids.is_empty() {
743                    let table_objs = Table::find()
744                        .find_also_related(Object)
745                        .filter(table::Column::TableId.is_in(table_ids))
746                        .all(&txn)
747                        .await?;
748                    let streaming_jobs = load_streaming_jobs_by_ids(
749                        &txn,
750                        table_objs.iter().map(|(table, _)| table.job_id()),
751                    )
752                    .await?;
753                    for (table, table_obj) in table_objs {
754                        let job_id = table.job_id();
755                        let streaming_job = streaming_jobs.get(&job_id).cloned();
756                        objects.push(PbObjectInfo::Table(
757                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
758                        ));
759                    }
760                }
761                if !index_ids.is_empty() {
762                    let index_objs = Index::find()
763                        .find_also_related(Object)
764                        .filter(index::Column::IndexId.is_in(index_ids))
765                        .all(&txn)
766                        .await?;
767                    let streaming_jobs = load_streaming_jobs_by_ids(
768                        &txn,
769                        index_objs
770                            .iter()
771                            .map(|(index, _)| index.index_id.as_job_id()),
772                    )
773                    .await?;
774                    for (index, index_obj) in index_objs {
775                        let streaming_job =
776                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
777                        objects.push(PbObjectInfo::Index(
778                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
779                        ));
780                    }
781                }
782            }
783            ObjectType::Source => {
784                let source = Source::find_by_id(object_id.as_source_id())
785                    .one(&txn)
786                    .await?
787                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
788                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
789                let is_shared = source.is_shared();
790
791                let mut obj = obj.into_active_model();
792                obj.schema_id = Set(Some(new_schema));
793                let obj = obj.update(&txn).await?;
794                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
795
796                // Note: For non-shared source, we don't update their state tables, which
797                // belongs to the MV.
798                if is_shared {
799                    update_internal_tables(
800                        &txn,
801                        object_id,
802                        object::Column::SchemaId,
803                        new_schema,
804                        &mut objects,
805                    )
806                    .await?;
807                }
808            }
809            ObjectType::Sink => {
810                let sink = Sink::find_by_id(object_id.as_sink_id())
811                    .one(&txn)
812                    .await?
813                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
814                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
815
816                let mut obj = obj.into_active_model();
817                obj.schema_id = Set(Some(new_schema));
818                let obj = obj.update(&txn).await?;
819                objects.push(PbObjectInfo::Sink(
820                    ObjectModel(sink, obj, streaming_job).into(),
821                ));
822
823                update_internal_tables(
824                    &txn,
825                    object_id,
826                    object::Column::SchemaId,
827                    new_schema,
828                    &mut objects,
829                )
830                .await?;
831            }
832            ObjectType::Subscription => {
833                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
834                    .one(&txn)
835                    .await?
836                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
837                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
838                    .await?;
839
840                let mut obj = obj.into_active_model();
841                obj.schema_id = Set(Some(new_schema));
842                let obj = obj.update(&txn).await?;
843                objects.push(PbObjectInfo::Subscription(
844                    ObjectModel(subscription, obj, None).into(),
845                ));
846            }
847            ObjectType::View => {
848                let view = View::find_by_id(object_id.as_view_id())
849                    .one(&txn)
850                    .await?
851                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
852                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
853
854                let mut obj = obj.into_active_model();
855                obj.schema_id = Set(Some(new_schema));
856                let obj = obj.update(&txn).await?;
857                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
858            }
859            ObjectType::Function => {
860                let function = Function::find_by_id(object_id.as_function_id())
861                    .one(&txn)
862                    .await?
863                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
864
865                let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
866                pb_function.schema_id = new_schema;
867                check_function_signature_duplicate(&pb_function, &txn).await?;
868
869                Object::update(object::ActiveModel {
870                    oid: Set(object_id),
871                    schema_id: Set(Some(new_schema)),
872                    ..Default::default()
873                })
874                .exec(&txn)
875                .await?;
876
877                txn.commit().await?;
878                let version = self
879                    .notify_frontend(
880                        NotificationOperation::Update,
881                        NotificationInfo::Function(pb_function),
882                    )
883                    .await;
884                return Ok(version);
885            }
886            ObjectType::Connection => {
887                let connection = Connection::find_by_id(object_id.as_connection_id())
888                    .one(&txn)
889                    .await?
890                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
891
892                let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
893                pb_connection.schema_id = new_schema;
894                check_connection_name_duplicate(&pb_connection, &txn).await?;
895
896                Object::update(object::ActiveModel {
897                    oid: Set(object_id),
898                    schema_id: Set(Some(new_schema)),
899                    ..Default::default()
900                })
901                .exec(&txn)
902                .await?;
903
904                txn.commit().await?;
905                let version = self
906                    .notify_frontend(
907                        NotificationOperation::Update,
908                        NotificationInfo::Connection(pb_connection),
909                    )
910                    .await;
911                return Ok(version);
912            }
913            _ => unreachable!("not supported object type: {:?}", object_type),
914        }
915
916        txn.commit().await?;
917        let version = self
918            .notify_frontend(
919                Operation::Update,
920                Info::ObjectGroup(PbObjectGroup {
921                    objects: objects
922                        .into_iter()
923                        .map(|relation_info| PbObject {
924                            object_info: Some(relation_info),
925                        })
926                        .collect_vec(),
927                    dependencies: vec![],
928                }),
929            )
930            .await;
931        Ok(version)
932    }
933
934    pub async fn alter_secret(
935        &self,
936        pb_secret: PbSecret,
937        secret_plain_payload: Vec<u8>,
938    ) -> MetaResult<NotificationVersion> {
939        let inner = self.inner.write().await;
940        let owner_id = pb_secret.owner as _;
941        let txn = inner.db.begin().await?;
942        ensure_user_id(owner_id, &txn).await?;
943        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
944        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
945
946        ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
947        let secret: secret::ActiveModel = pb_secret.clone().into();
948        Secret::update(secret).exec(&txn).await?;
949
950        txn.commit().await?;
951
952        // Notify the compute and frontend node plain secret
953        let mut secret_plain = pb_secret;
954        secret_plain.value.clone_from(&secret_plain_payload);
955
956        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
957        self.env
958            .notification_manager()
959            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
960
961        let version = self
962            .notify_frontend(
963                NotificationOperation::Update,
964                NotificationInfo::Secret(secret_plain),
965            )
966            .await;
967
968        Ok(version)
969    }
970
971    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
972    pub async fn drop_table_associated_source(
973        txn: &DatabaseTransaction,
974        drop_table_connector_ctx: &DropTableConnectorContext,
975    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
976        let to_drop_source_objects: Vec<PartialObject> = Object::find()
977            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
978            .into_partial_model()
979            .all(txn)
980            .await?;
981        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
982            .select_only()
983            .filter(
984                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
985            )
986            .into_partial_model()
987            .all(txn)
988            .await?;
989        let to_drop_objects = to_drop_source_objects
990            .into_iter()
991            .chain(to_drop_internal_table_objs.into_iter())
992            .collect_vec();
993        // Find affect users with privileges on all this objects.
994        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
995            .select_only()
996            .distinct()
997            .column(user_privilege::Column::UserId)
998            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
999            .into_tuple()
1000            .all(txn)
1001            .await?;
1002
1003        tracing::debug!(
1004            "drop_table_associated_source: to_drop_objects: {:?}",
1005            to_drop_objects
1006        );
1007
1008        // delete all in to_drop_objects.
1009        let res = Object::delete_many()
1010            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1011            .exec(txn)
1012            .await?;
1013        if res.rows_affected == 0 {
1014            return Err(MetaError::catalog_id_not_found(
1015                ObjectType::Source.as_str(),
1016                drop_table_connector_ctx.to_remove_source_id,
1017            ));
1018        }
1019        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
1020
1021        Ok((user_infos, to_drop_objects))
1022    }
1023
1024    pub async fn alter_database_param(
1025        &self,
1026        database_id: DatabaseId,
1027        param: AlterDatabaseParam,
1028    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
1029        let inner = self.inner.write().await;
1030        let txn = inner.db.begin().await?;
1031
1032        let mut database = database::ActiveModel {
1033            database_id: Set(database_id),
1034            ..Default::default()
1035        };
1036        match param {
1037            AlterDatabaseParam::BarrierIntervalMs(interval) => {
1038                if let Some(ref interval) = interval {
1039                    OverrideValidate::barrier_interval_ms(interval)
1040                        .map_err(|e| anyhow::anyhow!(e))?;
1041                }
1042                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
1043            }
1044            AlterDatabaseParam::CheckpointFrequency(frequency) => {
1045                if let Some(ref frequency) = frequency {
1046                    OverrideValidate::checkpoint_frequency(frequency)
1047                        .map_err(|e| anyhow::anyhow!(e))?;
1048                }
1049                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
1050            }
1051        }
1052        let database = database.update(&txn).await?;
1053
1054        let obj = Object::find_by_id(database_id)
1055            .one(&txn)
1056            .await?
1057            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1058
1059        txn.commit().await?;
1060
1061        let version = self
1062            .notify_frontend(
1063                NotificationOperation::Update,
1064                NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
1065            )
1066            .await;
1067        Ok((version, database))
1068    }
1069
1070    pub async fn alter_streaming_job_config(
1071        &self,
1072        job_id: JobId,
1073        entries_to_add: HashMap<String, String>,
1074        keys_to_remove: Vec<String>,
1075    ) -> MetaResult<NotificationVersion> {
1076        let inner = self.inner.write().await;
1077        let txn = inner.db.begin().await?;
1078
1079        let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1080            .select_only()
1081            .column(streaming_job::Column::ConfigOverride)
1082            .into_tuple()
1083            .one(&txn)
1084            .await?
1085            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1086        let config_override = config_override.unwrap_or_default();
1087
1088        let mut table: toml::Table =
1089            toml::from_str(&config_override).context("invalid streaming job config")?;
1090
1091        // The frontend guarantees that there's no duplicated keys in `to_add` and `to_remove`.
1092        for (key, value) in entries_to_add {
1093            let value: toml::Value = value
1094                .parse()
1095                .with_context(|| format!("invalid config value for path {key}"))?;
1096            table
1097                .upsert(&key, value)
1098                .with_context(|| format!("failed to set config path {key}"))?;
1099        }
1100        for key in keys_to_remove {
1101            table
1102                .delete(&key)
1103                .with_context(|| format!("failed to reset config path {key}"))?;
1104        }
1105
1106        let updated_config_override = table.to_string();
1107
1108        // Validate the config override by trying to merge it to the default config.
1109        {
1110            let merged = merge_streaming_config_section(
1111                &StreamingConfig::default(),
1112                &updated_config_override,
1113            )
1114            .context("invalid streaming job config override")?;
1115
1116            // Reject unrecognized entries.
1117            // Note: If these unrecognized entries are pre-existing, we also reject them here.
1118            // Users are able to fix them by issuing a `RESET` first.
1119            if let Some(merged) = merged {
1120                let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1121                if !unrecognized_keys.is_empty() {
1122                    bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1123                }
1124            }
1125        }
1126
1127        StreamingJob::update(streaming_job::ActiveModel {
1128            job_id: Set(job_id),
1129            config_override: Set(Some(updated_config_override)),
1130            ..Default::default()
1131        })
1132        .exec(&txn)
1133        .await?;
1134
1135        txn.commit().await?;
1136
1137        Ok(IGNORED_NOTIFICATION_VERSION)
1138    }
1139
1140    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1141        let inner = self.inner.read().await;
1142        let active = refresh_job::ActiveModel {
1143            table_id: Set(table_id),
1144            last_trigger_time: Set(None),
1145            trigger_interval_secs: Set(None),
1146            current_status: Set(RefreshState::Idle),
1147            last_success_time: Set(None),
1148        };
1149        match RefreshJob::insert(active)
1150            .on_conflict_do_nothing()
1151            .exec(&inner.db)
1152            .await
1153        {
1154            Ok(_) => Ok(()),
1155            Err(sea_orm::DbErr::RecordNotInserted) => {
1156                // This is expected when the refresh job already exists due to ON CONFLICT DO NOTHING
1157                tracing::debug!("refresh job already exists for table_id={}", table_id);
1158                Ok(())
1159            }
1160            Err(e) => Err(e.into()),
1161        }
1162    }
1163
1164    pub async fn update_refresh_job_status(
1165        &self,
1166        table_id: TableId,
1167        status: RefreshState,
1168        trigger_time: Option<DateTime>,
1169        is_success: bool,
1170    ) -> MetaResult<()> {
1171        self.ensure_refresh_job(table_id).await?;
1172        let inner = self.inner.read().await;
1173
1174        // expect only update trigger_time when the status changes to Refreshing
1175        assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1176        let active = refresh_job::ActiveModel {
1177            table_id: Set(table_id),
1178            current_status: Set(status),
1179            last_trigger_time: if trigger_time.is_some() {
1180                Set(trigger_time.map(datetime_to_timestamp_millis))
1181            } else {
1182                NotSet
1183            },
1184            last_success_time: if is_success {
1185                Set(Some(chrono::Utc::now().timestamp_millis()))
1186            } else {
1187                NotSet
1188            },
1189            ..Default::default()
1190        };
1191        RefreshJob::update(active).exec(&inner.db).await?;
1192        Ok(())
1193    }
1194
1195    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1196        let inner = self.inner.read().await;
1197        RefreshJob::update_many()
1198            .col_expr(
1199                refresh_job::Column::CurrentStatus,
1200                Expr::value(RefreshState::Idle),
1201            )
1202            .exec(&inner.db)
1203            .await?;
1204        Ok(())
1205    }
1206
1207    pub async fn update_refresh_job_interval(
1208        &self,
1209        table_id: TableId,
1210        trigger_interval_secs: Option<i64>,
1211    ) -> MetaResult<()> {
1212        self.ensure_refresh_job(table_id).await?;
1213        let inner = self.inner.read().await;
1214        let active = refresh_job::ActiveModel {
1215            table_id: Set(table_id),
1216            trigger_interval_secs: Set(trigger_interval_secs),
1217            ..Default::default()
1218        };
1219        RefreshJob::update(active).exec(&inner.db).await?;
1220        Ok(())
1221    }
1222}