// risingwave_meta/controller/catalog/alter_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context;
16use risingwave_common::cast::datetime_to_timestamp_millis;
17use risingwave_common::catalog::{AlterDatabaseParam, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
18use risingwave_common::config::mutate::TomlTableMutateExt as _;
19use risingwave_common::config::{StreamingConfig, merge_streaming_config_section};
20use risingwave_common::id::JobId;
21use risingwave_common::system_param::{OverrideValidate, Validate};
22use risingwave_meta_model::refresh_job::{self, RefreshState};
23use sea_orm::ActiveValue::{NotSet, Set};
24use sea_orm::prelude::DateTime;
25use sea_orm::sea_query::Expr;
26use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseTransaction, SqlErr};
27use thiserror_ext::AsReport;
28
29use super::*;
30use crate::controller::utils::load_streaming_jobs_by_ids;
31use crate::error::bail_invalid_parameter;
32
33impl CatalogController {
34    async fn alter_database_name(
35        &self,
36        database_id: DatabaseId,
37        name: &str,
38    ) -> MetaResult<NotificationVersion> {
39        let inner = self.inner.write().await;
40        let txn = inner.db.begin().await?;
41        check_database_name_duplicate(name, &txn).await?;
42
43        let active_model = database::ActiveModel {
44            database_id: Set(database_id),
45            name: Set(name.to_owned()),
46            ..Default::default()
47        };
48        let database = active_model.update(&txn).await?;
49
50        let obj = Object::find_by_id(database_id)
51            .one(&txn)
52            .await?
53            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
54
55        txn.commit().await?;
56
57        let version = self
58            .notify_frontend(
59                NotificationOperation::Update,
60                NotificationInfo::Database(ObjectModel(database, obj, None).into()),
61            )
62            .await;
63        Ok(version)
64    }
65
66    async fn alter_schema_name(
67        &self,
68        schema_id: SchemaId,
69        name: &str,
70    ) -> MetaResult<NotificationVersion> {
71        let inner = self.inner.write().await;
72        let txn = inner.db.begin().await?;
73
74        let obj = Object::find_by_id(schema_id)
75            .one(&txn)
76            .await?
77            .ok_or_else(|| MetaError::catalog_id_not_found("schema", schema_id))?;
78        check_schema_name_duplicate(name, obj.database_id.unwrap(), &txn).await?;
79
80        let active_model = schema::ActiveModel {
81            schema_id: Set(schema_id),
82            name: Set(name.to_owned()),
83        };
84        let schema = active_model.update(&txn).await?;
85
86        txn.commit().await?;
87
88        let version = self
89            .notify_frontend(
90                NotificationOperation::Update,
91                NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
92            )
93            .await;
94        Ok(version)
95    }
96
97    pub async fn alter_name(
98        &self,
99        object_type: ObjectType,
100        object_id: impl Into<ObjectId>,
101        object_name: &str,
102    ) -> MetaResult<NotificationVersion> {
103        let object_id = object_id.into();
104        if object_type == ObjectType::Database {
105            return self
106                .alter_database_name(object_id.as_database_id(), object_name)
107                .await;
108        } else if object_type == ObjectType::Schema {
109            return self
110                .alter_schema_name(object_id.as_schema_id(), object_name)
111                .await;
112        }
113
114        let inner = self.inner.write().await;
115        let txn = inner.db.begin().await?;
116        let obj: PartialObject = Object::find_by_id(object_id)
117            .into_partial_model()
118            .one(&txn)
119            .await?
120            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
121        assert_eq!(obj.obj_type, object_type);
122        check_relation_name_duplicate(
123            object_name,
124            obj.database_id.unwrap(),
125            obj.schema_id.unwrap(),
126            &txn,
127        )
128        .await?;
129
130        // rename relation.
131        let (mut to_update_relations, old_name) =
132            rename_relation(&txn, object_type, object_id, object_name).await?;
133        // rename referring relation name.
134        to_update_relations.extend(
135            rename_relation_refer(&txn, object_type, object_id, object_name, &old_name).await?,
136        );
137
138        txn.commit().await?;
139
140        let version = self
141            .notify_frontend(
142                NotificationOperation::Update,
143                NotificationInfo::ObjectGroup(PbObjectGroup {
144                    objects: to_update_relations,
145                    dependencies: vec![],
146                }),
147            )
148            .await;
149
150        Ok(version)
151    }
152
    /// Swap the names of two catalog objects of the same type
    /// (`ALTER ... SWAP NAMES WITH ...`).
    ///
    /// Both objects are renamed to each other's name, and every relation whose
    /// definition refers to either of them is rewritten accordingly, all
    /// inside a single transaction. The updated catalog entries are broadcast
    /// to frontends and the notification version is returned.
    ///
    /// Only tables, sources, sinks, views and subscriptions are supported;
    /// any other object type yields a permission-denied error.
    pub async fn alter_swap_rename(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        dst_object_id: ObjectId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // Fetch the destination object's current name: it becomes the new
        // name of `object_id` (and `object_id`'s old name is applied to
        // `dst_object_id` below). Each arm queries the name column of the
        // entity matching `object_type`.
        let dst_name: String = match object_type {
            ObjectType::Table => Table::find_by_id(dst_object_id.as_table_id())
                .select_only()
                .column(table::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Source => Source::find_by_id(dst_object_id.as_source_id())
                .select_only()
                .column(source::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Sink => Sink::find_by_id(dst_object_id.as_sink_id())
                .select_only()
                .column(sink::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::View => View::find_by_id(dst_object_id.as_view_id())
                .select_only()
                .column(view::Column::Name)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                })?,
            ObjectType::Subscription => {
                Subscription::find_by_id(dst_object_id.as_subscription_id())
                    .select_only()
                    .column(subscription::Column::Name)
                    .into_tuple()
                    .one(&txn)
                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found(object_type.as_str(), dst_object_id)
                    })?
            }
            _ => {
                return Err(MetaError::permission_denied(format!(
                    "swap rename not supported for object type: {:?}",
                    object_type
                )));
            }
        };

        // Rename both relations; `rename_relation` returns the source's old
        // name, which is then applied to the destination object.
        let (mut to_update_relations, src_name) =
            rename_relation(&txn, object_type, object_id, &dst_name).await?;
        let (to_update_relations2, _) =
            rename_relation(&txn, object_type, dst_object_id, &src_name).await?;
        to_update_relations.extend(to_update_relations2);
        // Rewrite the definitions of relations referring to either object by
        // its previous name.
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, object_id, &dst_name, &src_name).await?,
        );
        to_update_relations.extend(
            rename_relation_refer(&txn, object_type, dst_object_id, &src_name, &dst_name).await?,
        );

        txn.commit().await?;

        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: to_update_relations,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(version)
    }
245
246    pub async fn alter_non_shared_source(
247        &self,
248        pb_source: PbSource,
249    ) -> MetaResult<NotificationVersion> {
250        let source_id: SourceId = pb_source.id;
251        let inner = self.inner.write().await;
252        let txn = inner.db.begin().await?;
253
254        let original_version: i64 = Source::find_by_id(source_id)
255            .select_only()
256            .column(source::Column::Version)
257            .into_tuple()
258            .one(&txn)
259            .await?
260            .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
261        if original_version + 1 != pb_source.version as i64 {
262            return Err(MetaError::permission_denied(
263                "source version is stale".to_owned(),
264            ));
265        }
266
267        let source: source::ActiveModel = pb_source.clone().into();
268        Source::update(source).exec(&txn).await?;
269        txn.commit().await?;
270
271        let version = self
272            .notify_frontend_relation_info(
273                NotificationOperation::Update,
274                PbObjectInfo::Source(pb_source),
275            )
276            .await;
277        Ok(version)
278    }
279
    /// Transfer ownership of a catalog object to `new_owner`.
    ///
    /// Besides the object itself, ownership is propagated to everything that
    /// logically belongs to it: a table's associated source, the hidden
    /// sink/source pair of an iceberg-engine table, the table's indexes, and
    /// all internal (state) tables of the affected streaming jobs. All
    /// updates happen in one transaction; the changed catalog entries are
    /// then broadcast to frontends as a single object group.
    ///
    /// Returns `IGNORED_NOTIFICATION_VERSION` if the object already has the
    /// requested owner.
    pub async fn alter_owner(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        new_owner: UserId,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        // The new owner must be an existing user.
        ensure_user_id(new_owner, &txn).await?;

        let obj = Object::find_by_id(object_id)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // No-op if ownership would not change; skip the notification entirely.
        if obj.owner_id == new_owner {
            return Ok(IGNORED_NOTIFICATION_VERSION);
        }
        let mut obj = obj.into_active_model();
        obj.owner_id = Set(new_owner);
        let obj = obj.update(&txn).await?;

        // Catalog entries to broadcast after commit.
        let mut objects = vec![];
        match object_type {
            ObjectType::Database => {
                let db = Database::find_by_id(object_id.as_database_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("database", object_id))?;

                txn.commit().await?;

                // Databases notify with a dedicated message, not an object
                // group, hence the early return.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Database(ObjectModel(db, obj, None).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Schema => {
                let schema = Schema::find_by_id(object_id.as_schema_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("schema", object_id))?;

                txn.commit().await?;

                // Same dedicated-message early return as for databases.
                let version = self
                    .notify_frontend(
                        NotificationOperation::Update,
                        NotificationInfo::Schema(ObjectModel(schema, obj, None).into()),
                    )
                    .await;
                return Ok(version);
            }
            ObjectType::Table => {
                let table = Table::find_by_id(object_id.as_table_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(object_id.as_job_id())
                    .one(&txn)
                    .await?;

                // associated source (for tables created with a connector):
                // its owner must follow the table's.
                if let Some(associated_source_id) = table.optional_associated_source_id {
                    let src_obj = object::ActiveModel {
                        oid: Set(associated_source_id.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(associated_source_id)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", associated_source_id)
                        })?;
                    objects.push(PbObjectInfo::Source(
                        ObjectModel(source, src_obj, None).into(),
                    ));
                }

                // associated sink and source for iceberg table. These are
                // looked up by their conventional prefixed names within the
                // same database/schema as the table.
                if matches!(table.engine, Some(table::Engine::Iceberg)) {
                    let iceberg_sink = Sink::find()
                        .inner_join(Object)
                        .select_only()
                        .column(sink::Column::SinkId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    sink::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SINK_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SinkId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg sink must exist");
                    let sink_obj = object::ActiveModel {
                        oid: Set(iceberg_sink.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let sink = Sink::find_by_id(iceberg_sink)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", iceberg_sink))?;
                    objects.push(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj, streaming_job.clone()).into(),
                    ));

                    let iceberg_source = Source::find()
                        .inner_join(Object)
                        .select_only()
                        .column(source::Column::SourceId)
                        .filter(
                            object::Column::DatabaseId
                                .eq(obj.database_id)
                                .and(object::Column::SchemaId.eq(obj.schema_id))
                                .and(
                                    source::Column::Name
                                        .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name)),
                                ),
                        )
                        .into_tuple::<SourceId>()
                        .one(&txn)
                        .await?
                        .expect("iceberg source must exist");
                    let source_obj = object::ActiveModel {
                        oid: Set(iceberg_source.as_object_id()),
                        owner_id: Set(new_owner),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;
                    let source = Source::find_by_id(iceberg_source)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", iceberg_source))?;
                    objects.push(PbObjectInfo::Source(
                        ObjectModel(source, source_obj, None).into(),
                    ));
                }

                // indexes: collect both the index ids and their backing
                // index-table ids so both get the new owner.
                let (index_ids, mut table_ids): (Vec<IndexId>, Vec<TableId>) = Index::find()
                    .select_only()
                    .columns([index::Column::IndexId, index::Column::IndexTableId])
                    .filter(index::Column::PrimaryTableId.eq(object_id))
                    .into_tuple::<(IndexId, TableId)>()
                    .all(&txn)
                    .await?
                    .into_iter()
                    .unzip();
                objects.push(PbObjectInfo::Table(
                    ObjectModel(table, obj, streaming_job).into(),
                ));

                // internal tables. State tables belonging to the table's job
                // or to any of its index tables.
                let internal_tables: Vec<TableId> = Table::find()
                    .select_only()
                    .column(table::Column::TableId)
                    .filter(
                        table::Column::BelongsToJobId.is_in(
                            table_ids
                                .iter()
                                .cloned()
                                .chain(std::iter::once(object_id.as_table_id())),
                        ),
                    )
                    .into_tuple()
                    .all(&txn)
                    .await?;
                table_ids.extend(internal_tables);

                // Bulk-update the owner of all dependent objects in one
                // statement.
                if !index_ids.is_empty() || !table_ids.is_empty() {
                    Object::update_many()
                        .col_expr(object::Column::OwnerId, SimpleExpr::Value(new_owner.into()))
                        .filter(
                            object::Column::Oid.is_in::<ObjectId, _>(
                                index_ids
                                    .iter()
                                    .copied()
                                    .map_into()
                                    .chain(table_ids.iter().copied().map_into()),
                            ),
                        )
                        .exec(&txn)
                        .await?;
                }

                if !table_ids.is_empty() {
                    let table_objs = Table::find()
                        .find_also_related(Object)
                        .filter(table::Column::TableId.is_in(table_ids))
                        .all(&txn)
                        .await?;
                    let streaming_jobs = load_streaming_jobs_by_ids(
                        &txn,
                        table_objs.iter().map(|(table, _)| table.job_id()),
                    )
                    .await?;
                    for (table, table_obj) in table_objs {
                        let job_id = table.job_id();
                        let streaming_job = streaming_jobs.get(&job_id).cloned();
                        objects.push(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                        ));
                    }
                }
                // FIXME: frontend will update index/primary table from cache, requires apply updates of indexes after tables.
                if !index_ids.is_empty() {
                    let index_objs = Index::find()
                        .find_also_related(Object)
                        .filter(index::Column::IndexId.is_in(index_ids))
                        .all(&txn)
                        .await?;
                    let streaming_jobs = load_streaming_jobs_by_ids(
                        &txn,
                        index_objs
                            .iter()
                            .map(|(index, _)| index.index_id.as_job_id()),
                    )
                    .await?;
                    for (index, index_obj) in index_objs {
                        let streaming_job =
                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
                        objects.push(PbObjectInfo::Index(
                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
                        ));
                    }
                }
            }
            ObjectType::Source => {
                let source = Source::find_by_id(object_id.as_source_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
                let is_shared = source.is_shared();
                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));

                // Note: For non-shared source, we don't update their state tables, which
                // belongs to the MV.
                if is_shared {
                    update_internal_tables(
                        &txn,
                        object_id,
                        object::Column::OwnerId,
                        new_owner,
                        &mut objects,
                    )
                    .await?;
                }
            }
            ObjectType::Sink => {
                let (sink, sink_obj) = Sink::find_by_id(object_id.as_sink_id())
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                    .one(&txn)
                    .await?;
                objects.push(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), streaming_job).into(),
                ));

                // A sink's internal state tables follow its ownership.
                update_internal_tables(
                    &txn,
                    object_id,
                    object::Column::OwnerId,
                    new_owner,
                    &mut objects,
                )
                .await?;
            }
            ObjectType::Subscription => {
                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
                objects.push(PbObjectInfo::Subscription(
                    ObjectModel(subscription, obj, None).into(),
                ));
            }
            ObjectType::View => {
                let view = View::find_by_id(object_id.as_view_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
            }
            ObjectType::Connection => {
                let connection = Connection::find_by_id(object_id.as_connection_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
                objects.push(PbObjectInfo::Connection(
                    ObjectModel(connection, obj, None).into(),
                ));
            }
            ObjectType::Function => {
                let function = Function::find_by_id(object_id.as_function_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
                objects.push(PbObjectInfo::Function(
                    ObjectModel(function, obj, None).into(),
                ));
            }
            ObjectType::Secret => {
                let secret = Secret::find_by_id(object_id.as_secret_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("secret", object_id))?;
                objects.push(PbObjectInfo::Secret(ObjectModel(secret, obj, None).into()));
            }
            _ => unreachable!("not supported object type: {:?}", object_type),
        };

        txn.commit().await?;

        // Broadcast every touched catalog entry as one object group.
        let version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: objects
                        .into_iter()
                        .map(|object| PbObject {
                            object_info: Some(object),
                        })
                        .collect(),
                    dependencies: vec![],
                }),
            )
            .await;
        Ok(version)
    }
625
626    pub async fn alter_schema(
627        &self,
628        object_type: ObjectType,
629        object_id: ObjectId,
630        new_schema: SchemaId,
631    ) -> MetaResult<NotificationVersion> {
632        let inner = self.inner.write().await;
633        let txn = inner.db.begin().await?;
634        ensure_object_id(ObjectType::Schema, new_schema, &txn).await?;
635
636        let obj = Object::find_by_id(object_id)
637            .one(&txn)
638            .await?
639            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
640        if obj.schema_id == Some(new_schema) {
641            return Ok(IGNORED_NOTIFICATION_VERSION);
642        }
643        let streaming_job = StreamingJob::find_by_id(object_id.as_job_id())
644            .one(&txn)
645            .await?;
646        let database_id = obj.database_id.unwrap();
647
648        let mut objects = vec![];
649        match object_type {
650            ObjectType::Table => {
651                let table = Table::find_by_id(object_id.as_table_id())
652                    .one(&txn)
653                    .await?
654                    .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
655                check_relation_name_duplicate(&table.name, database_id, new_schema, &txn).await?;
656                let associated_src_id = table.optional_associated_source_id;
657
658                let mut obj = obj.into_active_model();
659                obj.schema_id = Set(Some(new_schema));
660                let obj = obj.update(&txn).await?;
661
662                objects.push(PbObjectInfo::Table(
663                    ObjectModel(table, obj, streaming_job).into(),
664                ));
665
666                // associated source.
667                if let Some(associated_source_id) = associated_src_id {
668                    let src_obj = object::ActiveModel {
669                        oid: Set(associated_source_id.as_object_id()),
670                        schema_id: Set(Some(new_schema)),
671                        ..Default::default()
672                    }
673                    .update(&txn)
674                    .await?;
675                    let source = Source::find_by_id(associated_source_id)
676                        .one(&txn)
677                        .await?
678                        .ok_or_else(|| {
679                            MetaError::catalog_id_not_found("source", associated_source_id)
680                        })?;
681                    objects.push(PbObjectInfo::Source(
682                        ObjectModel(source, src_obj, None).into(),
683                    ));
684                }
685
686                // indexes.
687                let (index_ids, (index_names, mut table_ids)): (
688                    Vec<IndexId>,
689                    (Vec<String>, Vec<TableId>),
690                ) = Index::find()
691                    .select_only()
692                    .columns([
693                        index::Column::IndexId,
694                        index::Column::Name,
695                        index::Column::IndexTableId,
696                    ])
697                    .filter(index::Column::PrimaryTableId.eq(object_id))
698                    .into_tuple::<(IndexId, String, TableId)>()
699                    .all(&txn)
700                    .await?
701                    .into_iter()
702                    .map(|(id, name, t_id)| (id, (name, t_id)))
703                    .unzip();
704
705                // internal tables.
706                let internal_tables: Vec<TableId> = Table::find()
707                    .select_only()
708                    .column(table::Column::TableId)
709                    .filter(
710                        table::Column::BelongsToJobId.is_in(
711                            table_ids
712                                .iter()
713                                .map(|table_id| table_id.as_job_id())
714                                .chain(std::iter::once(object_id.as_job_id())),
715                        ),
716                    )
717                    .into_tuple()
718                    .all(&txn)
719                    .await?;
720                table_ids.extend(internal_tables);
721
722                if !index_ids.is_empty() || !table_ids.is_empty() {
723                    for index_name in index_names {
724                        check_relation_name_duplicate(&index_name, database_id, new_schema, &txn)
725                            .await?;
726                    }
727
728                    Object::update_many()
729                        .col_expr(object::Column::SchemaId, new_schema.into())
730                        .filter(
731                            object::Column::Oid.is_in::<ObjectId, _>(
732                                index_ids
733                                    .iter()
734                                    .copied()
735                                    .map_into()
736                                    .chain(table_ids.iter().copied().map_into()),
737                            ),
738                        )
739                        .exec(&txn)
740                        .await?;
741                }
742
743                if !table_ids.is_empty() {
744                    let table_objs = Table::find()
745                        .find_also_related(Object)
746                        .filter(table::Column::TableId.is_in(table_ids))
747                        .all(&txn)
748                        .await?;
749                    let streaming_jobs = load_streaming_jobs_by_ids(
750                        &txn,
751                        table_objs.iter().map(|(table, _)| table.job_id()),
752                    )
753                    .await?;
754                    for (table, table_obj) in table_objs {
755                        let job_id = table.job_id();
756                        let streaming_job = streaming_jobs.get(&job_id).cloned();
757                        objects.push(PbObjectInfo::Table(
758                            ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
759                        ));
760                    }
761                }
762                if !index_ids.is_empty() {
763                    let index_objs = Index::find()
764                        .find_also_related(Object)
765                        .filter(index::Column::IndexId.is_in(index_ids))
766                        .all(&txn)
767                        .await?;
768                    let streaming_jobs = load_streaming_jobs_by_ids(
769                        &txn,
770                        index_objs
771                            .iter()
772                            .map(|(index, _)| index.index_id.as_job_id()),
773                    )
774                    .await?;
775                    for (index, index_obj) in index_objs {
776                        let streaming_job =
777                            streaming_jobs.get(&index.index_id.as_job_id()).cloned();
778                        objects.push(PbObjectInfo::Index(
779                            ObjectModel(index, index_obj.unwrap(), streaming_job).into(),
780                        ));
781                    }
782                }
783            }
784            ObjectType::Source => {
785                let source = Source::find_by_id(object_id.as_source_id())
786                    .one(&txn)
787                    .await?
788                    .ok_or_else(|| MetaError::catalog_id_not_found("source", object_id))?;
789                check_relation_name_duplicate(&source.name, database_id, new_schema, &txn).await?;
790                let is_shared = source.is_shared();
791
792                let mut obj = obj.into_active_model();
793                obj.schema_id = Set(Some(new_schema));
794                let obj = obj.update(&txn).await?;
795                objects.push(PbObjectInfo::Source(ObjectModel(source, obj, None).into()));
796
797                // Note: For non-shared source, we don't update their state tables, which
798                // belongs to the MV.
799                if is_shared {
800                    update_internal_tables(
801                        &txn,
802                        object_id,
803                        object::Column::SchemaId,
804                        new_schema,
805                        &mut objects,
806                    )
807                    .await?;
808                }
809            }
810            ObjectType::Sink => {
811                let sink = Sink::find_by_id(object_id.as_sink_id())
812                    .one(&txn)
813                    .await?
814                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", object_id))?;
815                check_relation_name_duplicate(&sink.name, database_id, new_schema, &txn).await?;
816
817                let mut obj = obj.into_active_model();
818                obj.schema_id = Set(Some(new_schema));
819                let obj = obj.update(&txn).await?;
820                objects.push(PbObjectInfo::Sink(
821                    ObjectModel(sink, obj, streaming_job).into(),
822                ));
823
824                update_internal_tables(
825                    &txn,
826                    object_id,
827                    object::Column::SchemaId,
828                    new_schema,
829                    &mut objects,
830                )
831                .await?;
832            }
833            ObjectType::Subscription => {
834                let subscription = Subscription::find_by_id(object_id.as_subscription_id())
835                    .one(&txn)
836                    .await?
837                    .ok_or_else(|| MetaError::catalog_id_not_found("subscription", object_id))?;
838                check_relation_name_duplicate(&subscription.name, database_id, new_schema, &txn)
839                    .await?;
840
841                let mut obj = obj.into_active_model();
842                obj.schema_id = Set(Some(new_schema));
843                let obj = obj.update(&txn).await?;
844                objects.push(PbObjectInfo::Subscription(
845                    ObjectModel(subscription, obj, None).into(),
846                ));
847            }
848            ObjectType::View => {
849                let view = View::find_by_id(object_id.as_view_id())
850                    .one(&txn)
851                    .await?
852                    .ok_or_else(|| MetaError::catalog_id_not_found("view", object_id))?;
853                check_relation_name_duplicate(&view.name, database_id, new_schema, &txn).await?;
854
855                let mut obj = obj.into_active_model();
856                obj.schema_id = Set(Some(new_schema));
857                let obj = obj.update(&txn).await?;
858                objects.push(PbObjectInfo::View(ObjectModel(view, obj, None).into()));
859            }
860            ObjectType::Function => {
861                let function = Function::find_by_id(object_id.as_function_id())
862                    .one(&txn)
863                    .await?
864                    .ok_or_else(|| MetaError::catalog_id_not_found("function", object_id))?;
865
866                let mut pb_function: PbFunction = ObjectModel(function, obj, None).into();
867                pb_function.schema_id = new_schema;
868                check_function_signature_duplicate(&pb_function, &txn).await?;
869
870                Object::update(object::ActiveModel {
871                    oid: Set(object_id),
872                    schema_id: Set(Some(new_schema)),
873                    ..Default::default()
874                })
875                .exec(&txn)
876                .await?;
877
878                txn.commit().await?;
879                let version = self
880                    .notify_frontend(
881                        NotificationOperation::Update,
882                        NotificationInfo::Function(pb_function),
883                    )
884                    .await;
885                return Ok(version);
886            }
887            ObjectType::Connection => {
888                let connection = Connection::find_by_id(object_id.as_connection_id())
889                    .one(&txn)
890                    .await?
891                    .ok_or_else(|| MetaError::catalog_id_not_found("connection", object_id))?;
892
893                let mut pb_connection: PbConnection = ObjectModel(connection, obj, None).into();
894                pb_connection.schema_id = new_schema;
895                check_connection_name_duplicate(&pb_connection, &txn).await?;
896
897                Object::update(object::ActiveModel {
898                    oid: Set(object_id),
899                    schema_id: Set(Some(new_schema)),
900                    ..Default::default()
901                })
902                .exec(&txn)
903                .await?;
904
905                txn.commit().await?;
906                let version = self
907                    .notify_frontend(
908                        NotificationOperation::Update,
909                        NotificationInfo::Connection(pb_connection),
910                    )
911                    .await;
912                return Ok(version);
913            }
914            _ => unreachable!("not supported object type: {:?}", object_type),
915        }
916
917        txn.commit().await?;
918        let version = self
919            .notify_frontend(
920                Operation::Update,
921                Info::ObjectGroup(PbObjectGroup {
922                    objects: objects
923                        .into_iter()
924                        .map(|relation_info| PbObject {
925                            object_info: Some(relation_info),
926                        })
927                        .collect_vec(),
928                    dependencies: vec![],
929                }),
930            )
931            .await;
932        Ok(version)
933    }
934
935    pub async fn alter_secret(
936        &self,
937        pb_secret: PbSecret,
938        secret_plain_payload: Vec<u8>,
939    ) -> MetaResult<NotificationVersion> {
940        let inner = self.inner.write().await;
941        let owner_id = pb_secret.owner as _;
942        let txn = inner.db.begin().await?;
943        ensure_user_id(owner_id, &txn).await?;
944        ensure_object_id(ObjectType::Database, pb_secret.database_id, &txn).await?;
945        ensure_object_id(ObjectType::Schema, pb_secret.schema_id, &txn).await?;
946
947        ensure_object_id(ObjectType::Secret, pb_secret.id, &txn).await?;
948        let secret: secret::ActiveModel = pb_secret.clone().into();
949        Secret::update(secret).exec(&txn).await?;
950
951        txn.commit().await?;
952
953        // Notify the compute and frontend node plain secret
954        let mut secret_plain = pb_secret;
955        secret_plain.value.clone_from(&secret_plain_payload);
956
957        LocalSecretManager::global().update_secret(secret_plain.id, secret_plain_payload);
958        self.env
959            .notification_manager()
960            .notify_compute_without_version(Operation::Update, Info::Secret(secret_plain.clone()));
961
962        let version = self
963            .notify_frontend(
964                NotificationOperation::Update,
965                NotificationInfo::Secret(secret_plain),
966            )
967            .await;
968
969        Ok(version)
970    }
971
972    // drop table associated source is a special case of drop relation, which just remove the source object and associated state table, keeping the streaming job and fragments.
973    pub async fn drop_table_associated_source(
974        txn: &DatabaseTransaction,
975        drop_table_connector_ctx: &DropTableConnectorContext,
976    ) -> MetaResult<(Vec<PbUserInfo>, Vec<PartialObject>)> {
977        let to_drop_source_objects: Vec<PartialObject> = Object::find()
978            .filter(object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_source_id]))
979            .into_partial_model()
980            .all(txn)
981            .await?;
982        let to_drop_internal_table_objs: Vec<PartialObject> = Object::find()
983            .select_only()
984            .filter(
985                object::Column::Oid.is_in(vec![drop_table_connector_ctx.to_remove_state_table_id]),
986            )
987            .into_partial_model()
988            .all(txn)
989            .await?;
990        let to_drop_objects = to_drop_source_objects
991            .into_iter()
992            .chain(to_drop_internal_table_objs.into_iter())
993            .collect_vec();
994        // Find affect users with privileges on all this objects.
995        let to_update_user_ids: Vec<UserId> = UserPrivilege::find()
996            .select_only()
997            .distinct()
998            .column(user_privilege::Column::UserId)
999            .filter(user_privilege::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1000            .into_tuple()
1001            .all(txn)
1002            .await?;
1003
1004        tracing::debug!(
1005            "drop_table_associated_source: to_drop_objects: {:?}",
1006            to_drop_objects
1007        );
1008
1009        // delete all in to_drop_objects.
1010        let res = Object::delete_many()
1011            .filter(object::Column::Oid.is_in(to_drop_objects.iter().map(|obj| obj.oid)))
1012            .exec(txn)
1013            .await?;
1014        if res.rows_affected == 0 {
1015            return Err(MetaError::catalog_id_not_found(
1016                ObjectType::Source.as_str(),
1017                drop_table_connector_ctx.to_remove_source_id,
1018            ));
1019        }
1020        let user_infos = list_user_info_by_ids(to_update_user_ids, txn).await?;
1021
1022        Ok((user_infos, to_drop_objects))
1023    }
1024
1025    pub async fn alter_database_param(
1026        &self,
1027        database_id: DatabaseId,
1028        param: AlterDatabaseParam,
1029    ) -> MetaResult<(NotificationVersion, risingwave_meta_model::database::Model)> {
1030        let inner = self.inner.write().await;
1031        let txn = inner.db.begin().await?;
1032
1033        let mut database = database::ActiveModel {
1034            database_id: Set(database_id),
1035            ..Default::default()
1036        };
1037        match param {
1038            AlterDatabaseParam::BarrierIntervalMs(interval) => {
1039                if let Some(ref interval) = interval {
1040                    OverrideValidate::barrier_interval_ms(interval)
1041                        .map_err(|e| anyhow::anyhow!(e))?;
1042                }
1043                database.barrier_interval_ms = Set(interval.map(|i| i as i32));
1044            }
1045            AlterDatabaseParam::CheckpointFrequency(frequency) => {
1046                if let Some(ref frequency) = frequency {
1047                    OverrideValidate::checkpoint_frequency(frequency)
1048                        .map_err(|e| anyhow::anyhow!(e))?;
1049                }
1050                database.checkpoint_frequency = Set(frequency.map(|f| f as i64));
1051            }
1052        }
1053        let database = database.update(&txn).await?;
1054
1055        let obj = Object::find_by_id(database_id)
1056            .one(&txn)
1057            .await?
1058            .ok_or_else(|| MetaError::catalog_id_not_found("database", database_id))?;
1059
1060        txn.commit().await?;
1061
1062        let version = self
1063            .notify_frontend(
1064                NotificationOperation::Update,
1065                NotificationInfo::Database(ObjectModel(database.clone(), obj, None).into()),
1066            )
1067            .await;
1068        Ok((version, database))
1069    }
1070
1071    pub async fn alter_subscription_retention(
1072        &self,
1073        subscription_id: SubscriptionId,
1074        retention_seconds: u64,
1075        definition: String,
1076    ) -> MetaResult<(NotificationVersion, PbSubscription)> {
1077        let inner = self.inner.write().await;
1078        let txn = inner.db.begin().await?;
1079
1080        let obj = Object::find_by_id(subscription_id)
1081            .one(&txn)
1082            .await?
1083            .ok_or_else(|| MetaError::catalog_id_not_found("subscription", subscription_id))?;
1084
1085        let active_model = subscription::ActiveModel {
1086            subscription_id: Set(subscription_id),
1087            retention_seconds: Set(retention_seconds as i64),
1088            definition: Set(definition),
1089            ..Default::default()
1090        };
1091        let subscription = active_model.update(&txn).await?;
1092
1093        txn.commit().await?;
1094
1095        let pb_subscription: PbSubscription = ObjectModel(subscription, obj, None).into();
1096        let subscription_info = PbObjectInfo::Subscription(pb_subscription.clone());
1097
1098        let version = self
1099            .notify_frontend(
1100                NotificationOperation::Update,
1101                NotificationInfo::ObjectGroup(PbObjectGroup {
1102                    objects: vec![PbObject {
1103                        object_info: Some(subscription_info),
1104                    }],
1105                    dependencies: vec![],
1106                }),
1107            )
1108            .await;
1109
1110        Ok((version, pb_subscription))
1111    }
1112
    /// Alter the TOML config override stored for a streaming job.
    ///
    /// Upserts every `(path, value)` in `entries_to_add` (each value string is
    /// parsed as a TOML value) and deletes every path in `keys_to_remove`,
    /// then validates and persists the re-serialized table.
    ///
    /// Validation merges the updated override into a default
    /// [`StreamingConfig`]; any unrecognized key — even a pre-existing one —
    /// rejects the request (users can `RESET` offending keys first).
    ///
    /// Returns [`IGNORED_NOTIFICATION_VERSION`]: no frontend notification is
    /// issued from here.
    pub async fn alter_streaming_job_config(
        &self,
        job_id: JobId,
        entries_to_add: HashMap<String, String>,
        keys_to_remove: Vec<String>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Outer `Option` (handled by `ok_or_else`): the job row is missing.
        // Inner `Option` (handled by `unwrap_or_default`): the column is NULL,
        // i.e. no override configured yet — start from an empty TOML document.
        let config_override: Option<String> = StreamingJob::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
        let config_override = config_override.unwrap_or_default();

        let mut table: toml::Table =
            toml::from_str(&config_override).context("invalid streaming job config")?;

        // The frontend guarantees that there's no duplicated keys in `to_add` and `to_remove`.
        for (key, value) in entries_to_add {
            // The value string must itself parse as a TOML value.
            let value: toml::Value = value
                .parse()
                .with_context(|| format!("invalid config value for path {key}"))?;
            table
                .upsert(&key, value)
                .with_context(|| format!("failed to set config path {key}"))?;
        }
        for key in keys_to_remove {
            table
                .delete(&key)
                .with_context(|| format!("failed to reset config path {key}"))?;
        }

        // Re-serialize the mutated table; this is what gets persisted.
        let updated_config_override = table.to_string();

        // Validate the config override by trying to merge it to the default config.
        {
            let merged = merge_streaming_config_section(
                &StreamingConfig::default(),
                &updated_config_override,
            )
            .context("invalid streaming job config override")?;

            // Reject unrecognized entries.
            // Note: If these unrecognized entries are pre-existing, we also reject them here.
            // Users are able to fix them by issuing a `RESET` first.
            if let Some(merged) = merged {
                let unrecognized_keys = merged.unrecognized_keys().collect_vec();
                if !unrecognized_keys.is_empty() {
                    bail_invalid_parameter!("unrecognized configs: {:?}", unrecognized_keys);
                }
            }
        }

        StreamingJob::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            config_override: Set(Some(updated_config_override)),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(IGNORED_NOTIFICATION_VERSION)
    }
1182
    /// Ensure a `refresh_job` row exists for `table_id`, inserting an idle one
    /// if necessary. Idempotent: an already-existing row is left untouched.
    pub async fn ensure_refresh_job(&self, table_id: TableId) -> MetaResult<()> {
        let inner = self.inner.read().await;
        // Fresh rows start Idle with no trigger/interval/success history.
        let active = refresh_job::ActiveModel {
            table_id: Set(table_id),
            last_trigger_time: Set(None),
            trigger_interval_secs: Set(None),
            current_status: Set(RefreshState::Idle),
            last_success_time: Set(None),
        };
        match RefreshJob::insert(active)
            .on_conflict_do_nothing()
            .exec(&inner.db)
            .await
        {
            Ok(_) => Ok(()),
            Err(sea_orm::DbErr::RecordNotInserted) => {
                // This is expected when the refresh job already exists due to ON CONFLICT DO NOTHING
                tracing::debug!("refresh job already exists for table_id={}", table_id);
                Ok(())
            }
            Err(e) => {
                // Tolerate errors caused by the table having been dropped
                // concurrently (see `should_skip_refresh_job_db_err`);
                // anything else is a real failure.
                if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
                    tracing::warn!(
                        %table_id,
                        error = %e.as_report(),
                        "skip ensure_refresh_job for stale dropped table"
                    );
                    Ok(())
                } else {
                    Err(e.into())
                }
            }
        }
    }
1217
1218    pub async fn update_refresh_job_status(
1219        &self,
1220        table_id: TableId,
1221        status: RefreshState,
1222        trigger_time: Option<DateTime>,
1223        is_success: bool,
1224    ) -> MetaResult<()> {
1225        self.ensure_refresh_job(table_id).await?;
1226        let inner = self.inner.read().await;
1227
1228        // expect only update trigger_time when the status changes to Refreshing
1229        assert_eq!(trigger_time.is_some(), status == RefreshState::Refreshing);
1230        let active = refresh_job::ActiveModel {
1231            table_id: Set(table_id),
1232            current_status: Set(status),
1233            last_trigger_time: if trigger_time.is_some() {
1234                Set(trigger_time.map(datetime_to_timestamp_millis))
1235            } else {
1236                NotSet
1237            },
1238            last_success_time: if is_success {
1239                Set(Some(chrono::Utc::now().timestamp_millis()))
1240            } else {
1241                NotSet
1242            },
1243            ..Default::default()
1244        };
1245        match RefreshJob::update(active).exec(&inner.db).await {
1246            Ok(_) => Ok(()),
1247            Err(e) => {
1248                if should_skip_refresh_job_db_err(&inner.db, table_id, &e).await? {
1249                    tracing::warn!(
1250                        %table_id,
1251                        error = %e.as_report(),
1252                        "skip update_refresh_job_status for stale dropped table"
1253                    );
1254                    Ok(())
1255                } else {
1256                    Err(e.into())
1257                }
1258            }
1259        }
1260    }
1261
1262    pub async fn reset_all_refresh_jobs_to_idle(&self) -> MetaResult<()> {
1263        let inner = self.inner.read().await;
1264        RefreshJob::update_many()
1265            .col_expr(
1266                refresh_job::Column::CurrentStatus,
1267                Expr::value(RefreshState::Idle),
1268            )
1269            .exec(&inner.db)
1270            .await?;
1271        Ok(())
1272    }
1273
1274    pub async fn update_refresh_job_interval(
1275        &self,
1276        table_id: TableId,
1277        trigger_interval_secs: Option<i64>,
1278    ) -> MetaResult<()> {
1279        self.ensure_refresh_job(table_id).await?;
1280        let inner = self.inner.read().await;
1281        let active = refresh_job::ActiveModel {
1282            table_id: Set(table_id),
1283            trigger_interval_secs: Set(trigger_interval_secs),
1284            ..Default::default()
1285        };
1286        RefreshJob::update(active).exec(&inner.db).await?;
1287        Ok(())
1288    }
1289}
1290
1291async fn should_skip_refresh_job_db_err<C>(
1292    db: &C,
1293    table_id: TableId,
1294    err: &sea_orm::DbErr,
1295) -> MetaResult<bool>
1296where
1297    C: ConnectionTrait,
1298{
1299    if matches!(err, sea_orm::DbErr::RecordNotUpdated) {
1300        return Ok(true);
1301    }
1302
1303    if !matches!(
1304        err.sql_err(),
1305        Some(SqlErr::ForeignKeyConstraintViolation(_))
1306    ) {
1307        return Ok(false);
1308    }
1309
1310    let table_exists = Table::find_by_id(table_id).one(db).await?.is_some();
1311    Ok(!table_exists)
1312}