risingwave_meta/controller/streaming_job.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
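    /// Insert the `object` and `streaming_job` rows for a new streaming job inside the
    /// given transaction and return the allocated [`JobId`]. The job starts in
    /// `JobStatus::Initial` and is promoted later by `post_collect_job_fragments`
    /// (to `Creating`) and `finish_streaming_job` (to `Created`).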
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            backfill_parallelism: Set(backfill_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
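    ///
    /// A minimal sketch of a call site (argument values are illustrative):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(
    ///         &mut streaming_job,
    ///         &stream_ctx,
    ///         &None,             // parallelism: fall back to the cluster default
    ///         vnode_count,       // max_parallelism
    ///         dependencies,      // object ids this job depends on
    ///         None,              // specific_resource_group
    ///         &None,             // backfill_parallelism
    ///     )
    ///     .await?;
    /// ```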
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

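        // Resolve the job's parallelism: an explicitly specified value always wins;
        // otherwise fall back to the cluster-wide default (`Adaptive` for FULL,
        // `Fixed(n)` for a configured default of `n`).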
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                // to be compatible with old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependencies.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
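    ///
    /// A sketch of the intended usage (the remapping step is illustrative):
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// // Rewrite the temporary ids in the fragment graph to the allocated ones.
    /// fragment_graph.refill_internal_table_ids(table_id_map);
    /// ```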
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

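    /// Convenience wrapper around [`Self::prepare_streaming_job`] for a freshly built
    /// [`StreamJobFragmentsToCreate`].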
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

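        // Jobs that announce their catalogs to the frontend while still being created
        // (see `should_notify_creating`) must also have the filled-in tables below
        // broadcast, so collect them into `objects_to_notify`.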
        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // A table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id);
                    assert_eq!(table.fragment_id, fragment_id);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In release builds, the definition was replaced earlier but is still
                        // needed for the notification, so restore it for the job's own table.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to the notification.
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and the notification, which may lead
        // to the frontend receiving duplicate notifications if it restarts right in this gap. We
        // need to either forward the notification or filter out catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If a sink (with a target table) needs to be dropped,
    /// additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or was created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
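    ///
    /// ```ignore
    /// // Illustrative: abort a cancelled job and check whether cleanup happened.
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ true)
    ///     .await?;
    /// if !aborted {
    ///     // A background job still in `Creating` status is left for recovery to handle.
    /// }
    /// ```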
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it up.
                } else {
                    // The job is created in the background and still in creating status;
                    // do not abort it and let recovery handle it instead.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record the original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
            // Here we drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating a sink into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so now we need to notify it to delete them.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

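    /// Persist the relations collected after the job's barrier command is injected:
    /// insert the new upstream-to-downstream fragment edges (plus the optional sink
    /// downstream), mark the job as `Creating`, and record the initial split assignment.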
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

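    /// Create a temporary streaming job object to serve as the replacement target of
    /// an existing job (e.g. for `ALTER TABLE`): verifies the job's version, rejects
    /// concurrent replaces, inherits the original job's max parallelism and stream
    /// context, and records a dependency from the original job to the new object.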
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            None,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
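    ///
    /// ```ignore
    /// // Illustrative: called once the job has finished creating.
    /// controller.finish_streaming_job(job_id).await?;
    /// ```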
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges.
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Transactional core of [`Self::finish_streaming_job`]: marks job-related objects as
    /// `Created` and returns the notification operation, the objects to notify, and any
    /// updated user infos.
    pub async fn finish_streaming_job_inner(
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (align with MV behavior).
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        Ok((notification_op, objects, updated_user_info))
    }

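    /// Finish a replace-streaming-job operation: apply the catalog updates computed by
    /// [`Self::finish_replace_streaming_job_inner`] in one transaction, then notify the
    /// frontend of the updated (and, if a table connector was dropped, deleted) objects.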
1262    pub async fn finish_replace_streaming_job(
1263        &self,
1264        tmp_id: JobId,
1265        streaming_job: StreamingJob,
1266        replace_upstream: FragmentReplaceUpstream,
1267        sink_into_table_context: SinkIntoTableContext,
1268        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1269        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1270    ) -> MetaResult<NotificationVersion> {
1271        let inner = self.inner.write().await;
1272        let txn = inner.db.begin().await?;
1273
1274        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1275            tmp_id,
1276            replace_upstream,
1277            sink_into_table_context,
1278            &txn,
1279            streaming_job,
1280            drop_table_connector_ctx,
1281            auto_refresh_schema_sinks,
1282        )
1283        .await?;
1284
1285        txn.commit().await?;
1286
1287        let mut version = self
1288            .notify_frontend(
1289                NotificationOperation::Update,
1290                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1291            )
1292            .await;
1293
1294        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1295            self.notify_users_update(user_infos).await;
1296            version = self
1297                .notify_frontend(
1298                    NotificationOperation::Delete,
1299                    build_object_group_for_delete(to_drop_objects),
1300                )
1301                .await;
1302        }
1303
1304        Ok(version)
1305    }
1306
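    /// Transactional body of [`Self::finish_replace_streaming_job`]. Updates the job's
    /// catalog entry, swaps the fragments staged under `tmp_id` in for the original
    /// job's fragments, rewrites downstream `Merge` nodes, rewrites index expressions
    /// if the table's columns changed, finishes any auto-refresh-schema sinks, and
    /// optionally drops the table's associated source. Returns the objects to notify
    /// as updated, plus the user infos and objects to notify as deleted, if any.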
1307    pub async fn finish_replace_streaming_job_inner(
1308        tmp_id: JobId,
1309        replace_upstream: FragmentReplaceUpstream,
1310        SinkIntoTableContext {
1311            updated_sink_catalogs,
1312        }: SinkIntoTableContext,
1313        txn: &DatabaseTransaction,
1314        streaming_job: StreamingJob,
1315        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1316        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1317    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1318        let original_job_id = streaming_job.id();
1319        let job_type = streaming_job.job_type();
1320
1321        let mut index_item_rewriter = None;
1322
1323        // Update catalog
1324        match streaming_job {
1325            StreamingJob::Table(_source, table, _table_job_type) => {
1326                // The source catalog should remain unchanged
1327
1328                let original_column_catalogs =
1329                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1330
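                // Index expressions refer to the table's columns, so they must be
                // rewritten from the original column layout to the new one (applied to
                // each index's `index_items` further below).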
1331                index_item_rewriter = Some({
1332                    let original_columns = original_column_catalogs
1333                        .to_protobuf()
1334                        .into_iter()
1335                        .map(|c| c.column_desc.unwrap())
1336                        .collect_vec();
1337                    let new_columns = table
1338                        .columns
1339                        .iter()
1340                        .map(|c| c.column_desc.clone().unwrap())
1341                        .collect_vec();
1342
1343                    IndexItemRewriter {
1344                        original_columns,
1345                        new_columns,
1346                    }
1347                });
1348
1349                // For sinks created in earlier versions, we need to set the original_target_columns.
1350                for sink_id in updated_sink_catalogs {
1351                    sink::ActiveModel {
1352                        sink_id: Set(sink_id as _),
1353                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1354                        ..Default::default()
1355                    }
1356                    .update(txn)
1357                    .await?;
1358                }
1359                // Update the table catalog with the new one. (column catalog is also updated here)
1360                let mut table = table::ActiveModel::from(table);
1361                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1362                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1363                {
                    // Drop the table's connector; the remaining logic lives in `drop_table_associated_source`.
1365                    table.optional_associated_source_id = Set(None);
1366                }
1367
1368                table.update(txn).await?;
1369            }
1370            StreamingJob::Source(source) => {
1371                // Update the source catalog with the new one.
1372                let source = source::ActiveModel::from(source);
1373                source.update(txn).await?;
1374            }
1375            StreamingJob::MaterializedView(table) => {
1376                // Update the table catalog with the new one.
1377                let table = table::ActiveModel::from(table);
1378                table.update(txn).await?;
1379            }
1380            _ => unreachable!(
1381                "invalid streaming job type: {:?}",
1382                streaming_job.job_type_str()
1383            ),
1384        }
1385
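        /// Hand the fragments staged under `tmp_id` over to `original_job_id`: fix up
        /// the `fragment_id` of internal state tables, delete the old fragments, re-own
        /// the staged ones, rewrite downstream `Merge` nodes per `replace_upstream`,
        /// and finally remove the temporary object.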
1386        async fn finish_fragments(
1387            txn: &DatabaseTransaction,
1388            tmp_id: JobId,
1389            original_job_id: JobId,
1390            replace_upstream: FragmentReplaceUpstream,
1391        ) -> MetaResult<()> {
1392            // 0. update internal tables
1393            // Fields including `fragment_id` were placeholder values before.
1394            // After table fragments are created, update them for all internal tables.
1395            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1396                .select_only()
1397                .columns([
1398                    fragment::Column::FragmentId,
1399                    fragment::Column::StateTableIds,
1400                ])
1401                .filter(fragment::Column::JobId.eq(tmp_id))
1402                .into_tuple()
1403                .all(txn)
1404                .await?;
1405            for (fragment_id, state_table_ids) in fragment_info {
1406                for state_table_id in state_table_ids.into_inner() {
1407                    let state_table_id = TableId::new(state_table_id as _);
1408                    table::ActiveModel {
1409                        table_id: Set(state_table_id),
1410                        fragment_id: Set(Some(fragment_id)),
1411                        // No need to update `vnode_count` because it must remain the same.
1412                        ..Default::default()
1413                    }
1414                    .update(txn)
1415                    .await?;
1416                }
1417            }
1418
1419            // 1. replace old fragments/actors with new ones.
1420            Fragment::delete_many()
1421                .filter(fragment::Column::JobId.eq(original_job_id))
1422                .exec(txn)
1423                .await?;
1424            Fragment::update_many()
1425                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1426                .filter(fragment::Column::JobId.eq(tmp_id))
1427                .exec(txn)
1428                .await?;
1429
            // 2. update merges.
            // Update each downstream fragment's `Merge` node and its `upstream_fragment_id`.
1432            for (fragment_id, fragment_replace_map) in replace_upstream {
1433                let (fragment_id, mut stream_node) =
1434                    Fragment::find_by_id(fragment_id as FragmentId)
1435                        .select_only()
1436                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1437                        .into_tuple::<(FragmentId, StreamNode)>()
1438                        .one(txn)
1439                        .await?
1440                        .map(|(id, node)| (id, node.to_protobuf()))
1441                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1442
1443                visit_stream_node_mut(&mut stream_node, |body| {
1444                    if let PbNodeBody::Merge(m) = body
1445                        && let Some(new_fragment_id) =
1446                            fragment_replace_map.get(&m.upstream_fragment_id)
1447                    {
1448                        m.upstream_fragment_id = *new_fragment_id;
1449                    }
1450                });
1451                fragment::ActiveModel {
1452                    fragment_id: Set(fragment_id),
1453                    stream_node: Set(StreamNode::from(&stream_node)),
1454                    ..Default::default()
1455                }
1456                .update(txn)
1457                .await?;
1458            }
1459
1460            // 3. remove dummy object.
1461            Object::delete_by_id(tmp_id).exec(txn).await?;
1462
1463            Ok(())
1464        }
1465
1466        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1467
1468        // 4. update catalogs and notify.
1469        let mut objects = vec![];
1470        match job_type {
1471            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1472                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1473                    .find_also_related(Object)
1474                    .one(txn)
1475                    .await?
1476                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1477                objects.push(PbObject {
1478                    object_info: Some(PbObjectInfo::Table(
1479                        ObjectModel(table, table_obj.unwrap()).into(),
1480                    )),
1481                })
1482            }
1483            StreamingJobType::Source => {
1484                let (source, source_obj) =
1485                    Source::find_by_id(original_job_id.as_shared_source_id())
1486                        .find_also_related(Object)
1487                        .one(txn)
1488                        .await?
1489                        .ok_or_else(|| {
1490                            MetaError::catalog_id_not_found("object", original_job_id)
1491                        })?;
1492                objects.push(PbObject {
1493                    object_info: Some(PbObjectInfo::Source(
1494                        ObjectModel(source, source_obj.unwrap()).into(),
1495                    )),
1496                })
1497            }
1498            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1499        }
1500
1501        if let Some(expr_rewriter) = index_item_rewriter {
1502            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1503                .select_only()
1504                .columns([index::Column::IndexId, index::Column::IndexItems])
1505                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1506                .into_tuple()
1507                .all(txn)
1508                .await?;
1509            for (index_id, nodes) in index_items {
1510                let mut pb_nodes = nodes.to_protobuf();
1511                pb_nodes
1512                    .iter_mut()
1513                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1514                let index = index::ActiveModel {
1515                    index_id: Set(index_id),
1516                    index_items: Set(pb_nodes.into()),
1517                    ..Default::default()
1518                }
1519                .update(txn)
1520                .await?;
1521                let index_obj = index
1522                    .find_related(Object)
1523                    .one(txn)
1524                    .await?
1525                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1526                objects.push(PbObject {
1527                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1528                });
1529            }
1530        }
1531
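        // Finish sinks whose schemas were auto-refreshed along with the table: swap in
        // their staged fragments and persist the new sink columns (and, if present, the
        // new log store table columns).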
1532        if let Some(sinks) = auto_refresh_schema_sinks {
1533            for finish_sink_context in sinks {
1534                finish_fragments(
1535                    txn,
1536                    finish_sink_context.tmp_sink_id.as_job_id(),
1537                    finish_sink_context.original_sink_id.as_job_id(),
1538                    Default::default(),
1539                )
1540                .await?;
1541                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1542                    .find_also_related(Object)
1543                    .one(txn)
1544                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id)
                    })?;
1546                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1547                Sink::update(sink::ActiveModel {
1548                    sink_id: Set(finish_sink_context.original_sink_id),
1549                    columns: Set(columns.clone()),
1550                    ..Default::default()
1551                })
1552                .exec(txn)
1553                .await?;
1554                sink.columns = columns;
1555                objects.push(PbObject {
1556                    object_info: Some(PbObjectInfo::Sink(
1557                        ObjectModel(sink, sink_obj.unwrap()).into(),
1558                    )),
1559                });
1560                if let Some((log_store_table_id, new_log_store_table_columns)) =
1561                    finish_sink_context.new_log_store_table
1562                {
1563                    let new_log_store_table_columns: ColumnCatalogArray =
1564                        new_log_store_table_columns.into();
1565                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1566                        .find_also_related(Object)
1567                        .one(txn)
1568                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", log_store_table_id)
                        })?;
1570                    Table::update(table::ActiveModel {
1571                        table_id: Set(log_store_table_id),
1572                        columns: Set(new_log_store_table_columns.clone()),
1573                        ..Default::default()
1574                    })
1575                    .exec(txn)
1576                    .await?;
1577                    table.columns = new_log_store_table_columns;
1578                    objects.push(PbObject {
1579                        object_info: Some(PbObjectInfo::Table(
1580                            ObjectModel(table, table_obj.unwrap()).into(),
1581                        )),
1582                    });
1583                }
1584            }
1585        }
1586
1587        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1588        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1589            notification_objs =
1590                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1591        }
1592
1593        Ok((objects, notification_objs))
1594    }
1595
    /// Abort the replacing streaming job by deleting the temporary job object
    /// (and any temporary sink objects created alongside it).
1597    pub async fn try_abort_replacing_streaming_job(
1598        &self,
1599        tmp_job_id: JobId,
1600        tmp_sink_ids: Option<Vec<ObjectId>>,
1601    ) -> MetaResult<()> {
1602        let inner = self.inner.write().await;
1603        let txn = inner.db.begin().await?;
1604        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1605        if let Some(tmp_sink_ids) = tmp_sink_ids {
1606            for tmp_sink_id in tmp_sink_ids {
1607                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1608            }
1609        }
1610        txn.commit().await?;
1611        Ok(())
1612    }
1613
    // Edit the `rate_limit` of the `Source` (and `StreamFsFetch`) nodes in the given
    // `source_id`'s fragments.
    // Returns the ids of the affected streaming jobs and fragments.
1616    pub async fn update_source_rate_limit_by_source_id(
1617        &self,
1618        source_id: SourceId,
1619        rate_limit: Option<u32>,
1620    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1621        let inner = self.inner.read().await;
1622        let txn = inner.db.begin().await?;
1623
1624        {
1625            let active_source = source::ActiveModel {
1626                source_id: Set(source_id),
1627                rate_limit: Set(rate_limit.map(|v| v as i32)),
1628                ..Default::default()
1629            };
1630            active_source.update(&txn).await?;
1631        }
1632
1633        let (source, obj) = Source::find_by_id(source_id)
1634            .find_also_related(Object)
1635            .one(&txn)
1636            .await?
1637            .ok_or_else(|| {
1638                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1639            })?;
1640
1641        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1642        let streaming_job_ids: Vec<JobId> =
1643            if let Some(table_id) = source.optional_associated_table_id {
1644                vec![table_id.as_job_id()]
1645            } else if let Some(source_info) = &source.source_info
1646                && source_info.to_protobuf().is_shared()
1647            {
1648                vec![source_id.as_share_source_job_id()]
1649            } else {
1650                ObjectDependency::find()
1651                    .select_only()
1652                    .column(object_dependency::Column::UsedBy)
1653                    .filter(object_dependency::Column::Oid.eq(source_id))
1654                    .into_tuple()
1655                    .all(&txn)
1656                    .await?
1657            };
1658
1659        if streaming_job_ids.is_empty() {
1660            return Err(MetaError::invalid_parameter(format!(
1661                "source id {source_id} not used by any streaming job"
1662            )));
1663        }
1664
1665        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1666            .select_only()
1667            .columns([
1668                fragment::Column::FragmentId,
1669                fragment::Column::JobId,
1670                fragment::Column::FragmentTypeMask,
1671                fragment::Column::StreamNode,
1672            ])
1673            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1674            .into_tuple()
1675            .all(&txn)
1676            .await?;
1677        let mut fragments = fragments
1678            .into_iter()
1679            .map(|(id, job_id, mask, stream_node)| {
1680                (
1681                    id,
1682                    job_id,
1683                    FragmentTypeMask::from(mask as u32),
1684                    stream_node.to_protobuf(),
1685                )
1686            })
1687            .collect_vec();
1688
1689        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1690            let mut found = false;
1691            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1692                visit_stream_node_mut(stream_node, |node| {
1693                    if let PbNodeBody::Source(node) = node
1694                        && let Some(node_inner) = &mut node.source_inner
1695                        && node_inner.source_id == source_id
1696                    {
1697                        node_inner.rate_limit = rate_limit;
1698                        found = true;
1699                    }
1700                });
1701            }
1702            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch`
                // node, so for fs connectors we simply scan all fragments for
                // `StreamFsFetch` nodes.
1705                visit_stream_node_mut(stream_node, |node| {
1706                    if let PbNodeBody::StreamFsFetch(node) = node {
1707                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1708                        if let Some(node_inner) = &mut node.node_inner
1709                            && node_inner.source_id == source_id
1710                        {
1711                            node_inner.rate_limit = rate_limit;
1712                            found = true;
1713                        }
1714                    }
1715                });
1716            }
1717            found
1718        });
1719
1720        assert!(
1721            !fragments.is_empty(),
1722            "source id should be used by at least one fragment"
1723        );
1724
1725        let (fragment_ids, job_ids) = fragments
1726            .iter()
            .map(|(fragment_id, job_id, _, _)| (fragment_id, job_id))
1728            .unzip();
1729
1730        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1731            fragment::ActiveModel {
1732                fragment_id: Set(fragment_id),
1733                fragment_type_mask: Set(fragment_type_mask.into()),
1734                stream_node: Set(StreamNode::from(&stream_node)),
1735                ..Default::default()
1736            }
1737            .update(&txn)
1738            .await?;
1739        }
1740
1741        txn.commit().await?;
1742
1743        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1744        let _version = self
1745            .notify_frontend(
1746                NotificationOperation::Update,
1747                NotificationInfo::ObjectGroup(PbObjectGroup {
1748                    objects: vec![PbObject {
1749                        object_info: Some(relation_info),
1750                    }],
1751                }),
1752            )
1753            .await;
1754
1755        Ok((job_ids, fragment_ids))
1756    }
1757
    // Edit the stream nodes of the given job's fragments via `fragments_mutation_fn`.
    // Returns the ids of the mutated fragments.
1760    pub async fn mutate_fragments_by_job_id(
1761        &self,
1762        job_id: JobId,
1763        // returns true if the mutation is applied
1764        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments are found
1766        err_msg: &'static str,
1767    ) -> MetaResult<HashSet<FragmentId>> {
1768        let inner = self.inner.read().await;
1769        let txn = inner.db.begin().await?;
1770
1771        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1772            .select_only()
1773            .columns([
1774                fragment::Column::FragmentId,
1775                fragment::Column::FragmentTypeMask,
1776                fragment::Column::StreamNode,
1777            ])
1778            .filter(fragment::Column::JobId.eq(job_id))
1779            .into_tuple()
1780            .all(&txn)
1781            .await?;
1782        let mut fragments = fragments
1783            .into_iter()
1784            .map(|(id, mask, stream_node)| {
1785                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1786            })
1787            .collect_vec();
1788
1789        let fragments = fragments
1790            .iter_mut()
1791            .map(|(_, fragment_type_mask, stream_node)| {
1792                fragments_mutation_fn(*fragment_type_mask, stream_node)
1793            })
1794            .collect::<MetaResult<Vec<bool>>>()?
1795            .into_iter()
1796            .zip_eq_debug(std::mem::take(&mut fragments))
1797            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1798            .collect::<Vec<_>>();
1799
1800        if fragments.is_empty() {
1801            return Err(MetaError::invalid_parameter(format!(
1802                "job id {job_id}: {}",
1803                err_msg
1804            )));
1805        }
1806
1807        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1808        for (id, _, stream_node) in fragments {
1809            fragment::ActiveModel {
1810                fragment_id: Set(id),
1811                stream_node: Set(StreamNode::from(&stream_node)),
1812                ..Default::default()
1813            }
1814            .update(&txn)
1815            .await?;
1816        }
1817
1818        txn.commit().await?;
1819
1820        Ok(fragment_ids)
1821    }
1822
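    /// Like `mutate_fragments_by_job_id`, but mutates a single fragment looked up by
    /// `fragment_id`. Returns an error built from `err_msg` if the mutation reports
    /// nothing to change.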
1823    async fn mutate_fragment_by_fragment_id(
1824        &self,
1825        fragment_id: FragmentId,
1826        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1827        err_msg: &'static str,
1828    ) -> MetaResult<()> {
1829        let inner = self.inner.read().await;
1830        let txn = inner.db.begin().await?;
1831
1832        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1833            Fragment::find_by_id(fragment_id)
1834                .select_only()
1835                .columns([
1836                    fragment::Column::FragmentTypeMask,
1837                    fragment::Column::StreamNode,
1838                ])
1839                .into_tuple()
1840                .one(&txn)
1841                .await?
1842                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1843        let mut pb_stream_node = stream_node.to_protobuf();
1844        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1845
1846        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1847            return Err(MetaError::invalid_parameter(format!(
1848                "fragment id {fragment_id}: {}",
1849                err_msg
1850            )));
1851        }
1852
1853        fragment::ActiveModel {
1854            fragment_id: Set(fragment_id),
            // Persist the mutated stream node, not the one originally loaded.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
1856            ..Default::default()
1857        }
1858        .update(&txn)
1859        .await?;
1860
1861        txn.commit().await?;
1862
1863        Ok(())
1864    }
1865
    // Edit the `rate_limit` of the backfill nodes (`StreamScan`, `StreamCdcScan`,
    // `SourceBackfill`, and `Sink`) in the given job's fragments.
    // Returns the ids of the affected fragments.
1868    pub async fn update_backfill_rate_limit_by_job_id(
1869        &self,
1870        job_id: JobId,
1871        rate_limit: Option<u32>,
1872    ) -> MetaResult<HashSet<FragmentId>> {
1873        let update_backfill_rate_limit =
1874            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1875                let mut found = false;
1876                if fragment_type_mask
1877                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1878                {
1879                    visit_stream_node_mut(stream_node, |node| match node {
1880                        PbNodeBody::StreamCdcScan(node) => {
1881                            node.rate_limit = rate_limit;
1882                            found = true;
1883                        }
1884                        PbNodeBody::StreamScan(node) => {
1885                            node.rate_limit = rate_limit;
1886                            found = true;
1887                        }
1888                        PbNodeBody::SourceBackfill(node) => {
1889                            node.rate_limit = rate_limit;
1890                            found = true;
1891                        }
1892                        PbNodeBody::Sink(node) => {
1893                            node.rate_limit = rate_limit;
1894                            found = true;
1895                        }
1896                        _ => {}
1897                    });
1898                }
1899                Ok(found)
1900            };
1901
1902        self.mutate_fragments_by_job_id(
1903            job_id,
1904            update_backfill_rate_limit,
1905            "stream scan node or source node not found",
1906        )
1907        .await
1908    }
1909
    // Edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
    // Returns the ids of the affected fragments.
1912    pub async fn update_sink_rate_limit_by_job_id(
1913        &self,
1914        sink_id: SinkId,
1915        rate_limit: Option<u32>,
1916    ) -> MetaResult<HashSet<FragmentId>> {
1917        let update_sink_rate_limit =
1918            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1919                let mut found = Ok(false);
1920                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1921                    visit_stream_node_mut(stream_node, |node| {
1922                        if let PbNodeBody::Sink(node) = node {
1923                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1924                                found = Err(MetaError::invalid_parameter(
1925                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1926                                ));
1927                                return;
1928                            }
1929                            node.rate_limit = rate_limit;
1930                            found = Ok(true);
1931                        }
1932                    });
1933                }
1934                found
1935            };
1936
1937        self.mutate_fragments_by_job_id(
1938            sink_id.as_job_id(),
1939            update_sink_rate_limit,
1940            "sink node not found",
1941        )
1942        .await
1943    }
1944
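    // Edit the `rate_limit` of the `Dml` nodes in the given job's fragments.
    // Returns the ids of the affected fragments.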
1945    pub async fn update_dml_rate_limit_by_job_id(
1946        &self,
1947        job_id: JobId,
1948        rate_limit: Option<u32>,
1949    ) -> MetaResult<HashSet<FragmentId>> {
1950        let update_dml_rate_limit =
1951            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1952                let mut found = false;
1953                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1954                    visit_stream_node_mut(stream_node, |node| {
1955                        if let PbNodeBody::Dml(node) = node {
1956                            node.rate_limit = rate_limit;
1957                            found = true;
1958                        }
1959                    });
1960                }
1961                Ok(found)
1962            };
1963
1964        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1965            .await
1966    }
1967
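    /// Alter a source's connector properties and secret references in place. Validates
    /// that the touched fields can be altered on the fly, rewrites the `CREATE SOURCE`
    /// / `CREATE TABLE` definition, updates secret dependencies, pushes the new
    /// properties into all affected `Source` fragments, and notifies the frontend.
    /// Returns the resolved set of options after the update.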
1968    pub async fn update_source_props_by_source_id(
1969        &self,
1970        source_id: SourceId,
1971        alter_props: BTreeMap<String, String>,
1972        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1973    ) -> MetaResult<WithOptionsSecResolved> {
1974        let inner = self.inner.read().await;
1975        let txn = inner.db.begin().await?;
1976
1977        let (source, _obj) = Source::find_by_id(source_id)
1978            .find_also_related(Object)
1979            .one(&txn)
1980            .await?
1981            .ok_or_else(|| {
1982                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1983            })?;
1984        let connector = source.with_properties.0.get_connector().unwrap();
1985        let is_shared_source = source.is_shared();
1986
1987        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1988        if !is_shared_source {
            // MVs on a non-shared source hold a copy of the source in their fragments.
1990            dep_source_job_ids = ObjectDependency::find()
1991                .select_only()
1992                .column(object_dependency::Column::UsedBy)
1993                .filter(object_dependency::Column::Oid.eq(source_id))
1994                .into_tuple()
1995                .all(&txn)
1996                .await?;
1997        }
1998
1999        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
2000        let prop_keys: Vec<String> = alter_props
2001            .keys()
2002            .chain(alter_secret_refs.keys())
2003            .cloned()
2004            .collect();
2005        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2006            &connector, &prop_keys,
2007        )?;
2008
2009        let mut options_with_secret = WithOptionsSecResolved::new(
2010            source.with_properties.0.clone(),
2011            source
2012                .secret_ref
2013                .map(|secret_ref| secret_ref.to_protobuf())
2014                .unwrap_or_default(),
2015        );
2016        let (to_add_secret_dep, to_remove_secret_dep) =
2017            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2018
2019        tracing::info!(
2020            "applying new properties to source: source_id={}, options_with_secret={:?}",
2021            source_id,
2022            options_with_secret
2023        );
        // Check that the altered props are still valid for the connector.
2025        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2026        // todo: validate via source manager
2027
2028        let mut associate_table_id = None;
2029
        // The object recorded as the user of the secrets: the `table_id` when updating
        // a table-associated source, otherwise the `source_id` itself.
2033        let mut preferred_id = source_id.as_object_id();
2034        let rewrite_sql = {
2035            let definition = source.definition.clone();
2036
2037            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2038                .map_err(|e| {
2039                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2040                        anyhow!(e).context("Failed to parse source definition SQL"),
2041                    )))
2042                })?
2043                .try_into()
2044                .unwrap();
2045
2046            /// Formats SQL options with secret values properly resolved
2047            ///
2048            /// This function processes configuration options that may contain sensitive data:
2049            /// - Plaintext options are directly converted to `SqlOption`
2050            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2051            ///   without exposing the actual secret value
2052            ///
2053            /// # Arguments
2054            /// * `txn` - Database transaction for retrieving secrets
2055            /// * `options_with_secret` - Container of options with both plaintext and secret values
2056            ///
2057            /// # Returns
2058            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
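            ///
            /// # Example
            ///
            /// With a hypothetical plaintext option `endpoint = http://localhost:9000`
            /// and a secret option `s3.credentials` referencing a secret named
            /// `my_secret`, the rendered options read:
            /// `endpoint = 'http://localhost:9000', s3.credentials = SECRET my_secret`.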
2059            async fn format_with_option_secret_resolved(
2060                txn: &DatabaseTransaction,
2061                options_with_secret: &WithOptionsSecResolved,
2062            ) -> MetaResult<Vec<SqlOption>> {
2063                let mut options = Vec::new();
2064                for (k, v) in options_with_secret.as_plaintext() {
2065                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2066                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2067                    options.push(sql_option);
2068                }
2069                for (k, v) in options_with_secret.as_secret() {
2070                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2071                        let sql_option =
2072                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2073                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2074                        options.push(sql_option);
2075                    } else {
2076                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2077                    }
2078                }
2079                Ok(options)
2080            }
2081
2082            match &mut stmt {
2083                Statement::CreateSource { stmt } => {
2084                    stmt.with_properties.0 =
2085                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2086                }
2087                Statement::CreateTable { with_options, .. } => {
2088                    *with_options =
2089                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2090                    associate_table_id = source.optional_associated_table_id;
2091                    preferred_id = associate_table_id.unwrap().as_object_id();
2092                }
2093                _ => unreachable!(),
2094            }
2095
2096            stmt.to_string()
2097        };
2098
2099        {
2100            // update secret dependencies
2101            if !to_add_secret_dep.is_empty() {
2102                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2103                    object_dependency::ActiveModel {
2104                        oid: Set(secret_id.into()),
2105                        used_by: Set(preferred_id),
2106                        ..Default::default()
2107                    }
2108                }))
2109                .exec(&txn)
2110                .await?;
2111            }
2112            if !to_remove_secret_dep.is_empty() {
2113                // todo: fix the filter logic
2114                let _ = ObjectDependency::delete_many()
2115                    .filter(
2116                        object_dependency::Column::Oid
2117                            .is_in(to_remove_secret_dep)
2118                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2119                    )
2120                    .exec(&txn)
2121                    .await?;
2122            }
2123        }
2124
2125        let active_source_model = source::ActiveModel {
2126            source_id: Set(source_id),
2127            definition: Set(rewrite_sql.clone()),
2128            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2129            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2130                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2131            ..Default::default()
2132        };
2133        active_source_model.update(&txn).await?;
2134
2135        if let Some(associate_table_id) = associate_table_id {
            // Update the associated table's definition accordingly.
2137            let active_table_model = table::ActiveModel {
2138                table_id: Set(associate_table_id),
2139                definition: Set(rewrite_sql),
2140                ..Default::default()
2141            };
2142            active_table_model.update(&txn).await?;
2143        }
2144
2145        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // When updating a table with a connector, the job id is the table id.
2147            associate_table_id.as_job_id()
2148        } else {
2149            source_id.as_share_source_job_id()
2150        }]
2151        .into_iter()
2152        .chain(dep_source_job_ids.into_iter())
2153        .collect_vec();
2154
2155        // update fragments
2156        update_connector_props_fragments(
2157            &txn,
2158            to_check_job_ids,
2159            FragmentTypeFlag::Source,
2160            |node, found| {
2161                if let PbNodeBody::Source(node) = node
2162                    && let Some(source_inner) = &mut node.source_inner
2163                {
2164                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2165                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2166                    *found = true;
2167                }
2168            },
2169            is_shared_source,
2170        )
2171        .await?;
2172
2173        let mut to_update_objs = Vec::with_capacity(2);
2174        let (source, obj) = Source::find_by_id(source_id)
2175            .find_also_related(Object)
2176            .one(&txn)
2177            .await?
2178            .ok_or_else(|| {
2179                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2180            })?;
2181        to_update_objs.push(PbObject {
2182            object_info: Some(PbObjectInfo::Source(
2183                ObjectModel(source, obj.unwrap()).into(),
2184            )),
2185        });
2186
2187        if let Some(associate_table_id) = associate_table_id {
2188            let (table, obj) = Table::find_by_id(associate_table_id)
2189                .find_also_related(Object)
2190                .one(&txn)
2191                .await?
2192                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2193            to_update_objs.push(PbObject {
2194                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2195            });
2196        }
2197
2198        txn.commit().await?;
2199
2200        self.notify_frontend(
2201            NotificationOperation::Update,
2202            NotificationInfo::ObjectGroup(PbObjectGroup {
2203                objects: to_update_objs,
2204            }),
2205        )
2206        .await;
2207
2208        Ok(options_with_secret)
2209    }
2210
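    /// Alter a sink's connector properties in place: validates the props, rewrites the
    /// `CREATE SINK` definition, updates the sink's fragments, and notifies the
    /// frontend. Returns the props that were applied.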
2211    pub async fn update_sink_props_by_sink_id(
2212        &self,
2213        sink_id: SinkId,
2214        props: BTreeMap<String, String>,
2215    ) -> MetaResult<HashMap<String, String>> {
2216        let inner = self.inner.read().await;
2217        let txn = inner.db.begin().await?;
2218
2219        let (sink, _obj) = Sink::find_by_id(sink_id)
2220            .find_also_related(Object)
2221            .one(&txn)
2222            .await?
2223            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2224        validate_sink_props(&sink, &props)?;
2225        let definition = sink.definition.clone();
2226        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2227            .map_err(|e| SinkError::Config(anyhow!(e)))?
2228            .try_into()
2229            .unwrap();
2230        if let Statement::CreateSink { stmt } = &mut stmt {
2231            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2232        } else {
2233            panic!("definition is not a create sink statement")
2234        }
2235        let mut new_config = sink.properties.clone().into_inner();
2236        new_config.extend(props.clone());
2237
2238        let definition = stmt.to_string();
2239        let active_sink = sink::ActiveModel {
2240            sink_id: Set(sink_id),
2241            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2242            definition: Set(definition),
2243            ..Default::default()
2244        };
2245        active_sink.update(&txn).await?;
2246
2247        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2248        let (sink, obj) = Sink::find_by_id(sink_id)
2249            .find_also_related(Object)
2250            .one(&txn)
2251            .await?
2252            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2253        txn.commit().await?;
2254        let relation_infos = vec![PbObject {
2255            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2256        }];
2257
2258        let _version = self
2259            .notify_frontend(
2260                NotificationOperation::Update,
2261                NotificationInfo::ObjectGroup(PbObjectGroup {
2262                    objects: relation_infos,
2263                }),
2264            )
2265            .await;
2266
2267        Ok(props.into_iter().collect())
2268    }
2269
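    /// Alter the connector properties of an iceberg-engine table. Such a job spans a
    /// table, its internal sink, and its internal source, so the definitions of all
    /// three catalog objects (plus the sink's fragments) are updated together before
    /// notifying the frontend.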
2270    pub async fn update_iceberg_table_props_by_table_id(
2271        &self,
2272        table_id: TableId,
2273        props: BTreeMap<String, String>,
2274        alter_iceberg_table_props: Option<
2275            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2276        >,
2277    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2280        let inner = self.inner.read().await;
2281        let txn = inner.db.begin().await?;
2282
2283        let (sink, _obj) = Sink::find_by_id(sink_id)
2284            .find_also_related(Object)
2285            .one(&txn)
2286            .await?
2287            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2288        validate_sink_props(&sink, &props)?;
2289
2290        let definition = sink.definition.clone();
2291        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2292            .map_err(|e| SinkError::Config(anyhow!(e)))?
2293            .try_into()
2294            .unwrap();
2295        if let Statement::CreateTable {
2296            with_options,
2297            engine,
2298            ..
2299        } = &mut stmt
2300        {
2301            if !matches!(engine, Engine::Iceberg) {
2302                return Err(SinkError::Config(anyhow!(
2303                    "only iceberg table can be altered as sink"
2304                ))
2305                .into());
2306            }
2307            update_stmt_with_props(with_options, &props)?;
2308        } else {
2309            panic!("definition is not a create iceberg table statement")
2310        }
2311        let mut new_config = sink.properties.clone().into_inner();
2312        new_config.extend(props.clone());
2313
2314        let definition = stmt.to_string();
2315        let active_sink = sink::ActiveModel {
2316            sink_id: Set(sink_id),
2317            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2318            definition: Set(definition.clone()),
2319            ..Default::default()
2320        };
2321        let active_source = source::ActiveModel {
2322            source_id: Set(source_id),
2323            definition: Set(definition.clone()),
2324            ..Default::default()
2325        };
2326        let active_table = table::ActiveModel {
2327            table_id: Set(table_id),
2328            definition: Set(definition),
2329            ..Default::default()
2330        };
2331        active_sink.update(&txn).await?;
2332        active_source.update(&txn).await?;
2333        active_table.update(&txn).await?;
2334
2335        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2336
2337        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2338            .find_also_related(Object)
2339            .one(&txn)
2340            .await?
2341            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2342        let (source, source_obj) = Source::find_by_id(source_id)
2343            .find_also_related(Object)
2344            .one(&txn)
2345            .await?
2346            .ok_or_else(|| {
2347                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2348            })?;
2349        let (table, table_obj) = Table::find_by_id(table_id)
2350            .find_also_related(Object)
2351            .one(&txn)
2352            .await?
2353            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2354        txn.commit().await?;
2355        let relation_infos = vec![
2356            PbObject {
2357                object_info: Some(PbObjectInfo::Sink(
2358                    ObjectModel(sink, sink_obj.unwrap()).into(),
2359                )),
2360            },
2361            PbObject {
2362                object_info: Some(PbObjectInfo::Source(
2363                    ObjectModel(source, source_obj.unwrap()).into(),
2364                )),
2365            },
2366            PbObject {
2367                object_info: Some(PbObjectInfo::Table(
2368                    ObjectModel(table, table_obj.unwrap()).into(),
2369                )),
2370            },
2371        ];
2372        let _version = self
2373            .notify_frontend(
2374                NotificationOperation::Update,
2375                NotificationInfo::ObjectGroup(PbObjectGroup {
2376                    objects: relation_infos,
2377                }),
2378            )
2379            .await;
2380
2381        Ok((props.into_iter().collect(), sink_id))
2382    }
2383
2384    /// Update connection properties and all dependent sources/sinks in a single transaction
2385    pub async fn update_connection_and_dependent_objects_props(
2386        &self,
2387        connection_id: ConnectionId,
2388        alter_props: BTreeMap<String, String>,
2389        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2390    ) -> MetaResult<(
2391        WithOptionsSecResolved,                   // Connection's new properties
2392        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2393        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2394    )> {
2395        let inner = self.inner.read().await;
2396        let txn = inner.db.begin().await?;
2397
2398        // Find all dependent sources and sinks first
2399        let dependent_sources: Vec<SourceId> = Source::find()
2400            .select_only()
2401            .column(source::Column::SourceId)
2402            .filter(source::Column::ConnectionId.eq(connection_id))
2403            .into_tuple()
2404            .all(&txn)
2405            .await?;
2406
2407        let dependent_sinks: Vec<SinkId> = Sink::find()
2408            .select_only()
2409            .column(sink::Column::SinkId)
2410            .filter(sink::Column::ConnectionId.eq(connection_id))
2411            .into_tuple()
2412            .all(&txn)
2413            .await?;
2414
2415        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2416            .find_also_related(Object)
2417            .one(&txn)
2418            .await?
2419            .ok_or_else(|| {
2420                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2421            })?;
2422
2423        // Validate that props can be altered
2424        let prop_keys: Vec<String> = alter_props
2425            .keys()
2426            .chain(alter_secret_refs.keys())
2427            .cloned()
2428            .collect();
2429
2430        // Map the connection type enum to the string name expected by the validation function
2431        let connection_type_str = pb_connection_type_to_connection_type(
2432            &connection_catalog.params.to_protobuf().connection_type(),
2433        )
2434        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2435
2436        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2437            connection_type_str, &prop_keys,
2438        )?;
2439
2440        let connection_pb = connection_catalog.params.to_protobuf();
2441        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2442            connection_pb.properties.into_iter().collect(),
2443            connection_pb.secret_refs.into_iter().collect(),
2444        );
2445
2446        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2447            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2448
2449        tracing::debug!(
2450            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2451            connection_id,
2452            dependent_sources,
2453            dependent_sinks
2454        );
2455
2456        // Validate connection
2457        {
2458            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2459                connection_type: connection_pb.connection_type,
2460                properties: connection_options_with_secret
2461                    .as_plaintext()
2462                    .clone()
2463                    .into_iter()
2464                    .collect(),
2465                secret_refs: connection_options_with_secret
2466                    .as_secret()
2467                    .clone()
2468                    .into_iter()
2469                    .collect(),
2470            };
2471            let connection = PbConnection {
2472                id: connection_id as _,
2473                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2474                    conn_params_pb,
2475                )),
2476                ..Default::default()
2477            };
2478            validate_connection(&connection).await?;
2479        }
2480
2481        // Update connection secret dependencies
2482        if !to_add_secret_dep.is_empty() {
2483            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2484                object_dependency::ActiveModel {
2485                    oid: Set(secret_id.into()),
2486                    used_by: Set(connection_id.as_object_id()),
2487                    ..Default::default()
2488                }
2489            }))
2490            .exec(&txn)
2491            .await?;
2492        }
2493        if !to_remove_secret_dep.is_empty() {
2494            let _ = ObjectDependency::delete_many()
2495                .filter(
2496                    object_dependency::Column::Oid
2497                        .is_in(to_remove_secret_dep)
2498                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2499                )
2500                .exec(&txn)
2501                .await?;
2502        }
2503
        // Update the connection with new properties
        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        active_connection_model.update(&txn).await?;

        // Batch update dependent sources and collect their complete properties
        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            // Batch fetch all dependent sources
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // Validate the updated source properties
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                // Prepare source update
                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                // Prepare fragment updates:
                // - If the source is a table-associated source, update fragments for the table job.
                // - Otherwise, update the shared source job.
                // - For non-shared sources, also update any dependent streaming jobs that embed a
                //   copy of the source.
                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = std::iter::once(base_job_id)
                    .chain(dep_source_job_ids)
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                // Collect the complete properties for runtime broadcast
                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                source_update.update(&txn).await?;
            }

            // Batch execute fragment updates
            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        // Batch update dependent sinks and collect their complete properties
        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            // Batch fetch all dependent sinks
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                // Validate that sink props can be altered
                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when altering sink"
                        ))
                        .into());
                    }
                };

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                // Prepare sink update
                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                // Prepare fragment updates for this sink
                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                // Collect the complete properties for runtime broadcast
                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            // Batch execute sink updates
            for sink_update in sink_updates {
                sink_update.update(&txn).await?;
            }

            // Batch execute sink fragment updates using the reusable function
            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        // Collect all updated objects for frontend notification
        let mut updated_objects = Vec::new();

        // Add the updated connection
        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap()).into(),
            )),
        });

        // Add the updated sources
        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap()).into(),
                )),
            });
        }

        // Add the updated sinks
        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
            });
        }

        // Commit the transaction
        txn.commit().await?;

        // Notify the frontend about all updated objects
        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }
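
    // A minimal sketch of how a caller might fan the returned tuple out to
    // running connectors once the transaction has committed; the `broadcast_*`
    // helpers are hypothetical names for illustration, not part of this crate:
    //
    //     let (_conn_props, updated_sources, updated_sinks) = /* this method */;
    //     for (source_id, props) in updated_sources {
    //         broadcast_source_props(source_id, props); // hypothetical
    //     }
    //     for (sink_id, props) in updated_sinks {
    //         broadcast_sink_props(sink_id, props); // hypothetical
    //     }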

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| match node {
                    PbNodeBody::Dml(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::Sink(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    _ => {}
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }
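
    // A minimal sketch of another mutator with the same closure contract
    // (hypothetical, for illustration only): receive the fragment's type mask
    // and stream node, mutate the node in place, and return whether anything
    // changed so the caller knows whether to persist the fragment.
    //
    //     let clear_rate_limit = |mask: FragmentTypeMask, node: &mut PbStreamNode| {
    //         let mut found = false;
    //         if mask.contains_any(FragmentTypeFlag::source_rate_limit_fragments()) {
    //             visit_stream_node_mut(node, |body| {
    //                 if let PbNodeBody::SourceBackfill(n) = body {
    //                     n.rate_limit = None;
    //                     found = true;
    //                 }
    //             });
    //         }
    //         found
    //     };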

    /// Note: `FsFetch` nodes created in older versions are not included.
    /// Since this is only used for debugging, that should be acceptable.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    // sink rate limit
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
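
    // Illustrative shape of one returned row (values are made up): a sink
    // fragment limited to 1000 rows/s would yield something like
    //
    //     RateLimitInfo {
    //         fragment_id: 42,
    //         job_id: 7,
    //         fragment_type_mask: FragmentTypeFlag::Sink as u32,
    //         rate_limit: 1000,
    //         node_name: "SINK".to_owned(),
    //     }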
}

fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
            );
        }
    };
    Ok(())
}
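
// A minimal usage sketch (hypothetical caller; `sink_model` is a fetched
// `sink::Model` and `alter_props` the user-requested changes):
//
//     validate_sink_props(&sink_model, &alter_props)?;
//     // Only reached if every key is alterable on the fly and the merged
//     // config passes the connector's `validate_alter_config`.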

fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}
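
// Merge semantics sketch: options are keyed by name in an `IndexMap`, so an
// existing option keeps its position and only its value is overridden, while
// new options are appended (illustrative WITH clauses, not tied to any real
// connector):
//
//     before: WITH (connector = 'kafka', topic = 't1', retry = '3')
//     props:  { "retry" => "5", "timeout" => "10s" }
//     after:  WITH (connector = 'kafka', topic = 't1', retry = '5', timeout = '10s')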

async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}
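
// Two details worth calling out above (sketch-level notes, not new behavior):
// fragment type masks are plain bitsets over `FragmentTypeFlag`, so the sink
// check is just
//
//     mask & FragmentTypeFlag::Sink as i32 != 0
//
// and a `fragment::ActiveModel` with only `fragment_id` and `stream_node` set
// performs a partial UPDATE: sea-orm leaves the unset columns untouched.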

pub struct SinkIntoTableContext {
    /// For ALTER TABLE (e.g., ADD COLUMN), the list of existing sink ids;
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the source id (or its associated
        // table id). A non-shared source's own job may yield no updated
        // fragments, but a shared source must, and `job_ids.len() > 1` means
        // the source is used by other streaming jobs, which must contain at
        // least one fragment to update.
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}
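
// A minimal usage sketch mirroring the sink call site above (assumes a live
// transaction `txn`, a `sink_id`, and `new_props` computed by the caller):
//
//     update_connector_props_fragments(
//         &txn,
//         vec![sink_id.as_job_id()],
//         FragmentTypeFlag::Sink,
//         |node, found| {
//             if let PbNodeBody::Sink(node) = node
//                 && let Some(desc) = &mut node.sink_desc
//             {
//                 desc.properties = new_props.clone();
//                 *found = true;
//             }
//         },
//         true, // a sink job must contain at least one sink fragment
//     )
//     .await?;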