Skip to main content

risingwave_meta/controller/catalog/
alter_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
23use risingwave_meta_model::refresh_job::{self, RefreshState};
24use sea_orm::ActiveValue::{NotSet, Set};
25use sea_orm::prelude::DateTime;
26use sea_orm::sea_query::Expr;
27use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction, SqlErr};
28use thiserror_ext::AsReport;
29
30use super::*;
31use crate::controller::utils::load_streaming_jobs_by_ids;
32use crate::error::bail_invalid_parameter;
33
34impl CatalogController {
35    async fn alter_database_name(
36        &self,
37        database_id: DatabaseId,
38        name: &str,
39    ) -> MetaResult<NotificationVersion> {
40        let inner = self.inner.write().await;
41        let txn = inner.db.begin().await?;
42        check_database_name_duplicate(name, &txn).await?;
43
44        let active_model = database::ActiveModel {
45            database_id: Set(database_id),
46            name: Set(name.to_owned()),
47            ..Default::default()
48        };
49        let database = active_model.update(&txn).await?;
50
51        let obj = Object::find_by_id(database_id)
52            .one(&txn)
53            .await?
54            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
55
56        txn.commit().await?;
57
58        let version = self
59            .notify_frontend(
60                NotificationOperation::Update,
61                NotificationInfo::Database(ObjectModel(database, obj, None).into()),
62            )
63            .await;
64        Ok(version)
65    }
66
67    async fn alter_schema_name(
68        &self,
69        schema_id: SchemaId,
70        name: &str,
71    ) -> MetaResult<NotificationVersion> {
72        let inner = self.inner.write().await;
73        let txn = inner.db.begin().await?;
74
75        let obj = Object::find_by_id(schema_id)
76            .one(&txn)
77            .await?
78            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
79        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
80
81        let active_model = schema::ActiveModel {
82            schema_id: Set(schema_id),
83            name: Set(name.to_owned()),
84        };
85        let schema = active_model.update(&txn).await?;
86
87        txn.commit().await?;
88
89        let version = self
90            .notify_frontend(
91                NotificationOperation::Update,
92                NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
93            )
94            .await;
95        Ok(version)
96    }
97
98    pub async fn alter_name(
99        &self,
100        object_type: ObjectType,
101        object_id: impl Into<ObjectId>,
102        object_name: &str,
103    ) -> MetaResult<NotificationVersion> {
104        let object_id = object_id.into();
105        if object_type == ObjectType::Database {
106            return self
107                .alter_database_name(object_id.as_database_id(), object_name)
108                .await;
109        } else if object_type == ObjectType::Schema {
110            return self
111                .alter_schema_name(object_id.as_schema_id(), object_name)
112                .await;
113        }
114
115        let inner = self.inner.write().await;
116        let txn = inner.db.begin().await?;
117        let obj: PartialObject = Object::find_by_id(object_id)
118            .into_partial_model()
119            .one(&txn)
120            .await?
121            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
122        assert_eq!(obj.obj_type, object_type);
123        check_relation_name_duplicate(
124            object_name,
125            obj.database_id.unwrap(),
126            obj.schema_id.unwrap(),
127            &txn,
128        )
129        .await?;
130
131        // rename relation.
132        let (mut to_update_relations, old_name) =
133            rename_relation(&txn, object_type, object_id, object_name).await?;
134        // rename referring relation name.
135        to_update_relations.extend(
136            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
137        );
138
139        txn.commit().await?;
140
141        let version = self
142            .notify_frontend(
143                NotificationOperation::Update,
144                NotificationInfo::ObjectGroup(PbObjectGroup {
145                    objects: to_update_relations,
146                    dependencies: vec![],
147                }),
148            )
149            .await;
150
151        Ok(version)
152    }
153
154    pub async fn alter_swap_rename(
155        &self,
156        object_type: ObjectType,
157        object_id: ObjectId,
158        dst_object_id: ObjectId,
159    ) -> MetaResult<NotificationVersion> {
160        let inner = self.inner.write().await;
161        let txn = inner.db.begin().await?;
162        let dst_name: String = match object_type {
163            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
164                .select_only()
165                .column(table::Column::Name)
166                .into_tuple()
167                .one(&txn)
168                .await?
169                .ok_or_else(|| {
170                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
171                })?,
172            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
173                .select_only()
174                .column(source::Column::Name)
175                .into_tuple()
176                .one(&txn)
177                .await?
178                .ok_or_else(|| {
179                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
180                })?,
181            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
182                .select_only()
183                .column(sink::Column::Name)
184                .into_tuple()
185                .one(&txn)
186                .await?
187                .ok_or_else(|| {
188                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
189                })?,
190            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
191                .select_only()
192                .column(view::Column::Name)
193                .into_tuple()
194                .one(&txn)
195                .await?
196                .ok_or_else(|| {
197                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
198                })?,
199            ObjectType::Subscription => {
200                Subscription::find_by_id(dst_object_id.as_subscription_id())
201                    .select_only()
202                    .column(subscription::Column::Name)
203                    .into_tuple()
204                    .one(&txn)
205                    .await?
206                    .ok_or_else(|| {
207                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
208                    })?
209            }
210            _ => {
211                return Err(MetaError::permission_denied(format!(
212                    "swap rename not supported for object type: {:?}",
213                    object_type
214                )));
215            }
216        };
217
218        // rename relations.
219        let (mut to_update_relations, src_name) =
220            rename_relation(&txn, object_type, object_id, &dst_name).await?;
221        let (to_update_relations2, _) =
222            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
223        to_update_relations.extend(to_update_relations2);
224        // rename referring relation name.
225        to_update_relations.extend(
226            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
227        );
228        to_update_relations.extend(
229            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
230        );
231
232        txn.commit().await?;
233
234        let version = self
235            .notify_frontend(
236                NotificationOperation::Update,
237                NotificationInfo::ObjectGroup(PbObjectGroup {
238                    objects: to_update_relations,
239                    dependencies: vec![],
240                }),
241            )
242            .await;
243
244        Ok(version)
245    }
246
247    pub async fn alter_non_shared_source(
248        &self,
249        pb_source: PbSource,
250    ) -> MetaResult<NotificationVersion> {
251        let source_id: SourceId = pb_source.id;
252        let inner = self.inner.write().await;
253        let txn = inner.db.begin().await?;
254
255        let original_version: i64 = Source::find_by_id(source_id)
256            .select_only()
257            .column(source::Column::Version)
258            .into_tuple()
259            .one(&txn)
260            .await?
261            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
262        if original_version + 1 != pb_source.version as i64 {
263            return Err(MetaError::permission_denied(
264                "source version is stale".to_owned(),
265            ));
266        }
267
268        let source: source::ActiveModel = pb_source.clone().into();
269        Source::update(source).exec(&txn).await?;
270        txn.commit().await?;
271
272        let version = self
273            .notify_frontend_relation_info(
274                NotificationOperation::Update,
275                PbObjectInfo::Source(pb_source),
276            )
277            .await;
278        Ok(version)
279    }
280
281    pub async fn alter_owner(
282        &self,
283        object_type: ObjectType,
284        object_id: ObjectId,
285        new_owner: UserId,
286    ) -> MetaResult<NotificationVersion> {
287        let inner = self.inner.write().await;
288        let txn = inner.db.begin().await?;
289        ensure_user_id(new_owner, &txn).await?;
290
291        let obj = Object::find_by_id(object_id)
292            .one(&txn)
293            .await?
294            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
295        if obj.owner_id == new_owner {
296            return Ok(IGNORED_NOTIFICATION_VERSION);
297        }
298        let mut obj = obj.into_active_model();
299        obj.owner_id = Set(new_owner);
300        let obj = obj.update(&txn).await?;
301
302        let mut objects = vec![];
303        match object_type {
304            ObjectType::Database => {
305                let db = Database::find_by_id(object_id.as_database_id())
306                    .one(&txn)
307                    .await?
308                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;
309
310                txn.commit().await?;
311
312                let version = self
313                    .notify_frontend(
314                        NotificationOperation::Update,
315                        NotificationInfo::Database(ObjectModel(db, obj, None).into()),
316                    )
317                    .await;
318                return Ok(version);
319            }
320            ObjectType::Schema => {
321                let schema = Schema::find_by_id(object_id.as_schema_id())
322                    .one(&txn)
323                    .await?
324                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;
325
326                txn.commit().await?;
327
328                let version = self
329                    .notify_frontend(
330                        NotificationOperation::Update,
331                        NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
332                    )
333                    .await;
334                return Ok(version);
335            }
336            ObjectType::Table => {
337                let table = Table::find_by_id(object_id.as_table_id())
338                    .one(&txn)
339                    .await?
340                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
341                let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
342                    .one(&txn)
343                    .await?;
344
345                // associated source.
346                if let Some(associated_source_id) = table.optional_associated_source_id {
347                    let src_obj = object::ActiveModel {
348                        oid: Set(associated_source_id.as_object_id()),
349                        owner_id: Set(new_owner),
350                        ..Default::default()
351                    }
352                    .update(&txn)
353                    .await?;
354                    let source = Source::find_by_id(associated_source_id)
355                        .one(&txn)
356                        .await?
357                        .ok_or_else(|| {
358                            MetaError::catalog_id_not_found("source", associated_source_id)
359                        })?;
360                    objects.push(PbObjectInfo::Source(
361                        ObjectModel(source, src_obj, None).into(),
362                    ));
363                }
364
365                // associated sink and source for iceberg table.
366                if matches!(table.engine, Some(table::Engine::Iceberg)) {
367                    let iceberg_sink = Sink::find()
368                        .inner_join(Object)
369                        .select_only()
370                        .column(sink::Column::SinkId)
371                        .filter(
372                            object::Column::DatabaseId
373                                .eq(obj.database_id)
374                                .and(object::Column::SchemaId.eq(obj.schema_id))
375                                .and(
376                                    sink::Column::Name
377                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
378                                ),
379                        )
380                        .into_tuple::<SinkId>()
381                        .one(&txn)
382                        .await?
383                        .expect("iceberg sink must exist");
384                    let sink_obj = object::ActiveModel {
385                        oid: Set(iceberg_sink.as_object_id()),
386                        owner_id: Set(new_owner),
387                        ..Default::default()
388                    }
389                    .update(&txn)
390                    .await?;
391                    let sink = Sink::find_by_id(iceberg_sink)
392                        .one(&txn)
393                        .await?
394                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
395                    objects.push(PbObjectInfo::Sink(
396                        ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
397                    ));
398
399                    let iceberg_source = Source::find()
400                        .inner_join(Object)
401                        .select_only()
402                        .column(source::Column::SourceId)
403                        .filter(
404                            object::Column::DatabaseId
405                                .eq(obj.database_id)
406                                .and(object::Column::SchemaId.eq(obj.schema_id))
407                                .and(
408                                    source::Column::Name
409                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
410                                ),
411                        )
412                        .into_tuple::<SourceId>()
413                        .one(&txn)
414                        .await?
415                        .expect("iceberg source must exist");
416                    let source_obj = object::ActiveModel {
417                        oid: Set(iceberg_source.as_object_id()),
418                        owner_id: Set(new_owner),
419                        ..Default::default()
420                    }
421                    .update(&txn)
422                    .await?;
423                    let source = Source::find_by_id(iceberg_source)
424                        .one(&txn)
425                        .await?
426                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
427                    objects.push(PbObjectInfo::Source(
428                        ObjectModel(source, source_obj, None).into(),
429                    ));
430                }
431
432                // indexes.
433                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
434                    .select_only()
435                    .columns([index::Column::IndexId, index::Column::IndexTableId])
436                    .filter(index::Column::PrimaryTableId.eq(object_id))
437                    .into_tuple::<(IndexId, TableId)>()
438                    .all(&txn)
439                    .await?
440                    .into_iter()
441                    .unzip();
442                objects.push(PbObjectInfo::Table(
443                    ObjectModel(table, obj, streaming_job).into(),
444                ));
445
446                // internal tables.
447                let internal_tables: Vec<TableId> = Table::find()
448                    .select_only()
449                    .column(table::Column::TableId)
450                    .filter(
451                        table::Column::BelongsToJobId.is_in(
452                            table_ids
453                                .iter()
454                                .cloned()
455                                .chain(std::iter::once(object_id.as_table_id())),
456                        ),
457                    )
458                    .into_tuple()
459                    .all(&txn)
460                    .await?;
461                table_ids.extend(internal_tables);
462
463                if !index_ids.is_empty() || !table_ids.is_empty() {
464                    Object::update_many()
465                        .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
466                        .filter(
467                            object::Column::Oid.is_in::<ObjectId, _>(
468                                index_ids
469                                    .iter()
470                                    .copied()
471                                    .map_into()
472                                    .chain(table_ids.iter().copied().map_into()),
473                            ),
474                        )
475                        .exec(&txn)
476                        .await?;
477                }
478
479                if !table_ids.is_empty() {
480                    let table_objs = Table::find()
481                        .find_also_related(Object)
482                        .filter(table::Column::TableId.is_in(table_ids))
483                        .all(&txn)
484                        .await?;
485                    let streaming_jobs = load_streaming_jobs_by_ids(
486                        &txn,
487                        table_objs.iter().map(|(table, _)| table.job_id()),
488                    )
489                    .await?;
490                    for (table, table_obj) in table_objs {
491                        let job_id = table.job_id();
492                        let streaming_job = streaming_jobs.get(&job_id).cloned();
493                        objects.push(PbObjectInfo::Table(
494                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
495                        ));
496                    }
497                }
498                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
499                if !index_ids.is_empty() {
500                    let index_objs = Index::find()
501                        .find_also_related(Object)
502                        .filter(index::Column::IndexId.is_in(index_ids))
503                        .all(&txn)
504                        .await?;
505                    let streaming_jobs = load_streaming_jobs_by_ids(
506                        &txn,
507                        index_objs
508                            .iter()
509                            .map(|(index, _)| index.index_id.as_job_id()),
510                    )
511                    .await?;
512                    for (index, index_obj) in index_objs {
513                        let streaming_job =
514                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
515                        objects.push(PbObjectInfo::Index(
516                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
517                        ));
518                    }
519                }
520            }
521            ObjectType::Source => {
522                let source = Source::find_by_id(object_id.as_source_id())
523                    .one(&txn)
524                    .await?
525                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
526                let is_shared = source.is_shared();
527                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
528
529                // Note: For non-shared source, we don't update their state tables, which
530                // belongs to the MV.
531                if is_shared {
532                    update_internal_tables(
533                        &txn,
534                        object_id,
535                        object::Column::OwnerId,
536                        new_owner,
537                        &mut objects,
538                    )
539                    .await?;
540                }
541            }
542            ObjectType::Sink => {
543                let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
544                    .find_also_related(Object)
545                    .one(&txn)
546                    .await?
547                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
548                let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
549                    .one(&txn)
550                    .await?;
551                objects.push(PbObjectInfo::Sink(
552                    ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
553                ));
554
555                update_internal_tables(
556                    &txn,
557                    object_id,
558                    object::Column::OwnerId,
559                    new_owner,
560                    &mut objects,
561                )
562                .await?;
563            }
564            ObjectType::Subscription => {
565                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
566                    .one(&txn)
567                    .await?
568                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
569                objects.push(PbObjectInfo::Subscription(
570                    ObjectModel(subscription, obj, None).into(),
571                ));
572            }
573            ObjectType::View => {
574                let view = View::find_by_id(object_id.as_view_id())
575                    .one(&txn)
576                    .await?
577                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
578                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
579            }
580            ObjectType::Connection => {
581                let connection = Connection::find_by_id(object_id.as_connection_id())
582                    .one(&txn)
583                    .await?
584                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
585                objects.push(PbObjectInfo::Connection(
586                    ObjectModel(connection, obj, None).into(),
587                ));
588            }
589            ObjectType::Function => {
590                let function = Function::find_by_id(object_id.as_function_id())
591                    .one(&txn)
592                    .await?
593                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
594                objects.push(PbObjectInfo::Function(
595                    ObjectModel(function, obj, None).into(),
596                ));
597            }
598            ObjectType::Secret => {
599                let secret = Secret::find_by_id(object_id.as_secret_id())
600                    .one(&txn)
601                    .await?
602                    .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
603                objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
604            }
605            _ => unreachable!("not supported object type: {:?}", object_type),
606        };
607
608        txn.commit().await?;
609
610        let version = self
611            .notify_frontend(
612                NotificationOperation::Update,
613                NotificationInfo::ObjectGroup(PbObjectGroup {
614                    objects: objects
615                        .into_iter()
616                        .map(|object| PbObject {
617                            object_info: Some(object),
618                        })
619                        .collect(),
620                    dependencies: vec![],
621                }),
622            )
623            .await;
624        Ok(version)
625    }
626
627    pub async fn alter_schema(
628        &self,
629        object_type: ObjectType,
630        object_id: ObjectId,
631        new_schema: SchemaId,
632    ) -> MetaResult<NotificationVersion> {
633        let inner = self.inner.write().await;
634        let txn = inner.db.begin().await?;
635        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
636
637        let obj = Object::find_by_id(object_id)
638            .one(&txn)
639            .await?
640            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
641        if obj.schema_id == Some(new_schema) {
642            return Ok(IGNORED_NOTIFICATION_VERSION);
643        }
644        let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
645            .one(&txn)
646            .await?;
647        let database_id = obj.database_id.unwrap();
648
649        let mut objects = vec![];
650        match object_type {
651            ObjectType::Table => {
652                let table = Table::find_by_id(object_id.as_table_id())
653                    .one(&txn)
654                    .await?
655                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
656                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
657                let associated_src_id = table.optional_associated_source_id;
658
659                let mut obj = obj.into_active_model();
660                obj.schema_id = Set(Some(new_schema));
661                let obj = obj.update(&txn).await?;
662
663                objects.push(PbObjectInfo::Table(
664                    ObjectModel(table, obj, streaming_job).into(),
665                ));
666
667                // associated source.
668                if let Some(associated_source_id) = associated_src_id {
669                    let src_obj = object::ActiveModel {
670                        oid: Set(associated_source_id.as_object_id()),
671                        schema_id: Set(Some(new_schema)),
672                        ..Default::default()
673                    }
674                    .update(&txn)
675                    .await?;
676                    let source = Source::find_by_id(associated_source_id)
677                        .one(&txn)
678                        .await?
679                        .ok_or_else(|| {
680                            MetaError::catalog_id_not_found("source", associated_source_id)
681                        })?;
682                    objects.push(PbObjectInfo::Source(
683                        ObjectModel(source, src_obj, None).into(),
684                    ));
685                }
686
687                // indexes.
688                let (index_ids, (index_names, mut table_ids)): (
689                    Vec<IndexId>,
690                    (Vec<String>, Vec<TableId>),
691                ) = Index::find()
692                    .select_only()
693                    .columns([
694                        index::Column::IndexId,
695                        index::Column::Name,
696                        index::Column::IndexTableId,
697                    ])
698                    .filter(index::Column::PrimaryTableId.eq(object_id))
699                    .into_tuple::<(IndexId, String, TableId)>()
700                    .all(&txn)
701                    .await?
702                    .into_iter()
703                    .map(|(id, name, t_id)| (id, (name, t_id)))
704                    .unzip();
705
706                // internal tables.
707                let internal_tables: Vec<TableId> = Table::find()
708                    .select_only()
709                    .column(table::Column::TableId)
710                    .filter(
711                        table::Column::BelongsToJobId.is_in(
712                            table_ids
713                                .iter()
714                                .map(|table_id| table_id.as_job_id())
715                                .chain(std::iter::once(object_id.as_job_id())),
716                        ),
717                    )
718                    .into_tuple()
719                    .all(&txn)
720                    .await?;
721                table_ids.extend(internal_tables);
722
723                if !index_ids.is_empty() || !table_ids.is_empty() {
724                    for index_name in index_names {
725                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
726                            .await?;
727                    }
728
729                    Object::update_many()
730                        .col_expr(object::Column::SchemaId, new_schema.into())
731                        .filter(
732                            object::Column::Oid.is_in::<ObjectId, _>(
733                                index_ids
734                                    .iter()
735                                    .copied()
736                                    .map_into()
737                                    .chain(table_ids.iter().copied().map_into()),
738                            ),
739                        )
740                        .exec(&txn)
741                        .await?;
742                }
743
744                if !table_ids.is_empty() {
745                    let table_objs = Table::find()
746                        .find_also_related(Object)
747                        .filter(table::Column::TableId.is_in(table_ids))
748                        .all(&txn)
749                        .await?;
750                    let streaming_jobs = load_streaming_jobs_by_ids(
751                        &txn,
752                        table_objs.iter().map(|(table, _)| table.job_id()),
753                    )
754                    .await?;
755                    for (table, table_obj) in table_objs {
756                        let job_id = table.job_id();
757                        let streaming_job = streaming_jobs.get(&job_id).cloned();
758                        objects.push(PbObjectInfo::Table(
759                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
760                        ));
761                    }
762                }
763                if !index_ids.is_empty() {
764                    let index_objs = Index::find()
765                        .find_also_related(Object)
766                        .filter(index::Column::IndexId.is_in(index_ids))
767                        .all(&txn)
768                        .await?;
769                    let streaming_jobs = load_streaming_jobs_by_ids(
770                        &txn,
771                        index_objs
772                            .iter()
773                            .map(|(index, _)| index.index_id.as_job_id()),
774                    )
775                    .await?;
776                    for (index, index_obj) in index_objs {
777                        let streaming_job =
778                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
779                        objects.push(PbObjectInfo::Index(
780                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
781                        ));
782                    }
783                }
784            }
785            ObjectType::Source => {
786                let source = Source::find_by_id(object_id.as_source_id())
787                    .one(&txn)
788                    .await?
789                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
790                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
791                let is_shared = source.is_shared();
792
793                let mut obj = obj.into_active_model();
794                obj.schema_id = Set(Some(new_schema));
795                let obj = obj.update(&txn).await?;
796                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
797
798                // Note: For non-shared source, we don't update their state tables, which
799                // belongs to the MV.
800                if is_shared {
801                    update_internal_tables(
802                        &txn,
803                        object_id,
804                        object::Column::SchemaId,
805                        new_schema,
806                        &mut objects,
807                    )
808                    .await?;
809                }
810            }
811            ObjectType::Sink => {
812                let sink = Sink::find_by_id(object_id.as_sink_id())
813                    .one(&txn)
814                    .await?
815                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
816                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
817
818                let mut obj = obj.into_active_model();
819                obj.schema_id = Set(Some(new_schema));
820                let obj = obj.update(&txn).await?;
821                objects.push(PbObjectInfo::Sink(
822                    ObjectModel(sink, obj, streaming_job).into(),
823                ));
824
825                update_internal_tables(
826                    &txn,
827                    object_id,
828                    object::Column::SchemaId,
829                    new_schema,
830                    &mut objects,
831                )
832                .await?;
833            }
834            ObjectType::Subscription => {
835                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
836                    .one(&txn)
837                    .await?
838                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
839                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
840                    .await?;
841
842                let mut obj = obj.into_active_model();
843                obj.schema_id = Set(Some(new_schema));
844                let obj = obj.update(&txn).await?;
845                objects.push(PbObjectInfo::Subscription(
846                    ObjectModel(subscription, obj, None).into(),
847                ));
848            }
849            ObjectType::View => {
850                let view = View::find_by_id(object_id.as_view_id())
851                    .one(&txn)
852                    .await?
853                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
854                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
855
856                let mut obj = obj.into_active_model();
857                obj.schema_id = Set(Some(new_schema));
858                let obj = obj.update(&txn).await?;
859                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
860            }
861            ObjectType::Function => {
862                let function = Function::find_by_id(object_id.as_function_id())
863                    .one(&txn)
864                    .await?
865                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
866
867                let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
868                pb_function.schema_id = new_schema;
869                check_function_signature_duplicate(&pb_function, &txn).await?;
870
871                Object::update(object::ActiveModel {
872                    oid: Set(object_id),
873                    schema_id: Set(Some(new_schema)),
874                    ..Default::default()
875                })
876                .exec(&txn)
877                .await?;
878
879                txn.commit().await?;
880                let version = self
881                    .notify_frontend(
882                        NotificationOperation::Update,
883                        NotificationInfo::Function(pb_function),
884                    )
885                    .await;
886                return Ok(version);
887            }
888            ObjectType::Connection => {
889                let connection = Connection::find_by_id(object_id.as_connection_id())
890                    .one(&txn)
891                    .await?
892                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
893
894                let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
895                pb_connection.schema_id = new_schema;
896                check_connection_name_duplicate(&pb_connection, &txn).await?;
897
898                Object::update(object::ActiveModel {
899                    oid: Set(object_id),
900                    schema_id: Set(Some(new_schema)),
901                    ..Default::default()
902                })
903                .exec(&txn)
904                .await?;
905
906                txn.commit().await?;
907                let version = self
908                    .notify_frontend(
909                        NotificationOperation::Update,
910                        NotificationInfo::Connection(pb_connection),
911                    )
912                    .await;
913                return Ok(version);
914            }
915            _ => unreachable!("not supported object type: {:?}", object_type),
916        }
917
918        txn.commit().await?;
919        let version = self
920            .notify_frontend(
921                Operation::Update,
922                Info::ObjectGroup(PbObjectGroup {
923                    objects: objects
924                        .into_iter()
925                        .map(|relation_info| PbObject {
926                            object_info: Some(relation_info),
927                        })
928                        .collect_vec(),
929                    dependencies: vec![],
930                }),
931            )
932            .await;
933        Ok(version)
934    }
935
936    pub async fn alter_secret(
937        &self,
938        pb_secret: PbSecret,
939        secret_plain_payload: Vec<u8>,
940    ) -> MetaResult<NotificationVersion> {
941        let inner = self.inner.write().await;
942        let owner_id = pb_secret.owner as _;
943        let txn = inner.db.begin().await?;
944        ensure_user_id(owner_id, &txn).await?;
945        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
946        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
947
948        ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
949        let secret: secret::ActiveModel = pb_secret.clone().into();
950        Secret::update(secret).exec(&txn).await?;
951
952        txn.commit().await?;
953
954        // Notify the compute and frontend node plain secret
955        let mut secret_plain = pb_secret;
956        secret_plain.value.clone_from(&secret_plain_payload);
957
958        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
959        self.env
960            .notification_manager()
961            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
962
963        let version = self
964            .notify_frontend(
965                NotificationOperation::Update,
966                NotificationInfo::Secret(secret_plain),
967            )
968            .await;
969
970        Ok(version)
971    }
972
973    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
974    pub async fn drop_table_associated_source(
975        txn: &DatabaseTransaction,
976        drop_table_connector_ctx: &DropTableConnectorContext,
977    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
978        let to_drop_source_objects: Vec<PartialObject> = Object::find()
979            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
980            .into_partial_model()
981            .all(txn)
982            .await?;
983        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
984            .select_only()
985            .filter(
986                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
987            )
988            .into_partial_model()
989            .all(txn)
990            .await?;
991        let to_drop_objects = to_drop_source_objects
992            .into_iter()
993            .chain(to_drop_internal_table_objs)
994            .collect_vec();
995        // Find affect users with privileges on all this objects.
996        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
997            .select_only()
998            .distinct()
999            .column(user_privilege::Column::UserId)
1000            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1001            .into_tuple()
1002            .all(txn)
1003            .await?;
1004
1005        tracing::debug!(
1006            "drop_table_associated_source: to_drop_objects: {:?}",
1007            to_drop_objects
1008        );
1009
1010        // delete all in to_drop_objects.
1011        let res = Object::delete_many()
1012            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1013            .exec(txn)
1014            .await?;
1015        if res.rows_affected == 0 {
1016            return Err(MetaError::catalog_id_not_found(
1017                ObjectType::Source.as_str(),
1018                drop_table_connector_ctx.to_remove_source_id,
1019            ));
1020        }
1021        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
1022
1023        Ok((user_infos, to_drop_objects))
1024    }
1025
1026    pub async fn alter_database_param(
1027        &self,
1028        database_id: DatabaseId,
1029        param: AlterDatabaseParam,
1030    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
1031        let inner = self.inner.write().await;
1032        let txn = inner.db.begin().await?;
1033
1034        let mut database = database::ActiveModel {
1035            database_id: Set(database_id),
1036            ..Default::default()
1037        };
1038        match param {
1039            AlterDatabaseParam::BarrierIntervalMs(interval) => {
1040                if let Some(ref interval) = interval {
1041                    OverrideValidate::barrier_interval_ms(interval)
1042                        .map_err(|e| anyhow::anyhow!(e))?;
1043                }
1044                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
1045            }
1046            AlterDatabaseParam::CheckpointFrequency(frequency) => {
1047                if let Some(ref frequency) = frequency {
1048                    OverrideValidate::checkpoint_frequency(frequency)
1049                        .map_err(|e| anyhow::anyhow!(e))?;
1050                }
1051                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
1052            }
1053        }
1054        let database = database.update(&txn).await?;
1055
1056        let obj = Object::find_by_id(database_id)
1057            .one(&txn)
1058            .await?
1059            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1060
1061        txn.commit().await?;
1062
1063        let version = self
1064            .notify_frontend(
1065                NotificationOperation::Update,
1066                NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
1067            )
1068            .await;
1069        Ok((version, database))
1070    }
1071
1072    pub async fn alter_database_resource_group(
1073        &self,
1074        database_id: DatabaseId,
1075        resource_group: Option<String>,
1076    ) -> MetaResult<NotificationVersion> {
1077        let inner = self.inner.write().await;
1078        let txn = inner.db.begin().await?;
1079
1080        let database =
1081            database::ActiveModel {
1082                database_id: Set(database_id),
1083                resource_group: Set(
1084                    resource_group.unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_owned())
1085                ),
1086                ..Default::default()
1087            }
1088            .update(&txn)
1089            .await?;
1090
1091        let obj = Object::find_by_id(database_id)
1092            .one(&txn)
1093            .await?
1094            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1095
1096        txn.commit().await?;
1097
1098        let version = self
1099            .notify_frontend(
1100                NotificationOperation::Update,
1101                NotificationInfo::Database(ObjectModel(database, obj, None).into()),
1102            )
1103            .await;
1104        Ok(version)
1105    }
1106
1107    pub async fn alter_subscription_retention(
1108        &self,
1109        subscription_id: SubscriptionId,
1110        retention_seconds: u64,
1111        definition: String,
1112    ) -> MetaResult<(NotificationVersion, PbSubscription)> {
1113        let inner = self.inner.write().await;
1114        let txn = inner.db.begin().await?;
1115
1116        let obj = Object::find_by_id(subscription_id)
1117            .one(&txn)
1118            .await?
1119            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1120
1121        let active_model = subscription::ActiveModel {
1122            subscription_id: Set(subscription_id),
1123            retention_seconds: Set(retention_seconds as i64),
1124            definition: Set(definition),
1125            ..Default::default()
1126        };
1127        let subscription = active_model.update(&txn).await?;
1128
1129        txn.commit().await?;
1130
1131        let pb_subscription: PbSubscription = ObjectModel(subscription, obj, None).into();
1132        let subscription_info = PbObjectInfo::Subscription(pb_subscription.clone());
1133
1134        let version = self
1135            .notify_frontend(
1136                NotificationOperation::Update,
1137                NotificationInfo::ObjectGroup(PbObjectGroup {
1138                    objects: vec![PbObject {
1139                        object_info: Some(subscription_info),
1140                    }],
1141                    dependencies: vec![],
1142                }),
1143            )
1144            .await;
1145
1146        Ok((version, pb_subscription))
1147    }
1148
1149    pub async fn alter_streaming_job_config(
1150        &self,
1151        job_id: JobId,
1152        entries_to_add: HashMap<String, String>,
1153        keys_to_remove: Vec<String>,
1154    ) -> MetaResult<NotificationVersion> {
1155        let inner = self.inner.write().await;
1156        let txn = inner.db.begin().await?;
1157
1158        let config_override: Option<String> = StreamingJob::find_by_id(job_id)
1159            .select_only()
1160            .column(streaming_job::Column::ConfigOverride)
1161            .into_tuple()
1162            .one(&txn)
1163            .await?
1164            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1165        let config_override = config_override.unwrap_or_default();
1166
1167        let mut table: toml::Table =
1168            toml::from_str(&config_override).context("invalid streaming job config")?;
1169
1170        // The frontend guarantees that there's no duplicated keys in `to_add` and `to_remove`.
1171        for (key, value) in entries_to_add {
1172            let value: toml::Value = value
1173                .parse()
1174                .with_context(|| format!("invalid config value for path {key}"))?;
1175            table
1176                .upsert(&key, value)
1177                .with_context(|| format!("failed to set config path {key}"))?;
1178        }
1179        for key in keys_to_remove {
1180            table
1181                .delete(&key)
1182                .with_context(|| format!("failed to reset config path {key}"))?;
1183        }
1184
1185        let updated_config_override = table.to_string();
1186
1187        // Validate the config override by trying to merge it to the default config.
1188        {
1189            let merged = merge_streaming_config_section(
1190                &StreamingConfig::default(),
1191                &updated_config_override,
1192            )
1193            .context("invalid streaming job config override")?;
1194
1195            // Reject unrecognized entries.
1196            // Note: If these unrecognized entries are pre-existing, we also reject them here.
1197            // Users are able to fix them by issuing a `RESET` first.
1198            if let Some(merged) = merged {
1199                let unrecognized_keys = merged.unrecognized_keys().collect_vec();
1200                if !unrecognized_keys.is_empty() {
1201                    bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
1202                }
1203            }
1204        }
1205
1206        StreamingJob::update(streaming_job::ActiveModel {
1207            job_id: Set(job_id),
1208            config_override: Set(Some(updated_config_override)),
1209            ..Default::default()
1210        })
1211        .exec(&txn)
1212        .await?;
1213
1214        txn.commit().await?;
1215
1216        Ok(IGNORED_NOTIFICATION_VERSION)
1217    }
1218
1219    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
1220        let inner = self.inner.read().await;
1221        let active = refresh_job::ActiveModel {
1222            table_id: Set(table_id),
1223            last_trigger_time: Set(None),
1224            trigger_interval_secs: Set(None),
1225            current_status: Set(RefreshState::Idle),
1226            last_success_time: Set(None),
1227        };
1228        match RefreshJob::insert(active)
1229            .on_conflict_do_nothing()
1230            .exec(&inner.db)
1231            .await
1232        {
1233            Ok(_) => Ok(()),
1234            Err(sea_orm::DbErr::RecordNotInserted) => {
1235                // This is expected when the refresh job already exists due to ON CONFLICT DO NOTHING
1236                tracing::debug!("refresh job already exists for table_id={}", table_id);
1237                Ok(())
1238            }
1239            Err(e) => {
1240                if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1241                    tracing::warn!(
1242                        %table_id,
1243                        error = %e.as_report(),
1244                        "skip ensure_refresh_job for stale dropped table"
1245                    );
1246                    Ok(())
1247                } else {
1248                    Err(e.into())
1249                }
1250            }
1251        }
1252    }
1253
1254    pub async fn update_refresh_job_status(
1255        &self,
1256        table_id: TableId,
1257        status: RefreshState,
1258        trigger_time: Option<DateTime>,
1259        is_success: bool,
1260    ) -> MetaResult<()> {
1261        self.ensure_refresh_job(table_id).await?;
1262        let inner = self.inner.read().await;
1263
1264        // expect only update trigger_time when the status changes to Refreshing
1265        assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1266        let active = refresh_job::ActiveModel {
1267            table_id: Set(table_id),
1268            current_status: Set(status),
1269            last_trigger_time: if trigger_time.is_some() {
1270                Set(trigger_time.map(datetime_to_timestamp_millis))
1271            } else {
1272                NotSet
1273            },
1274            last_success_time: if is_success {
1275                Set(Some(chrono::Utc::now().timestamp_millis()))
1276            } else {
1277                NotSet
1278            },
1279            ..Default::default()
1280        };
1281        match RefreshJob::update(active).exec(&inner.db).await {
1282            Ok(_) => Ok(()),
1283            Err(e) => {
1284                if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1285                    tracing::warn!(
1286                        %table_id,
1287                        error = %e.as_report(),
1288                        "skip update_refresh_job_status for stale dropped table"
1289                    );
1290                    Ok(())
1291                } else {
1292                    Err(e.into())
1293                }
1294            }
1295        }
1296    }
1297
1298    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1299        let inner = self.inner.read().await;
1300        RefreshJob::update_many()
1301            .col_expr(
1302                refresh_job::Column::CurrentStatus,
1303                Expr::value(RefreshState::Idle),
1304            )
1305            .exec(&inner.db)
1306            .await?;
1307        Ok(())
1308    }
1309
1310    pub async fn update_refresh_job_interval(
1311        &self,
1312        table_id: TableId,
1313        trigger_interval_secs: Option<i64>,
1314    ) -> MetaResult<()> {
1315        self.ensure_refresh_job(table_id).await?;
1316        let inner = self.inner.read().await;
1317        let active = refresh_job::ActiveModel {
1318            table_id: Set(table_id),
1319            trigger_interval_secs: Set(trigger_interval_secs),
1320            ..Default::default()
1321        };
1322        RefreshJob::update(active).exec(&inner.db).await?;
1323        Ok(())
1324    }
1325}
1326
1327async fn should_skip_refresh_job_db_err<C>(
1328    db: &C,
1329    table_id: TableId,
1330    err: &sea_orm::DbErr,
1331) -> MetaResult<bool>
1332where
1333    C: ConnectionTrait,
1334{
1335    if matches!(err, sea_orm::DbErr::RecordNotUpdated) {
1336        return Ok(true);
1337    }
1338
1339    if !matches!(
1340        err.sql_err(),
1341        Some(SqlErr::ForeignKeyConstraintViolation(_))
1342    ) {
1343        return Ok(false);
1344    }
1345
1346    let table_exists = Table::find_by_id(table_id).one(db).await?.is_some();
1347    Ok(!table_exists)
1348}