risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            backfill_parallelism: Set(backfill_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
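    ///
    /// A rough sketch of how this call fits into the overall creation flow. Illustrative only:
    /// the actual orchestration lives in the stream manager, and the variables below are
    /// assumed to be prepared by it.
    /// ```ignore
    /// // 1. Persist the job and relation catalogs (this function).
    /// controller
    ///     .create_job_catalog(&mut job, &ctx, &parallelism, max_parallelism, deps, None, &None)
    ///     .await?;
    /// // 2. Persist fragments and actors once the stream graph is built.
    /// controller
    ///     .prepare_stream_job_fragments(&fragments, &job, /* for_replace */ false)
    ///     .await?;
    /// // 3. After the barrier has collected the job, mark everything as `Created`.
    /// controller.finish_streaming_job(job.id()).await?;
    /// ```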
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

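        // Resolve the job's parallelism: an explicitly specified parallelism always wins;
        // otherwise fall back to the cluster-wide default (adaptive or a fixed number).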
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // This means the referring table is just a dummy one created for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                // To be compatible with the old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
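    ///
    /// Illustrative usage (a sketch: the graph fix-up using the returned map is handled by the
    /// caller, and the variable names here are assumptions):
    /// ```ignore
    /// // Placeholder ids in `incomplete_internal_tables` are replaced with globally
    /// // allocated ids; use the returned map to rewrite references in the stream graph.
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// ```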
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Inserts fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id);
                    assert_eq!(table.fragment_id, fragment_id);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the stored definition was replaced with a placeholder
                        // earlier, but the notification still needs the real one for the job's own table.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If a sink (with a target table) needs to be
    /// dropped, additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial
    /// status or was created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

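        // Decide whether the frontend has already been notified about this job (background jobs,
        // materialized views, and sinks are notified during creation), in which case a `Delete`
        // notification must be sent for it on abort.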
        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
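            // Keep the catalogs of these dropped tables around, so that later cleanup paths can
            // still look them up after their catalog rows are deleted below.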
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating sink into table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We also have notified the frontend about these objects,
            // so we need to notify the frontend to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

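        // Persist the initial split assignment, flattening the per-actor splits into a
        // per-fragment list of splits.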
        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            None,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Inner implementation of `finish_streaming_job`: marks job-related objects as `Created`
    /// and returns the notification payload to send to the frontend.
1062    pub async fn finish_streaming_job_inner(
1063        txn: &DatabaseTransaction,
1064        job_id: JobId,
1065    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1066        let job_type = Object::find_by_id(job_id)
1067            .select_only()
1068            .column(object::Column::ObjType)
1069            .into_tuple()
1070            .one(txn)
1071            .await?
1072            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1073
1074        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1075            .select_only()
1076            .column(streaming_job::Column::CreateType)
1077            .into_tuple()
1078            .one(txn)
1079            .await?
1080            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1081
1082        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
1083        let res = Object::update_many()
1084            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1085            .col_expr(
1086                object::Column::CreatedAtClusterVersion,
1087                current_cluster_version().into(),
1088            )
1089            .filter(object::Column::Oid.eq(job_id))
1090            .exec(txn)
1091            .await?;
1092        if res.rows_affected == 0 {
1093            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1094        }
1095
1096        // mark the target stream job as `Created`.
1097        let job = streaming_job::ActiveModel {
1098            job_id: Set(job_id),
1099            job_status: Set(JobStatus::Created),
1100            ..Default::default()
1101        };
1102        job.update(txn).await?;
1103
1104        // notify frontend: job, internal tables.
1105        let internal_table_objs = Table::find()
1106            .find_also_related(Object)
1107            .filter(table::Column::BelongsToJobId.eq(job_id))
1108            .all(txn)
1109            .await?;
1110        let mut objects = internal_table_objs
1111            .iter()
1112            .map(|(table, obj)| PbObject {
1113                object_info: Some(PbObjectInfo::Table(
1114                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1115                )),
1116            })
1117            .collect_vec();
1118        let mut notification_op = if create_type == CreateType::Background {
1119            NotificationOperation::Update
1120        } else {
1121            NotificationOperation::Add
1122        };
1123        let mut updated_user_info = vec![];
1124        let mut need_grant_default_privileges = true;
1125
1126        match job_type {
1127            ObjectType::Table => {
1128                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1129                    .find_also_related(Object)
1130                    .one(txn)
1131                    .await?
1132                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1133                if table.table_type == TableType::MaterializedView {
1134                    notification_op = NotificationOperation::Update;
1135                }
1136
1137                if let Some(source_id) = table.optional_associated_source_id {
1138                    let (src, obj) = Source::find_by_id(source_id)
1139                        .find_also_related(Object)
1140                        .one(txn)
1141                        .await?
1142                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1143                    objects.push(PbObject {
1144                        object_info: Some(PbObjectInfo::Source(
1145                            ObjectModel(src, obj.unwrap()).into(),
1146                        )),
1147                    });
1148                }
1149                objects.push(PbObject {
1150                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1151                });
1152            }
1153            ObjectType::Sink => {
1154                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1155                    .find_also_related(Object)
1156                    .one(txn)
1157                    .await?
1158                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1159                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1160                    need_grant_default_privileges = false;
1161                }
1162                // If sinks were pre-notified during CREATING, we should use Update at finish
1163                // to avoid duplicate Add notifications (align with MV behavior).
1164                notification_op = NotificationOperation::Update;
1165                objects.push(PbObject {
1166                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1167                });
1168            }
1169            ObjectType::Index => {
1170                need_grant_default_privileges = false;
1171                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1172                    .find_also_related(Object)
1173                    .one(txn)
1174                    .await?
1175                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1176                {
1177                    let (table, obj) = Table::find_by_id(index.index_table_id)
1178                        .find_also_related(Object)
1179                        .one(txn)
1180                        .await?
1181                        .ok_or_else(|| {
1182                            MetaError::catalog_id_not_found("table", index.index_table_id)
1183                        })?;
1184                    objects.push(PbObject {
1185                        object_info: Some(PbObjectInfo::Table(
1186                            ObjectModel(table, obj.unwrap()).into(),
1187                        )),
1188                    });
1189                }
1190
1191                // If the index is created on a table with privileges, we should also
1192                // grant the privileges for the index and its state tables.
1193                let primary_table_privileges = UserPrivilege::find()
1194                    .filter(
1195                        user_privilege::Column::Oid
1196                            .eq(index.primary_table_id)
1197                            .and(user_privilege::Column::Action.eq(Action::Select)),
1198                    )
1199                    .all(txn)
1200                    .await?;
1201                if !primary_table_privileges.is_empty() {
1202                    let index_state_table_ids: Vec<TableId> = Table::find()
1203                        .select_only()
1204                        .column(table::Column::TableId)
1205                        .filter(
1206                            table::Column::BelongsToJobId
1207                                .eq(job_id)
1208                                .or(table::Column::TableId.eq(index.index_table_id)),
1209                        )
1210                        .into_tuple()
1211                        .all(txn)
1212                        .await?;
1213                    let mut new_privileges = vec![];
1214                    for privilege in &primary_table_privileges {
1215                        for state_table_id in &index_state_table_ids {
1216                            new_privileges.push(user_privilege::ActiveModel {
1217                                id: Default::default(),
1218                                oid: Set(state_table_id.as_object_id()),
1219                                user_id: Set(privilege.user_id),
1220                                action: Set(Action::Select),
1221                                dependent_id: Set(privilege.dependent_id),
1222                                granted_by: Set(privilege.granted_by),
1223                                with_grant_option: Set(privilege.with_grant_option),
1224                            });
1225                        }
1226                    }
1227                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1228
1229                    updated_user_info = list_user_info_by_ids(
1230                        primary_table_privileges.into_iter().map(|p| p.user_id),
1231                        txn,
1232                    )
1233                    .await?;
1234                }
1235
1236                objects.push(PbObject {
1237                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1238                });
1239            }
1240            ObjectType::Source => {
1241                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1242                    .find_also_related(Object)
1243                    .one(txn)
1244                    .await?
1245                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1246                objects.push(PbObject {
1247                    object_info: Some(PbObjectInfo::Source(
1248                        ObjectModel(source, obj.unwrap()).into(),
1249                    )),
1250                });
1251            }
1252            _ => unreachable!("invalid job type: {:?}", job_type),
1253        }
1254
1255        if need_grant_default_privileges {
1256            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1257        }
1258
1259        Ok((notification_op, objects, updated_user_info))
1260    }
1261
1262    pub async fn finish_replace_streaming_job(
1263        &self,
1264        tmp_id: JobId,
1265        streaming_job: StreamingJob,
1266        replace_upstream: FragmentReplaceUpstream,
1267        sink_into_table_context: SinkIntoTableContext,
1268        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1269        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1270    ) -> MetaResult<NotificationVersion> {
1271        let inner = self.inner.write().await;
1272        let txn = inner.db.begin().await?;
1273
1274        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1275            tmp_id,
1276            replace_upstream,
1277            sink_into_table_context,
1278            &txn,
1279            streaming_job,
1280            drop_table_connector_ctx,
1281            auto_refresh_schema_sinks,
1282        )
1283        .await?;
1284
1285        txn.commit().await?;
1286
1287        let mut version = self
1288            .notify_frontend(
1289                NotificationOperation::Update,
1290                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1291            )
1292            .await;
1293
1294        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1295            self.notify_users_update(user_infos).await;
1296            version = self
1297                .notify_frontend(
1298                    NotificationOperation::Delete,
1299                    build_object_group_for_delete(to_drop_objects),
1300                )
1301                .await;
1302        }
1303
1304        Ok(version)
1305    }
1306
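    /// Transactional body of `finish_replace_streaming_job`. In order:
    /// 1. update the replaced job's catalog entry (table / source / MV);
    /// 2. re-point internal state tables at the new fragments and swap the old
    ///    fragments for the ones created under `tmp_id`;
    /// 3. rewrite downstream `Merge` nodes according to `replace_upstream`;
    /// 4. drop the temporary job object, rewrite dependent index items, and handle
    ///    auto-refresh-schema sinks and dropped table connectors.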
1307    pub async fn finish_replace_streaming_job_inner(
1308        tmp_id: JobId,
1309        replace_upstream: FragmentReplaceUpstream,
1310        SinkIntoTableContext {
1311            updated_sink_catalogs,
1312        }: SinkIntoTableContext,
1313        txn: &DatabaseTransaction,
1314        streaming_job: StreamingJob,
1315        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1316        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1317    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1318        let original_job_id = streaming_job.id();
1319        let job_type = streaming_job.job_type();
1320
1321        let mut index_item_rewriter = None;
1322
1323        // Update catalog
1324        match streaming_job {
1325            StreamingJob::Table(_source, table, _table_job_type) => {
1326                // The source catalog should remain unchanged
1327
1328                let original_column_catalogs =
1329                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1330
1331                index_item_rewriter = Some({
1332                    let original_columns = original_column_catalogs
1333                        .to_protobuf()
1334                        .into_iter()
1335                        .map(|c| c.column_desc.unwrap())
1336                        .collect_vec();
1337                    let new_columns = table
1338                        .columns
1339                        .iter()
1340                        .map(|c| c.column_desc.clone().unwrap())
1341                        .collect_vec();
1342
1343                    IndexItemRewriter {
1344                        original_columns,
1345                        new_columns,
1346                    }
1347                });
1348
1349                // For sinks created in earlier versions, we need to set the original_target_columns.
1350                for sink_id in updated_sink_catalogs {
1351                    sink::ActiveModel {
1352                        sink_id: Set(sink_id as _),
1353                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1354                        ..Default::default()
1355                    }
1356                    .update(txn)
1357                    .await?;
1358                }
1359                // Update the table catalog with the new one. (column catalog is also updated here)
1360                let mut table = table::ActiveModel::from(table);
1361                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1362                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1363                {
                    // Drop the table's connector; the rest of the logic is handled in `drop_table_associated_source`.
1365                    table.optional_associated_source_id = Set(None);
1366                }
1367
1368                table.update(txn).await?;
1369            }
1370            StreamingJob::Source(source) => {
1371                // Update the source catalog with the new one.
1372                let source = source::ActiveModel::from(source);
1373                source.update(txn).await?;
1374            }
1375            StreamingJob::MaterializedView(table) => {
1376                // Update the table catalog with the new one.
1377                let table = table::ActiveModel::from(table);
1378                table.update(txn).await?;
1379            }
1380            _ => unreachable!(
1381                "invalid streaming job type: {:?}",
1382                streaming_job.job_type_str()
1383            ),
1384        }
1385
1386        async fn finish_fragments(
1387            txn: &DatabaseTransaction,
1388            tmp_id: JobId,
1389            original_job_id: JobId,
1390            replace_upstream: FragmentReplaceUpstream,
1391        ) -> MetaResult<()> {
1392            // 0. update internal tables
1393            // Fields including `fragment_id` were placeholder values before.
1394            // After table fragments are created, update them for all internal tables.
1395            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1396                .select_only()
1397                .columns([
1398                    fragment::Column::FragmentId,
1399                    fragment::Column::StateTableIds,
1400                ])
1401                .filter(fragment::Column::JobId.eq(tmp_id))
1402                .into_tuple()
1403                .all(txn)
1404                .await?;
1405            for (fragment_id, state_table_ids) in fragment_info {
1406                for state_table_id in state_table_ids.into_inner() {
1407                    let state_table_id = TableId::new(state_table_id as _);
1408                    table::ActiveModel {
1409                        table_id: Set(state_table_id),
1410                        fragment_id: Set(Some(fragment_id)),
1411                        // No need to update `vnode_count` because it must remain the same.
1412                        ..Default::default()
1413                    }
1414                    .update(txn)
1415                    .await?;
1416                }
1417            }
1418
1419            // 1. replace old fragments/actors with new ones.
1420            Fragment::delete_many()
1421                .filter(fragment::Column::JobId.eq(original_job_id))
1422                .exec(txn)
1423                .await?;
1424            Fragment::update_many()
1425                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1426                .filter(fragment::Column::JobId.eq(tmp_id))
1427                .exec(txn)
1428                .await?;
1429
            // 2. update merges.
            // Rewrite the downstream fragments' `Merge` nodes and their `upstream_fragment_id`s.
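            // E.g. with `replace_upstream = {10: {5: 7}}` (illustrative ids), the
            // `Merge` node of fragment 10 that currently reads from upstream
            // fragment 5 is rewritten to read from the new fragment 7.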
1432            for (fragment_id, fragment_replace_map) in replace_upstream {
1433                let (fragment_id, mut stream_node) =
1434                    Fragment::find_by_id(fragment_id as FragmentId)
1435                        .select_only()
1436                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1437                        .into_tuple::<(FragmentId, StreamNode)>()
1438                        .one(txn)
1439                        .await?
1440                        .map(|(id, node)| (id, node.to_protobuf()))
1441                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1442
1443                visit_stream_node_mut(&mut stream_node, |body| {
1444                    if let PbNodeBody::Merge(m) = body
1445                        && let Some(new_fragment_id) =
1446                            fragment_replace_map.get(&m.upstream_fragment_id)
1447                    {
1448                        m.upstream_fragment_id = *new_fragment_id;
1449                    }
1450                });
1451                fragment::ActiveModel {
1452                    fragment_id: Set(fragment_id),
1453                    stream_node: Set(StreamNode::from(&stream_node)),
1454                    ..Default::default()
1455                }
1456                .update(txn)
1457                .await?;
1458            }
1459
1460            // 3. remove dummy object.
1461            Object::delete_by_id(tmp_id).exec(txn).await?;
1462
1463            Ok(())
1464        }
1465
1466        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1467
1468        // 4. update catalogs and notify.
1469        let mut objects = vec![];
1470        match job_type {
1471            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1472                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1473                    .find_also_related(Object)
1474                    .one(txn)
1475                    .await?
1476                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1477                objects.push(PbObject {
1478                    object_info: Some(PbObjectInfo::Table(
1479                        ObjectModel(table, table_obj.unwrap()).into(),
1480                    )),
1481                })
1482            }
1483            StreamingJobType::Source => {
1484                let (source, source_obj) =
1485                    Source::find_by_id(original_job_id.as_shared_source_id())
1486                        .find_also_related(Object)
1487                        .one(txn)
1488                        .await?
1489                        .ok_or_else(|| {
1490                            MetaError::catalog_id_not_found("object", original_job_id)
1491                        })?;
1492                objects.push(PbObject {
1493                    object_info: Some(PbObjectInfo::Source(
1494                        ObjectModel(source, source_obj.unwrap()).into(),
1495                    )),
1496                })
1497            }
1498            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1499        }
1500
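        // The replaced table may have a new column layout; indexes on it store
        // expression items that reference columns of the old layout, so rewrite
        // them and publish the refreshed index catalogs.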
1501        if let Some(expr_rewriter) = index_item_rewriter {
1502            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1503                .select_only()
1504                .columns([index::Column::IndexId, index::Column::IndexItems])
1505                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1506                .into_tuple()
1507                .all(txn)
1508                .await?;
1509            for (index_id, nodes) in index_items {
1510                let mut pb_nodes = nodes.to_protobuf();
1511                pb_nodes
1512                    .iter_mut()
1513                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1514                let index = index::ActiveModel {
1515                    index_id: Set(index_id),
1516                    index_items: Set(pb_nodes.into()),
1517                    ..Default::default()
1518                }
1519                .update(txn)
1520                .await?;
1521                let index_obj = index
1522                    .find_related(Object)
1523                    .one(txn)
1524                    .await?
1525                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1526                objects.push(PbObject {
1527                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1528                });
1529            }
1530        }
1531
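        // For sinks whose schema was auto-refreshed along with the table, swap in
        // their new fragments and persist the new column sets (including the new
        // log-store table columns, if any).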
1532        if let Some(sinks) = auto_refresh_schema_sinks {
1533            for finish_sink_context in sinks {
1534                finish_fragments(
1535                    txn,
1536                    finish_sink_context.tmp_sink_id.as_job_id(),
1537                    finish_sink_context.original_sink_id.as_job_id(),
1538                    Default::default(),
1539                )
1540                .await?;
1541                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1542                    .find_also_related(Object)
1543                    .one(txn)
1544                    .await?
1545                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1546                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1547                Sink::update(sink::ActiveModel {
1548                    sink_id: Set(finish_sink_context.original_sink_id),
1549                    columns: Set(columns.clone()),
1550                    ..Default::default()
1551                })
1552                .exec(txn)
1553                .await?;
1554                sink.columns = columns;
1555                objects.push(PbObject {
1556                    object_info: Some(PbObjectInfo::Sink(
1557                        ObjectModel(sink, sink_obj.unwrap()).into(),
1558                    )),
1559                });
1560                if let Some((log_store_table_id, new_log_store_table_columns)) =
1561                    finish_sink_context.new_log_store_table
1562                {
1563                    let new_log_store_table_columns: ColumnCatalogArray =
1564                        new_log_store_table_columns.into();
1565                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1566                        .find_also_related(Object)
1567                        .one(txn)
1568                        .await?
1569                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1570                    Table::update(table::ActiveModel {
1571                        table_id: Set(log_store_table_id),
1572                        columns: Set(new_log_store_table_columns.clone()),
1573                        ..Default::default()
1574                    })
1575                    .exec(txn)
1576                    .await?;
1577                    table.columns = new_log_store_table_columns;
1578                    objects.push(PbObject {
1579                        object_info: Some(PbObjectInfo::Table(
1580                            ObjectModel(table, table_obj.unwrap()).into(),
1581                        )),
1582                    });
1583                }
1584            }
1585        }
1586
1587        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1588        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1589            notification_objs =
1590                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1591        }
1592
1593        Ok((objects, notification_objs))
1594    }
1595
1596    /// Abort the replacing streaming job by deleting the temporary job object.
1597    pub async fn try_abort_replacing_streaming_job(
1598        &self,
1599        tmp_job_id: JobId,
1600        tmp_sink_ids: Option<Vec<ObjectId>>,
1601    ) -> MetaResult<()> {
1602        let inner = self.inner.write().await;
1603        let txn = inner.db.begin().await?;
1604        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1605        if let Some(tmp_sink_ids) = tmp_sink_ids {
1606            for tmp_sink_id in tmp_sink_ids {
1607                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1608            }
1609        }
1610        txn.commit().await?;
1611        Ok(())
1612    }
1613
    // Edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments.
    // Returns the actor_ids to be applied.
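    // A minimal usage sketch (illustrative; `catalog_controller` is whatever handle
    // the caller holds on this controller):
    //     let fragment_actors = catalog_controller
    //         .update_source_rate_limit_by_source_id(source_id, Some(4096))
    //         .await?;
    //     // then notify the returned actors of the new rate limit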
1616    pub async fn update_source_rate_limit_by_source_id(
1617        &self,
1618        source_id: SourceId,
1619        rate_limit: Option<u32>,
1620    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1621        let inner = self.inner.read().await;
1622        let txn = inner.db.begin().await?;
1623
1624        {
1625            let active_source = source::ActiveModel {
1626                source_id: Set(source_id),
1627                rate_limit: Set(rate_limit.map(|v| v as i32)),
1628                ..Default::default()
1629            };
1630            active_source.update(&txn).await?;
1631        }
1632
1633        let (source, obj) = Source::find_by_id(source_id)
1634            .find_also_related(Object)
1635            .one(&txn)
1636            .await?
1637            .ok_or_else(|| {
1638                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1639            })?;
1640
1641        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1642        let streaming_job_ids: Vec<JobId> =
1643            if let Some(table_id) = source.optional_associated_table_id {
1644                vec![table_id.as_job_id()]
1645            } else if let Some(source_info) = &source.source_info
1646                && source_info.to_protobuf().is_shared()
1647            {
1648                vec![source_id.as_share_source_job_id()]
1649            } else {
1650                ObjectDependency::find()
1651                    .select_only()
1652                    .column(object_dependency::Column::UsedBy)
1653                    .filter(object_dependency::Column::Oid.eq(source_id))
1654                    .into_tuple()
1655                    .all(&txn)
1656                    .await?
1657            };
1658
1659        if streaming_job_ids.is_empty() {
1660            return Err(MetaError::invalid_parameter(format!(
1661                "source id {source_id} not used by any streaming job"
1662            )));
1663        }
1664
1665        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1666            .select_only()
1667            .columns([
1668                fragment::Column::FragmentId,
1669                fragment::Column::FragmentTypeMask,
1670                fragment::Column::StreamNode,
1671            ])
1672            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1673            .into_tuple()
1674            .all(&txn)
1675            .await?;
1676        let mut fragments = fragments
1677            .into_iter()
1678            .map(|(id, mask, stream_node)| {
1679                (
1680                    id,
1681                    FragmentTypeMask::from(mask as u32),
1682                    stream_node.to_protobuf(),
1683                )
1684            })
1685            .collect_vec();
1686
1687        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1688            let mut found = false;
1689            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1690                visit_stream_node_mut(stream_node, |node| {
1691                    if let PbNodeBody::Source(node) = node
1692                        && let Some(node_inner) = &mut node.source_inner
1693                        && node_inner.source_id == source_id
1694                    {
1695                        node_inner.rate_limit = rate_limit;
1696                        found = true;
1697                    }
1698                });
1699            }
1700            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes.
1703                visit_stream_node_mut(stream_node, |node| {
1704                    if let PbNodeBody::StreamFsFetch(node) = node {
1705                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1706                        if let Some(node_inner) = &mut node.node_inner
1707                            && node_inner.source_id == source_id
1708                        {
1709                            node_inner.rate_limit = rate_limit;
1710                            found = true;
1711                        }
1712                    }
1713                });
1714            }
1715            found
1716        });
1717
1718        assert!(
1719            !fragments.is_empty(),
1720            "source id should be used by at least one fragment"
1721        );
1722        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1723
1724        for (id, fragment_type_mask, stream_node) in fragments {
1725            fragment::ActiveModel {
1726                fragment_id: Set(id),
1727                fragment_type_mask: Set(fragment_type_mask.into()),
1728                stream_node: Set(StreamNode::from(&stream_node)),
1729                ..Default::default()
1730            }
1731            .update(&txn)
1732            .await?;
1733        }
1734
1735        txn.commit().await?;
1736
1737        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1738
1739        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1740        let _version = self
1741            .notify_frontend(
1742                NotificationOperation::Update,
1743                NotificationInfo::ObjectGroup(PbObjectGroup {
1744                    objects: vec![PbObject {
1745                        object_info: Some(relation_info),
1746                    }],
1747                }),
1748            )
1749            .await;
1750
1751        Ok(fragment_actors)
1752    }
1753
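    /// Resolve the currently running actors of the given fragments from the shared
    /// in-memory actor info (no extra DB round trip); callers use the returned map
    /// to broadcast config changes such as rate limits to the affected actors.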
1754    fn get_fragment_actors_from_running_info(
1755        &self,
1756        fragment_ids: impl Iterator<Item = FragmentId>,
1757    ) -> HashMap<FragmentId, Vec<ActorId>> {
1758        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1759
1760        let info = self.env.shared_actor_infos().read_guard();
1761
1762        for fragment_id in fragment_ids {
1763            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1764            fragment_actors
1765                .entry(fragment_id as _)
1766                .or_default()
1767                .extend(actors.keys().copied());
1768        }
1769
1770        fragment_actors
1771    }
1772
    // Edit the content of the fragments of the given `job_id`.
    // Returns the actor_ids to be applied.
1775    pub async fn mutate_fragments_by_job_id(
1776        &self,
1777        job_id: JobId,
1778        // returns true if the mutation is applied
1779        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments are found
1781        err_msg: &'static str,
1782    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1783        let inner = self.inner.read().await;
1784        let txn = inner.db.begin().await?;
1785
1786        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1787            .select_only()
1788            .columns([
1789                fragment::Column::FragmentId,
1790                fragment::Column::FragmentTypeMask,
1791                fragment::Column::StreamNode,
1792            ])
1793            .filter(fragment::Column::JobId.eq(job_id))
1794            .into_tuple()
1795            .all(&txn)
1796            .await?;
1797        let mut fragments = fragments
1798            .into_iter()
1799            .map(|(id, mask, stream_node)| {
1800                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1801            })
1802            .collect_vec();
1803
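        // Run the mutation over every fragment first, so that any error aborts the
        // whole operation, then retain only the fragments the closure reported as
        // actually mutated.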
1804        let fragments = fragments
1805            .iter_mut()
1806            .map(|(_, fragment_type_mask, stream_node)| {
1807                fragments_mutation_fn(*fragment_type_mask, stream_node)
1808            })
1809            .collect::<MetaResult<Vec<bool>>>()?
1810            .into_iter()
1811            .zip_eq_debug(std::mem::take(&mut fragments))
1812            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1813            .collect::<Vec<_>>();
1814
1815        if fragments.is_empty() {
1816            return Err(MetaError::invalid_parameter(format!(
1817                "job id {job_id}: {}",
1818                err_msg
1819            )));
1820        }
1821
1822        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1823        for (id, _, stream_node) in fragments {
1824            fragment::ActiveModel {
1825                fragment_id: Set(id),
1826                stream_node: Set(StreamNode::from(&stream_node)),
1827                ..Default::default()
1828            }
1829            .update(&txn)
1830            .await?;
1831        }
1832
1833        txn.commit().await?;
1834
1835        let fragment_actors =
1836            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1837
1838        Ok(fragment_actors)
1839    }
1840
1841    async fn mutate_fragment_by_fragment_id(
1842        &self,
1843        fragment_id: FragmentId,
1844        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1845        err_msg: &'static str,
1846    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1847        let inner = self.inner.read().await;
1848        let txn = inner.db.begin().await?;
1849
1850        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1851            Fragment::find_by_id(fragment_id)
1852                .select_only()
1853                .columns([
1854                    fragment::Column::FragmentTypeMask,
1855                    fragment::Column::StreamNode,
1856                ])
1857                .into_tuple()
1858                .one(&txn)
1859                .await?
1860                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1861        let mut pb_stream_node = stream_node.to_protobuf();
1862        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1863
1864        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1865            return Err(MetaError::invalid_parameter(format!(
1866                "fragment id {fragment_id}: {}",
1867                err_msg
1868            )));
1869        }
1870
1871        fragment::ActiveModel {
1872            fragment_id: Set(fragment_id),
            // Persist the mutated stream node rather than the original one fetched above.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
1874            ..Default::default()
1875        }
1876        .update(&txn)
1877        .await?;
1878
1879        let fragment_actors =
1880            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1881
1882        txn.commit().await?;
1883
1884        Ok(fragment_actors)
1885    }
1886
    // Edit the `rate_limit` of the backfill nodes (`StreamScan`, `StreamCdcScan`,
    // `SourceBackfill`, and backfilling `Sink`) in the given `job_id`'s fragments.
    // Returns the actor_ids to be applied.
1889    pub async fn update_backfill_rate_limit_by_job_id(
1890        &self,
1891        job_id: JobId,
1892        rate_limit: Option<u32>,
1893    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1894        let update_backfill_rate_limit =
1895            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1896                let mut found = false;
1897                if fragment_type_mask
1898                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1899                {
1900                    visit_stream_node_mut(stream_node, |node| match node {
1901                        PbNodeBody::StreamCdcScan(node) => {
1902                            node.rate_limit = rate_limit;
1903                            found = true;
1904                        }
1905                        PbNodeBody::StreamScan(node) => {
1906                            node.rate_limit = rate_limit;
1907                            found = true;
1908                        }
1909                        PbNodeBody::SourceBackfill(node) => {
1910                            node.rate_limit = rate_limit;
1911                            found = true;
1912                        }
1913                        PbNodeBody::Sink(node) => {
1914                            node.rate_limit = rate_limit;
1915                            found = true;
1916                        }
1917                        _ => {}
1918                    });
1919                }
1920                Ok(found)
1921            };
1922
1923        self.mutate_fragments_by_job_id(
1924            job_id,
1925            update_backfill_rate_limit,
1926            "stream scan node or source node not found",
1927        )
1928        .await
1929    }
1930
    // Edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
    // Returns the actor_ids to be applied.
1933    pub async fn update_sink_rate_limit_by_job_id(
1934        &self,
1935        sink_id: SinkId,
1936        rate_limit: Option<u32>,
1937    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1938        let update_sink_rate_limit =
1939            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1940                let mut found = Ok(false);
1941                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1942                    visit_stream_node_mut(stream_node, |node| {
1943                        if let PbNodeBody::Sink(node) = node {
1944                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1945                                found = Err(MetaError::invalid_parameter(
1946                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1947                                ));
1948                                return;
1949                            }
1950                            node.rate_limit = rate_limit;
1951                            found = Ok(true);
1952                        }
1953                    });
1954                }
1955                found
1956            };
1957
1958        self.mutate_fragments_by_job_id(
1959            sink_id.as_job_id(),
1960            update_sink_rate_limit,
1961            "sink node not found",
1962        )
1963        .await
1964    }
1965
1966    pub async fn update_dml_rate_limit_by_job_id(
1967        &self,
1968        job_id: JobId,
1969        rate_limit: Option<u32>,
1970    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1971        let update_dml_rate_limit =
1972            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1973                let mut found = false;
1974                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1975                    visit_stream_node_mut(stream_node, |node| {
1976                        if let PbNodeBody::Dml(node) = node {
1977                            node.rate_limit = rate_limit;
1978                            found = true;
1979                        }
1980                    });
1981                }
1982                Ok(found)
1983            };
1984
1985        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1986            .await
1987    }
1988
1989    pub async fn update_source_props_by_source_id(
1990        &self,
1991        source_id: SourceId,
1992        alter_props: BTreeMap<String, String>,
1993        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1994    ) -> MetaResult<WithOptionsSecResolved> {
1995        let inner = self.inner.read().await;
1996        let txn = inner.db.begin().await?;
1997
1998        let (source, _obj) = Source::find_by_id(source_id)
1999            .find_also_related(Object)
2000            .one(&txn)
2001            .await?
2002            .ok_or_else(|| {
2003                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2004            })?;
2005        let connector = source.with_properties.0.get_connector().unwrap();
2006        let is_shared_source = source.is_shared();
2007
2008        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2009        if !is_shared_source {
            // MVs on a non-shared source each hold a copy of the source in their fragments
2011            dep_source_job_ids = ObjectDependency::find()
2012                .select_only()
2013                .column(object_dependency::Column::UsedBy)
2014                .filter(object_dependency::Column::Oid.eq(source_id))
2015                .into_tuple()
2016                .all(&txn)
2017                .await?;
2018        }
2019
2020        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
2021        let prop_keys: Vec<String> = alter_props
2022            .keys()
2023            .chain(alter_secret_refs.keys())
2024            .cloned()
2025            .collect();
2026        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2027            &connector, &prop_keys,
2028        )?;
2029
2030        let mut options_with_secret = WithOptionsSecResolved::new(
2031            source.with_properties.0.clone(),
2032            source
2033                .secret_ref
2034                .map(|secret_ref| secret_ref.to_protobuf())
2035                .unwrap_or_default(),
2036        );
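        // `handle_update` merges the altered props and secret refs into the existing
        // options and reports which secret objects gained or lost a dependency, so
        // that the `object_dependency` table can be kept in sync below.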
2037        let (to_add_secret_dep, to_remove_secret_dep) =
2038            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2039
2040        tracing::info!(
2041            "applying new properties to source: source_id={}, options_with_secret={:?}",
2042            source_id,
2043            options_with_secret
2044        );
        // check that the altered props are still valid for the connector
2046        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2047        // todo: validate via source manager
2048
2049        let mut associate_table_id = None;
2050
        // `preferred_id` can be a source_id or a table_id:
        // when updating a table-associated source it is the table_id,
        // otherwise it is the source_id.
2054        let mut preferred_id = source_id.as_object_id();
2055        let rewrite_sql = {
2056            let definition = source.definition.clone();
2057
2058            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2059                .map_err(|e| {
2060                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2061                        anyhow!(e).context("Failed to parse source definition SQL"),
2062                    )))
2063                })?
2064                .try_into()
2065                .unwrap();
2066
2067            /// Formats SQL options with secret values properly resolved
2068            ///
2069            /// This function processes configuration options that may contain sensitive data:
2070            /// - Plaintext options are directly converted to `SqlOption`
2071            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2072            ///   without exposing the actual secret value
2073            ///
2074            /// # Arguments
2075            /// * `txn` - Database transaction for retrieving secrets
2076            /// * `options_with_secret` - Container of options with both plaintext and secret values
2077            ///
2078            /// # Returns
2079            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
2080            async fn format_with_option_secret_resolved(
2081                txn: &DatabaseTransaction,
2082                options_with_secret: &WithOptionsSecResolved,
2083            ) -> MetaResult<Vec<SqlOption>> {
2084                let mut options = Vec::new();
2085                for (k, v) in options_with_secret.as_plaintext() {
2086                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2087                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2088                    options.push(sql_option);
2089                }
2090                for (k, v) in options_with_secret.as_secret() {
2091                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2092                        let sql_option =
2093                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2094                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2095                        options.push(sql_option);
2096                    } else {
2097                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2098                    }
2099                }
2100                Ok(options)
2101            }
2102
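            // Rewrite the WITH clause of the original DDL. For example (illustrative
            // names): a plaintext pair ("topic", "t1") is rendered as `topic = 't1'`,
            // while a secret-backed pair is rendered as `aws.credentials = SECRET
            // my_secret`, so the new definition never embeds the secret value.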
2103            match &mut stmt {
2104                Statement::CreateSource { stmt } => {
2105                    stmt.with_properties.0 =
2106                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2107                }
2108                Statement::CreateTable { with_options, .. } => {
2109                    *with_options =
2110                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2111                    associate_table_id = source.optional_associated_table_id;
2112                    preferred_id = associate_table_id.unwrap().as_object_id();
2113                }
2114                _ => unreachable!(),
2115            }
2116
2117            stmt.to_string()
2118        };
2119
2120        {
2121            // update secret dependencies
2122            if !to_add_secret_dep.is_empty() {
2123                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2124                    object_dependency::ActiveModel {
2125                        oid: Set(secret_id.into()),
2126                        used_by: Set(preferred_id),
2127                        ..Default::default()
2128                    }
2129                }))
2130                .exec(&txn)
2131                .await?;
2132            }
2133            if !to_remove_secret_dep.is_empty() {
2134                // todo: fix the filter logic
2135                let _ = ObjectDependency::delete_many()
2136                    .filter(
2137                        object_dependency::Column::Oid
2138                            .is_in(to_remove_secret_dep)
2139                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2140                    )
2141                    .exec(&txn)
2142                    .await?;
2143            }
2144        }
2145
2146        let active_source_model = source::ActiveModel {
2147            source_id: Set(source_id),
2148            definition: Set(rewrite_sql.clone()),
2149            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2150            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2151                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2152            ..Default::default()
2153        };
2154        active_source_model.update(&txn).await?;
2155
2156        if let Some(associate_table_id) = associate_table_id {
            // update the associated table's statement accordingly
2158            let active_table_model = table::ActiveModel {
2159                table_id: Set(associate_table_id),
2160                definition: Set(rewrite_sql),
2161                ..Default::default()
2162            };
2163            active_table_model.update(&txn).await?;
2164        }
2165
2166        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating a table with a connector, the job id is the table id
2168            associate_table_id.as_job_id()
2169        } else {
2170            source_id.as_share_source_job_id()
2171        }]
2172        .into_iter()
2173        .chain(dep_source_job_ids.into_iter())
2174        .collect_vec();
2175
2176        // update fragments
2177        update_connector_props_fragments(
2178            &txn,
2179            to_check_job_ids,
2180            FragmentTypeFlag::Source,
2181            |node, found| {
2182                if let PbNodeBody::Source(node) = node
2183                    && let Some(source_inner) = &mut node.source_inner
2184                {
2185                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2186                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2187                    *found = true;
2188                }
2189            },
2190            is_shared_source,
2191        )
2192        .await?;
2193
2194        let mut to_update_objs = Vec::with_capacity(2);
2195        let (source, obj) = Source::find_by_id(source_id)
2196            .find_also_related(Object)
2197            .one(&txn)
2198            .await?
2199            .ok_or_else(|| {
2200                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2201            })?;
2202        to_update_objs.push(PbObject {
2203            object_info: Some(PbObjectInfo::Source(
2204                ObjectModel(source, obj.unwrap()).into(),
2205            )),
2206        });
2207
2208        if let Some(associate_table_id) = associate_table_id {
2209            let (table, obj) = Table::find_by_id(associate_table_id)
2210                .find_also_related(Object)
2211                .one(&txn)
2212                .await?
2213                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2214            to_update_objs.push(PbObject {
2215                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2216            });
2217        }
2218
2219        txn.commit().await?;
2220
2221        self.notify_frontend(
2222            NotificationOperation::Update,
2223            NotificationInfo::ObjectGroup(PbObjectGroup {
2224                objects: to_update_objs,
2225            }),
2226        )
2227        .await;
2228
2229        Ok(options_with_secret)
2230    }
2231
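    /// Alter a sink's connector properties on the fly: validates that the props are
    /// allowed to change without a restart, rewrites the `CREATE SINK` definition,
    /// persists the merged config, updates the sink's fragments, and notifies the
    /// frontend. Echoes back the applied property overrides.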
2232    pub async fn update_sink_props_by_sink_id(
2233        &self,
2234        sink_id: SinkId,
2235        props: BTreeMap<String, String>,
2236    ) -> MetaResult<HashMap<String, String>> {
2237        let inner = self.inner.read().await;
2238        let txn = inner.db.begin().await?;
2239
2240        let (sink, _obj) = Sink::find_by_id(sink_id)
2241            .find_also_related(Object)
2242            .one(&txn)
2243            .await?
2244            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2245        validate_sink_props(&sink, &props)?;
2246        let definition = sink.definition.clone();
2247        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2248            .map_err(|e| SinkError::Config(anyhow!(e)))?
2249            .try_into()
2250            .unwrap();
2251        if let Statement::CreateSink { stmt } = &mut stmt {
2252            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2253        } else {
2254            panic!("definition is not a create sink statement")
2255        }
2256        let mut new_config = sink.properties.clone().into_inner();
2257        new_config.extend(props.clone());
2258
2259        let definition = stmt.to_string();
2260        let active_sink = sink::ActiveModel {
2261            sink_id: Set(sink_id),
2262            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2263            definition: Set(definition),
2264            ..Default::default()
2265        };
2266        active_sink.update(&txn).await?;
2267
2268        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2269        let (sink, obj) = Sink::find_by_id(sink_id)
2270            .find_also_related(Object)
2271            .one(&txn)
2272            .await?
2273            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2274        txn.commit().await?;
2275        let relation_infos = vec![PbObject {
2276            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2277        }];
2278
2279        let _version = self
2280            .notify_frontend(
2281                NotificationOperation::Update,
2282                NotificationInfo::ObjectGroup(PbObjectGroup {
2283                    objects: relation_infos,
2284                }),
2285            )
2286            .await;
2287
2288        Ok(props.into_iter().collect())
2289    }
2290
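    /// Alter the connector props of an iceberg-engine table. Such a table is backed
    /// by a hidden sink/source pair, so the rewritten definition is persisted to the
    /// sink, source, and table catalogs, and the sink fragments are updated as well.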
2291    pub async fn update_iceberg_table_props_by_table_id(
2292        &self,
2293        table_id: TableId,
2294        props: BTreeMap<String, String>,
2295        alter_iceberg_table_props: Option<
2296            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2297        >,
2298    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2301        let inner = self.inner.read().await;
2302        let txn = inner.db.begin().await?;
2303
2304        let (sink, _obj) = Sink::find_by_id(sink_id)
2305            .find_also_related(Object)
2306            .one(&txn)
2307            .await?
2308            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2309        validate_sink_props(&sink, &props)?;
2310
2311        let definition = sink.definition.clone();
2312        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2313            .map_err(|e| SinkError::Config(anyhow!(e)))?
2314            .try_into()
2315            .unwrap();
2316        if let Statement::CreateTable {
2317            with_options,
2318            engine,
2319            ..
2320        } = &mut stmt
2321        {
2322            if !matches!(engine, Engine::Iceberg) {
2323                return Err(SinkError::Config(anyhow!(
2324                    "only iceberg table can be altered as sink"
2325                ))
2326                .into());
2327            }
2328            update_stmt_with_props(with_options, &props)?;
2329        } else {
2330            panic!("definition is not a create iceberg table statement")
2331        }
2332        let mut new_config = sink.properties.clone().into_inner();
2333        new_config.extend(props.clone());
2334
2335        let definition = stmt.to_string();
2336        let active_sink = sink::ActiveModel {
2337            sink_id: Set(sink_id),
2338            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2339            definition: Set(definition.clone()),
2340            ..Default::default()
2341        };
2342        let active_source = source::ActiveModel {
2343            source_id: Set(source_id),
2344            definition: Set(definition.clone()),
2345            ..Default::default()
2346        };
2347        let active_table = table::ActiveModel {
2348            table_id: Set(table_id),
2349            definition: Set(definition),
2350            ..Default::default()
2351        };
2352        active_sink.update(&txn).await?;
2353        active_source.update(&txn).await?;
2354        active_table.update(&txn).await?;
2355
2356        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2357
2358        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2359            .find_also_related(Object)
2360            .one(&txn)
2361            .await?
2362            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2363        let (source, source_obj) = Source::find_by_id(source_id)
2364            .find_also_related(Object)
2365            .one(&txn)
2366            .await?
2367            .ok_or_else(|| {
2368                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2369            })?;
2370        let (table, table_obj) = Table::find_by_id(table_id)
2371            .find_also_related(Object)
2372            .one(&txn)
2373            .await?
2374            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2375        txn.commit().await?;
2376        let relation_infos = vec![
2377            PbObject {
2378                object_info: Some(PbObjectInfo::Sink(
2379                    ObjectModel(sink, sink_obj.unwrap()).into(),
2380                )),
2381            },
2382            PbObject {
2383                object_info: Some(PbObjectInfo::Source(
2384                    ObjectModel(source, source_obj.unwrap()).into(),
2385                )),
2386            },
2387            PbObject {
2388                object_info: Some(PbObjectInfo::Table(
2389                    ObjectModel(table, table_obj.unwrap()).into(),
2390                )),
2391            },
2392        ];
2393        let _version = self
2394            .notify_frontend(
2395                NotificationOperation::Update,
2396                NotificationInfo::ObjectGroup(PbObjectGroup {
2397                    objects: relation_infos,
2398                }),
2399            )
2400            .await;
2401
2402        Ok((props.into_iter().collect(), sink_id))
2403    }
2404
2405    /// Update connection properties and all dependent sources/sinks in a single transaction
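    /// Dependent sources and sinks inherit the altered props; their complete,
    /// secret-resolved properties are returned so that the caller can broadcast the
    /// new configs to the running connector instances.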
2406    pub async fn update_connection_and_dependent_objects_props(
2407        &self,
2408        connection_id: ConnectionId,
2409        alter_props: BTreeMap<String, String>,
2410        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2411    ) -> MetaResult<(
2412        WithOptionsSecResolved,                   // Connection's new properties
2413        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2414        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2415    )> {
2416        let inner = self.inner.read().await;
2417        let txn = inner.db.begin().await?;
2418
2419        // Find all dependent sources and sinks first
2420        let dependent_sources: Vec<SourceId> = Source::find()
2421            .select_only()
2422            .column(source::Column::SourceId)
2423            .filter(source::Column::ConnectionId.eq(connection_id))
2424            .into_tuple()
2425            .all(&txn)
2426            .await?;
2427
2428        let dependent_sinks: Vec<SinkId> = Sink::find()
2429            .select_only()
2430            .column(sink::Column::SinkId)
2431            .filter(sink::Column::ConnectionId.eq(connection_id))
2432            .into_tuple()
2433            .all(&txn)
2434            .await?;
2435
2436        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2437            .find_also_related(Object)
2438            .one(&txn)
2439            .await?
2440            .ok_or_else(|| {
2441                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2442            })?;
2443
2444        // Validate that props can be altered
2445        let prop_keys: Vec<String> = alter_props
2446            .keys()
2447            .chain(alter_secret_refs.keys())
2448            .cloned()
2449            .collect();
2450
2451        // Map the connection type enum to the string name expected by the validation function
2452        let connection_type_str = pb_connection_type_to_connection_type(
2453            &connection_catalog.params.to_protobuf().connection_type(),
2454        )
2455        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2456
2457        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2458            connection_type_str, &prop_keys,
2459        )?;
2460
2461        let connection_pb = connection_catalog.params.to_protobuf();
2462        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2463            connection_pb.properties.into_iter().collect(),
2464            connection_pb.secret_refs.into_iter().collect(),
2465        );
2466
2467        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2468            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2469
2470        tracing::debug!(
2471            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2472            connection_id,
2473            dependent_sources,
2474            dependent_sinks
2475        );
2476
2477        // Validate connection
2478        {
2479            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2480                connection_type: connection_pb.connection_type,
2481                properties: connection_options_with_secret
2482                    .as_plaintext()
2483                    .clone()
2484                    .into_iter()
2485                    .collect(),
2486                secret_refs: connection_options_with_secret
2487                    .as_secret()
2488                    .clone()
2489                    .into_iter()
2490                    .collect(),
2491            };
2492            let connection = PbConnection {
2493                id: connection_id as _,
2494                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2495                    conn_params_pb,
2496                )),
2497                ..Default::default()
2498            };
2499            validate_connection(&connection).await?;
2500        }
2501
2502        // Update connection secret dependencies
2503        if !to_add_secret_dep.is_empty() {
2504            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2505                object_dependency::ActiveModel {
2506                    oid: Set(secret_id.into()),
2507                    used_by: Set(connection_id.as_object_id()),
2508                    ..Default::default()
2509                }
2510            }))
2511            .exec(&txn)
2512            .await?;
2513        }
2514        if !to_remove_secret_dep.is_empty() {
2515            let _ = ObjectDependency::delete_many()
2516                .filter(
2517                    object_dependency::Column::Oid
2518                        .is_in(to_remove_secret_dep)
2519                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2520                )
2521                .exec(&txn)
2522                .await?;
2523        }
2524
2525        // Update the connection with new properties
2526        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2527            connection_type: connection_pb.connection_type,
2528            properties: connection_options_with_secret
2529                .as_plaintext()
2530                .clone()
2531                .into_iter()
2532                .collect(),
2533            secret_refs: connection_options_with_secret
2534                .as_secret()
2535                .clone()
2536                .into_iter()
2537                .collect(),
2538        };
2539        let active_connection_model = connection::ActiveModel {
2540            connection_id: Set(connection_id),
2541            params: Set(ConnectionParams::from(&updated_connection_params)),
2542            ..Default::default()
2543        };
2544        active_connection_model.update(&txn).await?;
2545
2546        // Batch update dependent sources and collect their complete properties
2547        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2548
2549        if !dependent_sources.is_empty() {
2550            // Batch fetch all dependent sources
2551            let sources_with_objs = Source::find()
2552                .find_also_related(Object)
2553                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2554                .all(&txn)
2555                .await?;
2556
2557            // Prepare batch updates
2558            let mut source_updates = Vec::new();
2559            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2560
2561            for (source, _obj) in sources_with_objs {
2562                let source_id = source.source_id;
2563
2564                let mut source_options_with_secret = WithOptionsSecResolved::new(
2565                    source.with_properties.0.clone(),
2566                    source
2567                        .secret_ref
2568                        .clone()
2569                        .map(|secret_ref| secret_ref.to_protobuf())
2570                        .unwrap_or_default(),
2571                );
2572                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2573                    source_options_with_secret
2574                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2575
2576                // Validate the updated source properties
2577                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2578
2579                // Prepare source update
2580                let active_source = source::ActiveModel {
2581                    source_id: Set(source_id),
2582                    with_properties: Set(Property(
2583                        source_options_with_secret.as_plaintext().clone(),
2584                    )),
2585                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2586                        || {
2587                            risingwave_meta_model::SecretRef::from(
2588                                source_options_with_secret.as_secret().clone(),
2589                            )
2590                        },
2591                    )),
2592                    ..Default::default()
2593                };
2594                source_updates.push(active_source);
2595
2596                // Prepare fragment update:
2597                // - If the source is a table-associated source, update fragments for the table job.
2598                // - Otherwise update the shared source job.
2599                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
2600                let is_shared_source = source.is_shared();
2601                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2602                if !is_shared_source {
2603                    dep_source_job_ids = ObjectDependency::find()
2604                        .select_only()
2605                        .column(object_dependency::Column::UsedBy)
2606                        .filter(object_dependency::Column::Oid.eq(source_id))
2607                        .into_tuple()
2608                        .all(&txn)
2609                        .await?;
2610                }
2611
2612                let base_job_id =
2613                    if let Some(associated_table_id) = source.optional_associated_table_id {
2614                        associated_table_id.as_job_id()
2615                    } else {
2616                        source_id.as_share_source_job_id()
2617                    };
2618                // Start from the base job, then append all dependent streaming jobs.
2619                let job_ids = std::iter::once(base_job_id)
2620                    .chain(dep_source_job_ids)
2621                    .collect_vec();
2622
2623                fragment_updates.push(DependentSourceFragmentUpdate {
2624                    job_ids,
2625                    with_properties: source_options_with_secret.as_plaintext().clone(),
2626                    secret_refs: source_options_with_secret.as_secret().clone(),
2627                    is_shared_source,
2628                });
2629
2630                // Collect the complete properties for runtime broadcast
2631                let complete_source_props = LocalSecretManager::global()
2632                    .fill_secrets(
2633                        source_options_with_secret.as_plaintext().clone(),
2634                        source_options_with_secret.as_secret().clone(),
2635                    )
2636                    .map_err(MetaError::from)?
2637                    .into_iter()
2638                    .collect::<HashMap<String, String>>();
2639                updated_sources_with_props.push((source_id, complete_source_props));
2640            }
2641
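            // Persist the prepared source models within the same transaction.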
2642            for source_update in source_updates {
2643                source_update.update(&txn).await?;
2644            }
2645
2646            // Batch execute fragment updates
2647            for DependentSourceFragmentUpdate {
2648                job_ids,
2649                with_properties,
2650                secret_refs,
2651                is_shared_source,
2652            } in fragment_updates
2653            {
2654                update_connector_props_fragments(
2655                    &txn,
2656                    job_ids,
2657                    FragmentTypeFlag::Source,
2658                    |node, found| {
2659                        if let PbNodeBody::Source(node) = node
2660                            && let Some(source_inner) = &mut node.source_inner
2661                        {
2662                            source_inner.with_properties = with_properties.clone();
2663                            source_inner.secret_refs = secret_refs.clone();
2664                            *found = true;
2665                        }
2666                    },
2667                    is_shared_source,
2668                )
2669                .await?;
2670            }
2671        }
2672
2673        // Batch update dependent sinks and collect their complete properties
2674        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
2675
2676        if !dependent_sinks.is_empty() {
2677            // Batch fetch all dependent sinks
2678            let sinks_with_objs = Sink::find()
2679                .find_also_related(Object)
2680                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
2681                .all(&txn)
2682                .await?;
2683
2684            // Prepare batch updates
2685            let mut sink_updates = Vec::new();
2686            let mut sink_fragment_updates = Vec::new();
2687
2688            for (sink, _obj) in sinks_with_objs {
2689                let sink_id = sink.sink_id;
2690
2691                // Validate that these sink props can be altered on the fly (see also `validate_sink_props`).
2692                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2693                    Some(connector) => {
2694                        let connector_type = connector.to_lowercase();
2695                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
2696                            .map_err(|e| SinkError::Config(anyhow!(e)))?;
2697
2698                        match_sink_name_str!(
2699                            connector_type.as_str(),
2700                            SinkType,
2701                            {
2702                                let mut new_sink_props = sink.properties.0.clone();
2703                                new_sink_props.extend(alter_props.clone());
2704                                SinkType::validate_alter_config(&new_sink_props)
2705                            },
2706                            |sink: &str| Err(SinkError::Config(anyhow!(
2707                                "unsupported sink type {}",
2708                                sink
2709                            )))
2710                        )?
2711                    }
2712                    None => {
2713                        return Err(SinkError::Config(anyhow!(
2714                            "connector not specified when altering a sink"
2715                        ))
2716                        .into());
2717                    }
2718                };
2719
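                // Recompute the merged props outside the macro scope; the copy above
                // was local to the validation arm.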
2720                let mut new_sink_props = sink.properties.0.clone();
2721                new_sink_props.extend(alter_props.clone());
2722
2723                // Prepare sink update
2724                let active_sink = sink::ActiveModel {
2725                    sink_id: Set(sink_id),
2726                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
2727                    ..Default::default()
2728                };
2729                sink_updates.push(active_sink);
2730
2731                // Prepare fragment updates for this sink
2732                sink_fragment_updates.push((sink_id, new_sink_props.clone()));
2733
2734                // Collect the complete properties for runtime broadcast
2735                let complete_sink_props: HashMap<String, String> =
2736                    new_sink_props.into_iter().collect();
2737                updated_sinks_with_props.push((sink_id, complete_sink_props));
2738            }
2739
2740            // Batch execute sink updates
2741            for sink_update in sink_updates {
2742                sink_update.update(&txn).await?;
2743            }
2744
2745            // Batch execute sink fragment updates using the reusable function
2746            for (sink_id, new_sink_props) in sink_fragment_updates {
2747                update_connector_props_fragments(
2748                    &txn,
2749                    vec![sink_id.as_job_id()],
2750                    FragmentTypeFlag::Sink,
2751                    |node, found| {
2752                        if let PbNodeBody::Sink(node) = node
2753                            && let Some(sink_desc) = &mut node.sink_desc
2754                            && sink_desc.id == sink_id.as_raw_id()
2755                        {
2756                            sink_desc.properties = new_sink_props.clone();
2757                            *found = true;
2758                        }
2759                    },
2760                    true,
2761                )
2762                .await?;
2763            }
2764        }
2765
2766        // Collect all updated objects for frontend notification
2767        let mut updated_objects = Vec::new();
2768
2769        // Add connection
2770        let (connection, obj) = Connection::find_by_id(connection_id)
2771            .find_also_related(Object)
2772            .one(&txn)
2773            .await?
2774            .ok_or_else(|| {
2775                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2776            })?;
2777        updated_objects.push(PbObject {
2778            object_info: Some(PbObjectInfo::Connection(
2779                ObjectModel(connection, obj.unwrap()).into(),
2780            )),
2781        });
2782
2783        // Add sources
2784        for source_id in &dependent_sources {
2785            let (source, obj) = Source::find_by_id(*source_id)
2786                .find_also_related(Object)
2787                .one(&txn)
2788                .await?
2789                .ok_or_else(|| {
2790                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
2791                })?;
2792            updated_objects.push(PbObject {
2793                object_info: Some(PbObjectInfo::Source(
2794                    ObjectModel(source, obj.unwrap()).into(),
2795                )),
2796            });
2797        }
2798
2799        // Add sinks
2800        for sink_id in &dependent_sinks {
2801            let (sink, obj) = Sink::find_by_id(*sink_id)
2802                .find_also_related(Object)
2803                .one(&txn)
2804                .await?
2805                .ok_or_else(|| {
2806                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
2807                })?;
2808            updated_objects.push(PbObject {
2809                object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2810            });
2811        }
2812
2813        // Commit the transaction
2814        txn.commit().await?;
2815
2816        // Notify frontend about all updated objects
2817        if !updated_objects.is_empty() {
2818            self.notify_frontend(
2819                NotificationOperation::Update,
2820                NotificationInfo::ObjectGroup(PbObjectGroup {
2821                    objects: updated_objects,
2822                }),
2823            )
2824            .await;
2825        }
2826
2827        Ok((
2828            connection_options_with_secret,
2829            updated_sources_with_props,
2830            updated_sinks_with_props,
2831        ))
2832    }
2833
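    /// Apply `rate_limit` to every rate-limit-capable node (DML, sink, scan, CDC scan,
    /// source backfill) in the given fragment, returning the affected actors grouped by
    /// fragment id so the caller can broadcast the new limit to compute nodes.
    ///
    /// A minimal usage sketch (the controller handle and the concrete limit are
    /// illustrative, not taken from this file):
    ///
    /// ```ignore
    /// // Throttle the fragment to 1000 rows/s; `None` would remove the limit instead.
    /// let affected_actors = controller
    ///     .update_fragment_rate_limit_by_fragment_id(fragment_id, Some(1000))
    ///     .await?;
    /// ```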
2834    pub async fn update_fragment_rate_limit_by_fragment_id(
2835        &self,
2836        fragment_id: FragmentId,
2837        rate_limit: Option<u32>,
2838    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2839        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2840                                 stream_node: &mut PbStreamNode| {
2841            let mut found = false;
2842            if fragment_type_mask.contains_any(
2843                FragmentTypeFlag::dml_rate_limit_fragments()
2844                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2845                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2846                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2847            ) {
2848                visit_stream_node_mut(stream_node, |node| {
2849                    if let PbNodeBody::Dml(node) = node {
2850                        node.rate_limit = rate_limit;
2851                        found = true;
2852                    }
2853                    if let PbNodeBody::Sink(node) = node {
2854                        node.rate_limit = rate_limit;
2855                        found = true;
2856                    }
2857                    if let PbNodeBody::StreamCdcScan(node) = node {
2858                        node.rate_limit = rate_limit;
2859                        found = true;
2860                    }
2861                    if let PbNodeBody::StreamScan(node) = node {
2862                        node.rate_limit = rate_limit;
2863                        found = true;
2864                    }
2865                    if let PbNodeBody::SourceBackfill(node) = node {
2866                        node.rate_limit = rate_limit;
2867                        found = true;
2868                    }
2869                });
2870            }
2871            found
2872        };
2873        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2874            .await
2875    }
2876
2877    /// Note: `FsFetch` nodes created in old versions are not included.
2878    /// Since this is only used for debugging, it should be fine.
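    ///
    /// A hedged usage sketch (the `controller` handle is illustrative):
    ///
    /// ```ignore
    /// for info in controller.list_rate_limits().await? {
    ///     println!("fragment {} ({}): {} rows/s", info.fragment_id, info.node_name, info.rate_limit);
    /// }
    /// ```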
2879    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2880        let inner = self.inner.read().await;
2881        let txn = inner.db.begin().await?;
2882
2883        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2884            .select_only()
2885            .columns([
2886                fragment::Column::FragmentId,
2887                fragment::Column::JobId,
2888                fragment::Column::FragmentTypeMask,
2889                fragment::Column::StreamNode,
2890            ])
2891            .filter(FragmentTypeMask::intersects_any(
2892                FragmentTypeFlag::rate_limit_fragments(),
2893            ))
2894            .into_tuple()
2895            .all(&txn)
2896            .await?;
2897
2898        let mut rate_limits = Vec::new();
2899        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2900            let stream_node = stream_node.to_protobuf();
2901            visit_stream_node_body(&stream_node, |node| {
2902                let mut rate_limit = None;
2903                let mut node_name = None;
2904
2905                match node {
2906                    // source rate limit
2907                    PbNodeBody::Source(node) => {
2908                        if let Some(node_inner) = &node.source_inner {
2909                            rate_limit = node_inner.rate_limit;
2910                            node_name = Some("SOURCE");
2911                        }
2912                    }
2913                    PbNodeBody::StreamFsFetch(node) => {
2914                        if let Some(node_inner) = &node.node_inner {
2915                            rate_limit = node_inner.rate_limit;
2916                            node_name = Some("FS_FETCH");
2917                        }
2918                    }
2919                    // backfill rate limit
2920                    PbNodeBody::SourceBackfill(node) => {
2921                        rate_limit = node.rate_limit;
2922                        node_name = Some("SOURCE_BACKFILL");
2923                    }
2924                    PbNodeBody::StreamScan(node) => {
2925                        rate_limit = node.rate_limit;
2926                        node_name = Some("STREAM_SCAN");
2927                    }
2928                    PbNodeBody::StreamCdcScan(node) => {
2929                        rate_limit = node.rate_limit;
2930                        node_name = Some("STREAM_CDC_SCAN");
2931                    }
2932                    PbNodeBody::Sink(node) => {
2933                        rate_limit = node.rate_limit;
2934                        node_name = Some("SINK");
2935                    }
2936                    _ => {}
2937                }
2938
2939                if let Some(rate_limit) = rate_limit {
2940                    rate_limits.push(RateLimitInfo {
2941                        fragment_id,
2942                        job_id,
2943                        fragment_type_mask: fragment_type_mask as u32,
2944                        rate_limit,
2945                        node_name: node_name.expect("`node_name` is always set together with `rate_limit`").to_owned(),
2946                    });
2947                }
2948            });
2949        }
2950
2951        Ok(rate_limits)
2952    }
2953}
2954
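/// Check that `props` can be applied to an existing sink without rebuilding it: the
/// connector must be present in the sink's properties, every altered key must be
/// allowed to change on the fly, and the merged config must still validate for that
/// connector type.
///
/// A hedged sketch of a call site (the property key is illustrative, not a real
/// connector option):
///
/// ```ignore
/// let props = BTreeMap::from([("some_tunable".to_owned(), "1024".to_owned())]);
/// validate_sink_props(&sink_model, &props)?;
/// ```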
2955fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2956    // Validate that props can be altered
2957    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2958        Some(connector) => {
2959            let connector_type = connector.to_lowercase();
2960            let field_names: Vec<String> = props.keys().cloned().collect();
2961            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2962                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2963
2964            match_sink_name_str!(
2965                connector_type.as_str(),
2966                SinkType,
2967                {
2968                    let mut new_props = sink.properties.0.clone();
2969                    new_props.extend(props.clone());
2970                    SinkType::validate_alter_config(&new_props)
2971                },
2972                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2973            )?
2974        }
2975        None => {
2976            return Err(
2977                SinkError::Config(anyhow!("connector not specified when altering a sink")).into(),
2978            );
2979        }
2980    };
2981    Ok(())
2982}
2983
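/// Merge `props` into the `WITH (...)` option list of a statement: existing options are
/// overridden by name (keeping their original position, hence the `IndexMap`), and new
/// keys are appended at the end.
///
/// Illustrative sketch of the merge semantics (option names are examples only):
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 't1')  merged with  {topic: 't2', foo: 'bar'}
/// //   => WITH (connector = 'kafka', topic = 't2', foo = 'bar')
/// update_stmt_with_props(&mut with_properties, &props)?;
/// ```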
2984fn update_stmt_with_props(
2985    with_properties: &mut Vec<SqlOption>,
2986    props: &BTreeMap<String, String>,
2987) -> MetaResult<()> {
2988    let mut new_sql_options = with_properties
2989        .iter()
2990        .map(|sql_option| (&sql_option.name, sql_option))
2991        .collect::<IndexMap<_, _>>();
2992    let add_sql_options = props
2993        .iter()
2994        .map(|(k, v)| SqlOption::try_from((k, v)))
2995        .collect::<Result<Vec<SqlOption>, ParserError>>()
2996        .map_err(|e| SinkError::Config(anyhow!(e)))?;
2997    new_sql_options.extend(
2998        add_sql_options
2999            .iter()
3000            .map(|sql_option| (&sql_option.name, sql_option)),
3001    );
3002    *with_properties = new_sql_options.into_values().cloned().collect();
3003    Ok(())
3004}
3005
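/// Merge `props` into the `SinkDesc` embedded in every sink fragment of the job
/// `sink_id`, persisting the rewritten stream nodes so that recovered actors pick up
/// the new configuration.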
3006async fn update_sink_fragment_props(
3007    txn: &DatabaseTransaction,
3008    sink_id: SinkId,
3009    props: BTreeMap<String, String>,
3010) -> MetaResult<()> {
3011    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3012        .select_only()
3013        .columns([
3014            fragment::Column::FragmentId,
3015            fragment::Column::FragmentTypeMask,
3016            fragment::Column::StreamNode,
3017        ])
3018        .filter(fragment::Column::JobId.eq(sink_id.as_job_id()))
3019        .into_tuple()
3020        .all(txn)
3021        .await?;
3022    let fragments = fragments
3023        .into_iter()
3024        .filter(|(_, fragment_type_mask, _)| {
3025            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3026        })
3027        .filter_map(|(id, _, stream_node)| {
3028            let mut stream_node = stream_node.to_protobuf();
3029            let mut found = false;
3030            visit_stream_node_mut(&mut stream_node, |node| {
3031                if let PbNodeBody::Sink(node) = node
3032                    && let Some(sink_desc) = &mut node.sink_desc
3033                    && sink_desc.id == sink_id.as_raw_id()
3034                {
3035                    sink_desc.properties.extend(props.clone());
3036                    found = true;
3037                }
3038            });
3039            if found { Some((id, stream_node)) } else { None }
3040        })
3041        .collect_vec();
3042    assert!(
3043        !fragments.is_empty(),
3044        "sink id should be used by at least one fragment"
3045    );
3046    for (id, stream_node) in fragments {
3047        fragment::ActiveModel {
3048            fragment_id: Set(id),
3049            stream_node: Set(StreamNode::from(&stream_node)),
3050            ..Default::default()
3051        }
3052        .update(txn)
3053        .await?;
3054    }
3055    Ok(())
3056}
3057
3058pub struct SinkIntoTableContext {
3059    /// For alter table (e.g., add column), this is the list of existing sink ids;
3060    /// otherwise it is empty.
3061    pub updated_sink_catalogs: Vec<SinkId>,
3062}
3063
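/// Context for finalizing an auto-refresh-schema sink replacement: the temporary
/// sink's id, the original sink it replaces, the refreshed column catalog, and, when
/// the log store is rebuilt, the id and columns of the new log store table.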
3064pub struct FinishAutoRefreshSchemaSinkContext {
3065    pub tmp_sink_id: SinkId,
3066    pub original_sink_id: SinkId,
3067    pub columns: Vec<PbColumnCatalog>,
3068    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
3069}
3070
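/// Rewrite the persisted stream nodes of all fragments under `job_ids` that carry
/// `expect_flag`, applying `alter_stream_node_fn` to every node body. The closure must
/// set its `found` flag to `true` whenever it actually mutates a node; fragments where
/// nothing matched are skipped.
///
/// A hedged sketch of the closure contract (mirrors the sink call site above):
///
/// ```ignore
/// update_connector_props_fragments(&txn, job_ids, FragmentTypeFlag::Sink, |node, found| {
///     if let PbNodeBody::Sink(sink) = node {
///         // ...mutate the sink desc here...
///         *found = true;
///     }
/// }, /* is_shared_source */ true)
/// .await?;
/// ```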
3071async fn update_connector_props_fragments<F>(
3072    txn: &DatabaseTransaction,
3073    job_ids: Vec<JobId>,
3074    expect_flag: FragmentTypeFlag,
3075    mut alter_stream_node_fn: F,
3076    is_shared_source: bool,
3077) -> MetaResult<()>
3078where
3079    F: FnMut(&mut PbNodeBody, &mut bool),
3080{
3081    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3082        .select_only()
3083        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3084        .filter(
3085            fragment::Column::JobId
3086                .is_in(job_ids.clone())
3087                .and(FragmentTypeMask::intersects(expect_flag)),
3088        )
3089        .into_tuple()
3090        .all(txn)
3091        .await?;
3092    let fragments = fragments
3093        .into_iter()
3094        .filter_map(|(id, stream_node)| {
3095            let mut stream_node = stream_node.to_protobuf();
3096            let mut found = false;
3097            visit_stream_node_mut(&mut stream_node, |node| {
3098                alter_stream_node_fn(node, &mut found);
3099            });
3100            if found { Some((id, stream_node)) } else { None }
3101        })
3102        .collect_vec();
3103    if is_shared_source || job_ids.len() > 1 {
3104        // For sinks this flag is always true. For sources, the first element of `job_ids` is
3105        // the shared-source job or the associated table job; a non-shared source alone may have
3106        // no fragments to update, but `job_ids.len() > 1` means other jobs embed the source, so at least one fragment must be updated.
3107        assert!(
3108            !fragments.is_empty(),
3109            "job ids {:?} (type: {:?}) should be used by at least one fragment",
3110            job_ids,
3111            expect_flag
3112        );
3113    }
3114
3115    for (id, stream_node) in fragments {
3116        fragment::ActiveModel {
3117            fragment_id: Set(id),
3118            stream_node: Set(StreamNode::from(&stream_node)),
3119            ..Default::default()
3120        }
3121        .update(txn)
3122        .await?;
3123    }
3124
3125    Ok(())
3126}