risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
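    /// Create the catalog object and the corresponding `streaming_job` row for a new
    /// streaming job within the given transaction, and return the allocated job id.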
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the
    /// job is a materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will be
    /// updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in a circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

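                // Refreshable tables additionally get a `refresh_job` row that tracks
                // their trigger interval and refresh state, starting as `Idle`.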
                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // to be compatible with old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

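    /// Convenience wrapper around [`Self::prepare_streaming_job`] that pulls the job id,
    /// fragments, and downstream relations out of `stream_job_fragments`.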
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id);
                    assert_eq!(table.fragment_id, fragment_id);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the definition was replaced earlier but is
                        // still needed for the notification, so restore it here.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and this notification, which may lead
        // to the frontend receiving duplicate notifications if it restarts right in this gap. We
        // need to either forward the notification or filter out catalogs without any fragments in the meta store.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If a sink (with a target table) needs to be dropped,
    /// additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is under the initial status or in `FOREGROUND` mode.
    /// It returns (true, _) if the job is not found or is aborted.
    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it up.
                } else {
                    // If the job is created in the background and still in creating status,
                    // we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
            // Here we drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating a sink into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
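        // Wake up any waiters registered for this job's creation and report the abort reason.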
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

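    /// Persist metadata after the job's fragments have been collected: insert the new
    /// fragment relations, mark the job as `Creating`, and store the initial split
    /// assignment if one is provided.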
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

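        // Flatten the per-actor split assignment into per-fragment split lists and persist them.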
        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

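    /// Create a new streaming job object to serve as the temporary ("dummy") job while
    /// replacing `streaming_job`, after checking its version, concurrent replacements, and
    /// max parallelism. Returns the id of the newly created job object.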
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Inner implementation of `finish_streaming_job`: marks job-related objects as `Created`
    /// and returns the notification operation, objects, and updated user infos to broadcast.
    pub async fn finish_streaming_job_inner(
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
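        // Jobs already announced to the frontend during creation (e.g. background jobs)
        // are finished with an `Update`; others are announced here with an `Add`.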
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
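        // Default privileges are granted automatically at the end, except for indexes and
        // internal iceberg sinks (see the branches below).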
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (align with MV behavior).
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        Ok((notification_op, objects, updated_user_info))
    }

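    /// Finish replacing a streaming job: apply the new catalogs built under the temporary job
    /// `tmp_id` to the original job, then notify the frontend about the updated objects and
    /// any objects that must be dropped.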
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

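    /// Transactional body of [`Self::finish_replace_streaming_job`]. Returns the updated
    /// objects to notify, plus optional user infos and objects for delete notifications.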
1280    pub async fn finish_replace_streaming_job_inner(
1281        tmp_id: JobId,
1282        replace_upstream: FragmentReplaceUpstream,
1283        SinkIntoTableContext {
1284            updated_sink_catalogs,
1285        }: SinkIntoTableContext,
1286        txn: &DatabaseTransaction,
1287        streaming_job: StreamingJob,
1288        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1289        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1290    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1291        let original_job_id = streaming_job.id();
1292        let job_type = streaming_job.job_type();
1293
1294        let mut index_item_rewriter = None;
1295
1296        // Update catalog
1297        match streaming_job {
1298            StreamingJob::Table(_source, table, _table_job_type) => {
1299                // The source catalog should remain unchanged
1300
1301                let original_column_catalogs =
1302                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1303
1304                index_item_rewriter = Some({
1305                    let original_columns = original_column_catalogs
1306                        .to_protobuf()
1307                        .into_iter()
1308                        .map(|c| c.column_desc.unwrap())
1309                        .collect_vec();
1310                    let new_columns = table
1311                        .columns
1312                        .iter()
1313                        .map(|c| c.column_desc.clone().unwrap())
1314                        .collect_vec();
1315
1316                    IndexItemRewriter {
1317                        original_columns,
1318                        new_columns,
1319                    }
1320                });
1321
1322                // For sinks created in earlier versions, we need to set the original_target_columns.
1323                for sink_id in updated_sink_catalogs {
1324                    sink::ActiveModel {
1325                        sink_id: Set(sink_id as _),
1326                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1327                        ..Default::default()
1328                    }
1329                    .update(txn)
1330                    .await?;
1331                }
1332                // Update the table catalog with the new one. (column catalog is also updated here)
1333                let mut table = table::ActiveModel::from(table);
1334                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1335                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1336                {
1337                    // drop the table connector; the remaining logic is in `drop_table_associated_source`
1338                    table.optional_associated_source_id = Set(None);
1339                }
1340
1341                table.update(txn).await?;
1342            }
1343            StreamingJob::Source(source) => {
1344                // Update the source catalog with the new one.
1345                let source = source::ActiveModel::from(source);
1346                source.update(txn).await?;
1347            }
1348            StreamingJob::MaterializedView(table) => {
1349                // Update the table catalog with the new one.
1350                let table = table::ActiveModel::from(table);
1351                table.update(txn).await?;
1352            }
1353            _ => unreachable!(
1354                "invalid streaming job type: {:?}",
1355                streaming_job.job_type_str()
1356            ),
1357        }
1358
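        /// Swap the fragments of `original_job_id` with those created under `tmp_id`:
        /// fix up internal-table metadata, rewire downstream `Merge` nodes according to
        /// `replace_upstream`, and drop the temporary job object.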
1359        async fn finish_fragments(
1360            txn: &DatabaseTransaction,
1361            tmp_id: JobId,
1362            original_job_id: JobId,
1363            replace_upstream: FragmentReplaceUpstream,
1364        ) -> MetaResult<()> {
1365            // 0. update internal tables
1366            // Fields including `fragment_id` were placeholder values before.
1367            // After table fragments are created, update them for all internal tables.
1368            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1369                .select_only()
1370                .columns([
1371                    fragment::Column::FragmentId,
1372                    fragment::Column::StateTableIds,
1373                ])
1374                .filter(fragment::Column::JobId.eq(tmp_id))
1375                .into_tuple()
1376                .all(txn)
1377                .await?;
1378            for (fragment_id, state_table_ids) in fragment_info {
1379                for state_table_id in state_table_ids.into_inner() {
1380                    let state_table_id = TableId::new(state_table_id as _);
1381                    table::ActiveModel {
1382                        table_id: Set(state_table_id),
1383                        fragment_id: Set(Some(fragment_id)),
1384                        // No need to update `vnode_count` because it must remain the same.
1385                        ..Default::default()
1386                    }
1387                    .update(txn)
1388                    .await?;
1389                }
1390            }
1391
1392            // 1. replace old fragments/actors with new ones.
1393            Fragment::delete_many()
1394                .filter(fragment::Column::JobId.eq(original_job_id))
1395                .exec(txn)
1396                .await?;
1397            Fragment::update_many()
1398                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1399                .filter(fragment::Column::JobId.eq(tmp_id))
1400                .exec(txn)
1401                .await?;
1402
1403            // 2. update merges.
1404            // Update the downstream fragments' `Merge` nodes and their `upstream_fragment_id`.
1405            for (fragment_id, fragment_replace_map) in replace_upstream {
1406                let (fragment_id, mut stream_node) =
1407                    Fragment::find_by_id(fragment_id as FragmentId)
1408                        .select_only()
1409                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1410                        .into_tuple::<(FragmentId, StreamNode)>()
1411                        .one(txn)
1412                        .await?
1413                        .map(|(id, node)| (id, node.to_protobuf()))
1414                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1415
1416                visit_stream_node_mut(&mut stream_node, |body| {
1417                    if let PbNodeBody::Merge(m) = body
1418                        && let Some(new_fragment_id) =
1419                            fragment_replace_map.get(&m.upstream_fragment_id)
1420                    {
1421                        m.upstream_fragment_id = *new_fragment_id;
1422                    }
1423                });
1424                fragment::ActiveModel {
1425                    fragment_id: Set(fragment_id),
1426                    stream_node: Set(StreamNode::from(&stream_node)),
1427                    ..Default::default()
1428                }
1429                .update(txn)
1430                .await?;
1431            }
1432
1433            // 3. remove dummy object.
1434            Object::delete_by_id(tmp_id).exec(txn).await?;
1435
1436            Ok(())
1437        }
1438
1439        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1440
1441        // 4. update catalogs and notify.
1442        let mut objects = vec![];
1443        match job_type {
1444            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1445                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1446                    .find_also_related(Object)
1447                    .one(txn)
1448                    .await?
1449                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1450                objects.push(PbObject {
1451                    object_info: Some(PbObjectInfo::Table(
1452                        ObjectModel(table, table_obj.unwrap()).into(),
1453                    )),
1454                })
1455            }
1456            StreamingJobType::Source => {
1457                let (source, source_obj) =
1458                    Source::find_by_id(original_job_id.as_shared_source_id())
1459                        .find_also_related(Object)
1460                        .one(txn)
1461                        .await?
1462                        .ok_or_else(|| {
1463                            MetaError::catalog_id_not_found("object", original_job_id)
1464                        })?;
1465                objects.push(PbObject {
1466                    object_info: Some(PbObjectInfo::Source(
1467                        ObjectModel(source, source_obj.unwrap()).into(),
1468                    )),
1469                })
1470            }
1471            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1472        }
1473
1474        if let Some(expr_rewriter) = index_item_rewriter {
1475            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1476                .select_only()
1477                .columns([index::Column::IndexId, index::Column::IndexItems])
1478                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1479                .into_tuple()
1480                .all(txn)
1481                .await?;
1482            for (index_id, nodes) in index_items {
1483                let mut pb_nodes = nodes.to_protobuf();
1484                pb_nodes
1485                    .iter_mut()
1486                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1487                let index = index::ActiveModel {
1488                    index_id: Set(index_id),
1489                    index_items: Set(pb_nodes.into()),
1490                    ..Default::default()
1491                }
1492                .update(txn)
1493                .await?;
1494                let index_obj = index
1495                    .find_related(Object)
1496                    .one(txn)
1497                    .await?
1498                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1499                objects.push(PbObject {
1500                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1501                });
1502            }
1503        }
1504
1505        if let Some(sinks) = auto_refresh_schema_sinks {
1506            for finish_sink_context in sinks {
1507                finish_fragments(
1508                    txn,
1509                    finish_sink_context.tmp_sink_id.as_job_id(),
1510                    finish_sink_context.original_sink_id.as_job_id(),
1511                    Default::default(),
1512                )
1513                .await?;
1514                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1515                    .find_also_related(Object)
1516                    .one(txn)
1517                    .await?
1518                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id))?;
1519                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1520                Sink::update(sink::ActiveModel {
1521                    sink_id: Set(finish_sink_context.original_sink_id),
1522                    columns: Set(columns.clone()),
1523                    ..Default::default()
1524                })
1525                .exec(txn)
1526                .await?;
1527                sink.columns = columns;
1528                objects.push(PbObject {
1529                    object_info: Some(PbObjectInfo::Sink(
1530                        ObjectModel(sink, sink_obj.unwrap()).into(),
1531                    )),
1532                });
1533                if let Some((log_store_table_id, new_log_store_table_columns)) =
1534                    finish_sink_context.new_log_store_table
1535                {
1536                    let new_log_store_table_columns: ColumnCatalogArray =
1537                        new_log_store_table_columns.into();
1538                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1539                        .find_also_related(Object)
1540                        .one(txn)
1541                        .await?
1542                        .ok_or_else(|| MetaError::catalog_id_not_found("table", log_store_table_id))?;
1543                    Table::update(table::ActiveModel {
1544                        table_id: Set(log_store_table_id),
1545                        columns: Set(new_log_store_table_columns.clone()),
1546                        ..Default::default()
1547                    })
1548                    .exec(txn)
1549                    .await?;
1550                    table.columns = new_log_store_table_columns;
1551                    objects.push(PbObject {
1552                        object_info: Some(PbObjectInfo::Table(
1553                            ObjectModel(table, table_obj.unwrap()).into(),
1554                        )),
1555                    });
1556                }
1557            }
1558        }
1559
1560        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1561        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1562            notification_objs =
1563                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1564        }
1565
1566        Ok((objects, notification_objs))
1567    }
1568
1569    /// Abort the replacing streaming job by deleting the temporary job object, including any temporary sink objects.
1570    pub async fn try_abort_replacing_streaming_job(
1571        &self,
1572        tmp_job_id: JobId,
1573        tmp_sink_ids: Option<Vec<ObjectId>>,
1574    ) -> MetaResult<()> {
1575        let inner = self.inner.write().await;
1576        let txn = inner.db.begin().await?;
1577        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1578        if let Some(tmp_sink_ids) = tmp_sink_ids {
1579            for tmp_sink_id in tmp_sink_ids {
1580                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1581            }
1582        }
1583        txn.commit().await?;
1584        Ok(())
1585    }
1586
1587    // edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments
1588    // return the actor_ids to which the new rate limit should be applied
1589    pub async fn update_source_rate_limit_by_source_id(
1590        &self,
1591        source_id: SourceId,
1592        rate_limit: Option<u32>,
1593    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1594        let inner = self.inner.read().await;
1595        let txn = inner.db.begin().await?;
1596
1597        {
1598            let active_source = source::ActiveModel {
1599                source_id: Set(source_id),
1600                rate_limit: Set(rate_limit.map(|v| v as i32)),
1601                ..Default::default()
1602            };
1603            active_source.update(&txn).await?;
1604        }
1605
1606        let (source, obj) = Source::find_by_id(source_id)
1607            .find_also_related(Object)
1608            .one(&txn)
1609            .await?
1610            .ok_or_else(|| {
1611                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1612            })?;
1613
1614        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
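        // Resolve the streaming jobs whose fragments embed this source: a table with a
        // connector embeds it in the table's own job, a shared source runs as its own
        // job, and a non-shared source is copied into every dependent streaming job.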
1615        let streaming_job_ids: Vec<JobId> =
1616            if let Some(table_id) = source.optional_associated_table_id {
1617                vec![table_id.as_job_id()]
1618            } else if let Some(source_info) = &source.source_info
1619                && source_info.to_protobuf().is_shared()
1620            {
1621                vec![source_id.as_share_source_job_id()]
1622            } else {
1623                ObjectDependency::find()
1624                    .select_only()
1625                    .column(object_dependency::Column::UsedBy)
1626                    .filter(object_dependency::Column::Oid.eq(source_id))
1627                    .into_tuple()
1628                    .all(&txn)
1629                    .await?
1630            };
1631
1632        if streaming_job_ids.is_empty() {
1633            return Err(MetaError::invalid_parameter(format!(
1634                "source id {source_id} not used by any streaming job"
1635            )));
1636        }
1637
1638        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1639            .select_only()
1640            .columns([
1641                fragment::Column::FragmentId,
1642                fragment::Column::FragmentTypeMask,
1643                fragment::Column::StreamNode,
1644            ])
1645            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1646            .into_tuple()
1647            .all(&txn)
1648            .await?;
1649        let mut fragments = fragments
1650            .into_iter()
1651            .map(|(id, mask, stream_node)| {
1652                (
1653                    id,
1654                    FragmentTypeMask::from(mask as u32),
1655                    stream_node.to_protobuf(),
1656                )
1657            })
1658            .collect_vec();
1659
1660        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1661            let mut found = false;
1662            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1663                visit_stream_node_mut(stream_node, |node| {
1664                    if let PbNodeBody::Source(node) = node
1665                        && let Some(node_inner) = &mut node.source_inner
1666                        && node_inner.source_id == source_id
1667                    {
1668                        node_inner.rate_limit = rate_limit;
1669                        found = true;
1670                    }
1671                });
1672            }
1673            if is_fs_source {
1674                // In older versions there was no fragment type flag for the `FsFetch` node,
1675                // so we scan all fragments for `StreamFsFetch` nodes when a fs connector is used.
1676                visit_stream_node_mut(stream_node, |node| {
1677                    if let PbNodeBody::StreamFsFetch(node) = node {
1678                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1679                        if let Some(node_inner) = &mut node.node_inner
1680                            && node_inner.source_id == source_id
1681                        {
1682                            node_inner.rate_limit = rate_limit;
1683                            found = true;
1684                        }
1685                    }
1686                });
1687            }
1688            found
1689        });
1690
1691        assert!(
1692            !fragments.is_empty(),
1693            "source id should be used by at least one fragment"
1694        );
1695        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1696
1697        for (id, fragment_type_mask, stream_node) in fragments {
1698            fragment::ActiveModel {
1699                fragment_id: Set(id),
1700                fragment_type_mask: Set(fragment_type_mask.into()),
1701                stream_node: Set(StreamNode::from(&stream_node)),
1702                ..Default::default()
1703            }
1704            .update(&txn)
1705            .await?;
1706        }
1707
1708        txn.commit().await?;
1709
1710        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1711
1712        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1713        let _version = self
1714            .notify_frontend(
1715                NotificationOperation::Update,
1716                NotificationInfo::ObjectGroup(PbObjectGroup {
1717                    objects: vec![PbObject {
1718                        object_info: Some(relation_info),
1719                    }],
1720                }),
1721            )
1722            .await;
1723
1724        Ok(fragment_actors)
1725    }
1726
1727    fn get_fragment_actors_from_running_info(
1728        &self,
1729        fragment_ids: impl Iterator<Item = FragmentId>,
1730    ) -> HashMap<FragmentId, Vec<ActorId>> {
1731        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1732
1733        let info = self.env.shared_actor_infos().read_guard();
1734
1735        for fragment_id in fragment_ids {
1736            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1737            fragment_actors
1738                .entry(fragment_id as _)
1739                .or_default()
1740                .extend(actors.keys().copied());
1741        }
1742
1743        fragment_actors
1744    }
1745
1746    // edit the stream nodes of the fragments belonging to the given `job_id`
1747    // return the actor_ids to which the mutation should be applied
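    //
    // A minimal usage sketch (a hypothetical mutation clearing all DML rate limits;
    // it mirrors `update_dml_rate_limit_by_job_id` below):
    //
    // ```ignore
    // let fragment_actors = self
    //     .mutate_fragments_by_job_id(
    //         job_id,
    //         |mask, stream_node| {
    //             let mut found = false;
    //             if mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
    //                 visit_stream_node_mut(stream_node, |node| {
    //                     if let PbNodeBody::Dml(node) = node {
    //                         node.rate_limit = None;
    //                         found = true;
    //                     }
    //                 });
    //             }
    //             Ok(found)
    //         },
    //         "dml node not found",
    //     )
    //     .await?;
    // ```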
1748    pub async fn mutate_fragments_by_job_id(
1749        &self,
1750        job_id: JobId,
1751        // returns true if the mutation is applied
1752        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1753        // error message used when no relevant fragment is found
1754        err_msg: &'static str,
1755    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1756        let inner = self.inner.read().await;
1757        let txn = inner.db.begin().await?;
1758
1759        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1760            .select_only()
1761            .columns([
1762                fragment::Column::FragmentId,
1763                fragment::Column::FragmentTypeMask,
1764                fragment::Column::StreamNode,
1765            ])
1766            .filter(fragment::Column::JobId.eq(job_id))
1767            .into_tuple()
1768            .all(&txn)
1769            .await?;
1770        let mut fragments = fragments
1771            .into_iter()
1772            .map(|(id, mask, stream_node)| {
1773                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1774            })
1775            .collect_vec();
1776
1777        let fragments = fragments
1778            .iter_mut()
1779            .map(|(_, fragment_type_mask, stream_node)| {
1780                fragments_mutation_fn(*fragment_type_mask, stream_node)
1781            })
1782            .collect::<MetaResult<Vec<bool>>>()?
1783            .into_iter()
1784            .zip_eq_debug(std::mem::take(&mut fragments))
1785            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1786            .collect::<Vec<_>>();
1787
1788        if fragments.is_empty() {
1789            return Err(MetaError::invalid_parameter(format!(
1790                "job id {job_id}: {}",
1791                err_msg
1792            )));
1793        }
1794
1795        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1796        for (id, _, stream_node) in fragments {
1797            fragment::ActiveModel {
1798                fragment_id: Set(id),
1799                stream_node: Set(StreamNode::from(&stream_node)),
1800                ..Default::default()
1801            }
1802            .update(&txn)
1803            .await?;
1804        }
1805
1806        txn.commit().await?;
1807
1808        let fragment_actors =
1809            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1810
1811        Ok(fragment_actors)
1812    }
1813
1814    async fn mutate_fragment_by_fragment_id(
1815        &self,
1816        fragment_id: FragmentId,
1817        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1818        err_msg: &'static str,
1819    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1820        let inner = self.inner.read().await;
1821        let txn = inner.db.begin().await?;
1822
1823        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1824            Fragment::find_by_id(fragment_id)
1825                .select_only()
1826                .columns([
1827                    fragment::Column::FragmentTypeMask,
1828                    fragment::Column::StreamNode,
1829                ])
1830                .into_tuple()
1831                .one(&txn)
1832                .await?
1833                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1834        let mut pb_stream_node = stream_node.to_protobuf();
1835        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1836
1837        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1838            return Err(MetaError::invalid_parameter(format!(
1839                "fragment id {fragment_id}: {}",
1840                err_msg
1841            )));
1842        }
1843
1844        fragment::ActiveModel {
1845            fragment_id: Set(fragment_id),
1846            stream_node: Set(StreamNode::from(&pb_stream_node)),
1847            ..Default::default()
1848        }
1849        .update(&txn)
1850        .await?;
1851
1852        let fragment_actors =
1853            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1854
1855        txn.commit().await?;
1856
1857        Ok(fragment_actors)
1858    }
1859
1860    // edit the `rate_limit` of the backfill nodes (e.g. `StreamScan`) in the given `job_id`'s fragments
1861    // return the actor_ids to which the new rate limit should be applied
1862    pub async fn update_backfill_rate_limit_by_job_id(
1863        &self,
1864        job_id: JobId,
1865        rate_limit: Option<u32>,
1866    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1867        let update_backfill_rate_limit =
1868            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1869                let mut found = false;
1870                if fragment_type_mask
1871                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1872                {
1873                    visit_stream_node_mut(stream_node, |node| match node {
1874                        PbNodeBody::StreamCdcScan(node) => {
1875                            node.rate_limit = rate_limit;
1876                            found = true;
1877                        }
1878                        PbNodeBody::StreamScan(node) => {
1879                            node.rate_limit = rate_limit;
1880                            found = true;
1881                        }
1882                        PbNodeBody::SourceBackfill(node) => {
1883                            node.rate_limit = rate_limit;
1884                            found = true;
1885                        }
1886                        PbNodeBody::Sink(node) => {
1887                            node.rate_limit = rate_limit;
1888                            found = true;
1889                        }
1890                        _ => {}
1891                    });
1892                }
1893                Ok(found)
1894            };
1895
1896        self.mutate_fragments_by_job_id(
1897            job_id,
1898            update_backfill_rate_limit,
1899            "stream scan node or source node not found",
1900        )
1901        .await
1902    }
1903
1904    // edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments
1905    // return the actor_ids to which the new rate limit should be applied
1906    pub async fn update_sink_rate_limit_by_job_id(
1907        &self,
1908        sink_id: SinkId,
1909        rate_limit: Option<u32>,
1910    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1911        let update_sink_rate_limit =
1912            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1913                let mut found = Ok(false);
1914                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1915                    visit_stream_node_mut(stream_node, |node| {
1916                        if let PbNodeBody::Sink(node) = node {
1917                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1918                                found = Err(MetaError::invalid_parameter(
1919                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1920                                ));
1921                                return;
1922                            }
1923                            node.rate_limit = rate_limit;
1924                            found = Ok(true);
1925                        }
1926                    });
1927                }
1928                found
1929            };
1930
1931        self.mutate_fragments_by_job_id(
1932            sink_id.as_job_id(),
1933            update_sink_rate_limit,
1934            "sink node not found",
1935        )
1936        .await
1937    }
1938
1939    pub async fn update_dml_rate_limit_by_job_id(
1940        &self,
1941        job_id: JobId,
1942        rate_limit: Option<u32>,
1943    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1944        let update_dml_rate_limit =
1945            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1946                let mut found = false;
1947                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1948                    visit_stream_node_mut(stream_node, |node| {
1949                        if let PbNodeBody::Dml(node) = node {
1950                            node.rate_limit = rate_limit;
1951                            found = true;
1952                        }
1953                    });
1954                }
1955                Ok(found)
1956            };
1957
1958        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1959            .await
1960    }
1961
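    /// Alter a source's WITH properties and secret refs on the fly: validate that the
    /// changed fields may be altered, rewrite the stored `CREATE SOURCE` / `CREATE TABLE`
    /// definition, update secret dependencies, and patch the `Source` nodes of all
    /// affected fragments. Returns the resolved options after the update.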
1962    pub async fn update_source_props_by_source_id(
1963        &self,
1964        source_id: SourceId,
1965        alter_props: BTreeMap<String, String>,
1966        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1967    ) -> MetaResult<WithOptionsSecResolved> {
1968        let inner = self.inner.read().await;
1969        let txn = inner.db.begin().await?;
1970
1971        let (source, _obj) = Source::find_by_id(source_id)
1972            .find_also_related(Object)
1973            .one(&txn)
1974            .await?
1975            .ok_or_else(|| {
1976                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1977            })?;
1978        let connector = source.with_properties.0.get_connector().unwrap();
1979        let is_shared_source = source.is_shared();
1980
1981        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1982        if !is_shared_source {
1983            // MVs on a non-shared source hold a copy of the source in their own fragments
1984            dep_source_job_ids = ObjectDependency::find()
1985                .select_only()
1986                .column(object_dependency::Column::UsedBy)
1987                .filter(object_dependency::Column::Oid.eq(source_id))
1988                .into_tuple()
1989                .all(&txn)
1990                .await?;
1991        }
1992
1993        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
1994        let prop_keys: Vec<String> = alter_props
1995            .keys()
1996            .chain(alter_secret_refs.keys())
1997            .cloned()
1998            .collect();
1999        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2000            &connector, &prop_keys,
2001        )?;
2002
2003        let mut options_with_secret = WithOptionsSecResolved::new(
2004            source.with_properties.0.clone(),
2005            source
2006                .secret_ref
2007                .map(|secret_ref| secret_ref.to_protobuf())
2008                .unwrap_or_default(),
2009        );
2010        let (to_add_secret_dep, to_remove_secret_dep) =
2011            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2012
2013        tracing::info!(
2014            "applying new properties to source: source_id={}, options_with_secret={:?}",
2015            source_id,
2016            options_with_secret
2017        );
2018        // check that the altered props are valid for the corresponding connector
2019        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2020        // todo: validate via source manager
2021
2022        let mut associate_table_id = None;
2023
2024        // `preferred_id` can be a source_id or a table_id:
2025        // if updating a source associated with a table, it is the table_id;
2026        // otherwise, it is the source_id
2027        let mut preferred_id = source_id.as_object_id();
2028        let rewrite_sql = {
2029            let definition = source.definition.clone();
2030
2031            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2032                .map_err(|e| {
2033                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2034                        anyhow!(e).context("Failed to parse source definition SQL"),
2035                    )))
2036                })?
2037                .try_into()
2038                .unwrap();
2039
2040            /// Formats SQL options with secret values properly resolved
2041            ///
2042            /// This function processes configuration options that may contain sensitive data:
2043            /// - Plaintext options are directly converted to `SqlOption`
2044            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2045            ///   without exposing the actual secret value
2046            ///
2047            /// # Arguments
2048            /// * `txn` - Database transaction for retrieving secrets
2049            /// * `options_with_secret` - Container of options with both plaintext and secret values
2050            ///
2051            /// # Returns
2052            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
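            ///
            /// # Example
            ///
            /// An illustrative sketch (the option names and the secret are hypothetical):
            ///
            /// ```ignore
            /// // plaintext: {"topic": "t1"}, secret: {"password": <ref to secret `kafka_pwd`>}
            /// let options = format_with_option_secret_resolved(&txn, &options_with_secret).await?;
            /// // renders in SQL as: topic = 't1', password = SECRET kafka_pwd
            /// ```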
2053            async fn format_with_option_secret_resolved(
2054                txn: &DatabaseTransaction,
2055                options_with_secret: &WithOptionsSecResolved,
2056            ) -> MetaResult<Vec<SqlOption>> {
2057                let mut options = Vec::new();
2058                for (k, v) in options_with_secret.as_plaintext() {
2059                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2060                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2061                    options.push(sql_option);
2062                }
2063                for (k, v) in options_with_secret.as_secret() {
2064                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2065                        let sql_option =
2066                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2067                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2068                        options.push(sql_option);
2069                    } else {
2070                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2071                    }
2072                }
2073                Ok(options)
2074            }
2075
2076            match &mut stmt {
2077                Statement::CreateSource { stmt } => {
2078                    stmt.with_properties.0 =
2079                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2080                }
2081                Statement::CreateTable { with_options, .. } => {
2082                    *with_options =
2083                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2084                    associate_table_id = source.optional_associated_table_id;
2085                    preferred_id = associate_table_id.unwrap().as_object_id();
2086                }
2087                _ => unreachable!(),
2088            }
2089
2090            stmt.to_string()
2091        };
2092
2093        {
2094            // update secret dependencies
2095            if !to_add_secret_dep.is_empty() {
2096                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2097                    object_dependency::ActiveModel {
2098                        oid: Set(secret_id.into()),
2099                        used_by: Set(preferred_id),
2100                        ..Default::default()
2101                    }
2102                }))
2103                .exec(&txn)
2104                .await?;
2105            }
2106            if !to_remove_secret_dep.is_empty() {
2107                // todo: fix the filter logic
2108                let _ = ObjectDependency::delete_many()
2109                    .filter(
2110                        object_dependency::Column::Oid
2111                            .is_in(to_remove_secret_dep)
2112                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2113                    )
2114                    .exec(&txn)
2115                    .await?;
2116            }
2117        }
2118
2119        let active_source_model = source::ActiveModel {
2120            source_id: Set(source_id),
2121            definition: Set(rewrite_sql.clone()),
2122            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2123            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2124                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2125            ..Default::default()
2126        };
2127        active_source_model.update(&txn).await?;
2128
2129        if let Some(associate_table_id) = associate_table_id {
2130            // update the associated table's statement accordingly
2131            let active_table_model = table::ActiveModel {
2132                table_id: Set(associate_table_id),
2133                definition: Set(rewrite_sql),
2134                ..Default::default()
2135            };
2136            active_table_model.update(&txn).await?;
2137        }
2138
2139        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2140            // if updating a table with a connector, the job id is the table id
2141            associate_table_id.as_job_id()
2142        } else {
2143            source_id.as_share_source_job_id()
2144        }]
2145        .into_iter()
2146        .chain(dep_source_job_ids.into_iter())
2147        .collect_vec();
2148
2149        // update fragments
2150        update_connector_props_fragments(
2151            &txn,
2152            to_check_job_ids,
2153            FragmentTypeFlag::Source,
2154            |node, found| {
2155                if let PbNodeBody::Source(node) = node
2156                    && let Some(source_inner) = &mut node.source_inner
2157                {
2158                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2159                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2160                    *found = true;
2161                }
2162            },
2163            is_shared_source,
2164        )
2165        .await?;
2166
2167        let mut to_update_objs = Vec::with_capacity(2);
2168        let (source, obj) = Source::find_by_id(source_id)
2169            .find_also_related(Object)
2170            .one(&txn)
2171            .await?
2172            .ok_or_else(|| {
2173                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2174            })?;
2175        to_update_objs.push(PbObject {
2176            object_info: Some(PbObjectInfo::Source(
2177                ObjectModel(source, obj.unwrap()).into(),
2178            )),
2179        });
2180
2181        if let Some(associate_table_id) = associate_table_id {
2182            let (table, obj) = Table::find_by_id(associate_table_id)
2183                .find_also_related(Object)
2184                .one(&txn)
2185                .await?
2186                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2187            to_update_objs.push(PbObject {
2188                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2189            });
2190        }
2191
2192        txn.commit().await?;
2193
2194        self.notify_frontend(
2195            NotificationOperation::Update,
2196            NotificationInfo::ObjectGroup(PbObjectGroup {
2197                objects: to_update_objs,
2198            }),
2199        )
2200        .await;
2201
2202        Ok(options_with_secret)
2203    }
2204
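    /// Alter a sink's properties on the fly: validate that the fields may be altered for
    /// this connector, rewrite the stored `CREATE SINK` definition, and patch the
    /// `SinkDesc` embedded in the sink's fragments.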
2205    pub async fn update_sink_props_by_sink_id(
2206        &self,
2207        sink_id: SinkId,
2208        props: BTreeMap<String, String>,
2209    ) -> MetaResult<HashMap<String, String>> {
2210        let inner = self.inner.read().await;
2211        let txn = inner.db.begin().await?;
2212
2213        let (sink, _obj) = Sink::find_by_id(sink_id)
2214            .find_also_related(Object)
2215            .one(&txn)
2216            .await?
2217            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2218        validate_sink_props(&sink, &props)?;
2219        let definition = sink.definition.clone();
2220        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2221            .map_err(|e| SinkError::Config(anyhow!(e)))?
2222            .try_into()
2223            .unwrap();
2224        if let Statement::CreateSink { stmt } = &mut stmt {
2225            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2226        } else {
2227            panic!("definition is not a create sink statement")
2228        }
2229        let mut new_config = sink.properties.clone().into_inner();
2230        new_config.extend(props.clone());
2231
2232        let definition = stmt.to_string();
2233        let active_sink = sink::ActiveModel {
2234            sink_id: Set(sink_id),
2235            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2236            definition: Set(definition),
2237            ..Default::default()
2238        };
2239        active_sink.update(&txn).await?;
2240
2241        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2242        let (sink, obj) = Sink::find_by_id(sink_id)
2243            .find_also_related(Object)
2244            .one(&txn)
2245            .await?
2246            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2247        txn.commit().await?;
2248        let relation_infos = vec![PbObject {
2249            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2250        }];
2251
2252        let _version = self
2253            .notify_frontend(
2254                NotificationOperation::Update,
2255                NotificationInfo::ObjectGroup(PbObjectGroup {
2256                    objects: relation_infos,
2257                }),
2258            )
2259            .await;
2260
2261        Ok(props.into_iter().collect())
2262    }
2263
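    /// Like [`Self::update_sink_props_by_sink_id`], but for a table with the iceberg
    /// engine: the rewritten `CREATE TABLE` definition is shared by the table and its
    /// auxiliary iceberg sink and source, so all three catalogs are updated together.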
2264    pub async fn update_iceberg_table_props_by_table_id(
2265        &self,
2266        table_id: TableId,
2267        props: BTreeMap<String, String>,
2268        alter_iceberg_table_props: Option<
2269            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2270        >,
2271    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2272        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2273            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2274        let inner = self.inner.read().await;
2275        let txn = inner.db.begin().await?;
2276
2277        let (sink, _obj) = Sink::find_by_id(sink_id)
2278            .find_also_related(Object)
2279            .one(&txn)
2280            .await?
2281            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2282        validate_sink_props(&sink, &props)?;
2283
2284        let definition = sink.definition.clone();
2285        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2286            .map_err(|e| SinkError::Config(anyhow!(e)))?
2287            .try_into()
2288            .unwrap();
2289        if let Statement::CreateTable {
2290            with_options,
2291            engine,
2292            ..
2293        } = &mut stmt
2294        {
2295            if !matches!(engine, Engine::Iceberg) {
2296                return Err(SinkError::Config(anyhow!(
2297                    "only iceberg table can be altered as sink"
2298                ))
2299                .into());
2300            }
2301            update_stmt_with_props(with_options, &props)?;
2302        } else {
2303            panic!("definition is not a create iceberg table statement")
2304        }
2305        let mut new_config = sink.properties.clone().into_inner();
2306        new_config.extend(props.clone());
2307
2308        let definition = stmt.to_string();
2309        let active_sink = sink::ActiveModel {
2310            sink_id: Set(sink_id),
2311            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2312            definition: Set(definition.clone()),
2313            ..Default::default()
2314        };
2315        let active_source = source::ActiveModel {
2316            source_id: Set(source_id),
2317            definition: Set(definition.clone()),
2318            ..Default::default()
2319        };
2320        let active_table = table::ActiveModel {
2321            table_id: Set(table_id),
2322            definition: Set(definition),
2323            ..Default::default()
2324        };
2325        active_sink.update(&txn).await?;
2326        active_source.update(&txn).await?;
2327        active_table.update(&txn).await?;
2328
2329        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2330
2331        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2332            .find_also_related(Object)
2333            .one(&txn)
2334            .await?
2335            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2336        let (source, source_obj) = Source::find_by_id(source_id)
2337            .find_also_related(Object)
2338            .one(&txn)
2339            .await?
2340            .ok_or_else(|| {
2341                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2342            })?;
2343        let (table, table_obj) = Table::find_by_id(table_id)
2344            .find_also_related(Object)
2345            .one(&txn)
2346            .await?
2347            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2348        txn.commit().await?;
2349        let relation_infos = vec![
2350            PbObject {
2351                object_info: Some(PbObjectInfo::Sink(
2352                    ObjectModel(sink, sink_obj.unwrap()).into(),
2353                )),
2354            },
2355            PbObject {
2356                object_info: Some(PbObjectInfo::Source(
2357                    ObjectModel(source, source_obj.unwrap()).into(),
2358                )),
2359            },
2360            PbObject {
2361                object_info: Some(PbObjectInfo::Table(
2362                    ObjectModel(table, table_obj.unwrap()).into(),
2363                )),
2364            },
2365        ];
2366        let _version = self
2367            .notify_frontend(
2368                NotificationOperation::Update,
2369                NotificationInfo::ObjectGroup(PbObjectGroup {
2370                    objects: relation_infos,
2371                }),
2372            )
2373            .await;
2374
2375        Ok((props.into_iter().collect(), sink_id))
2376    }
2377
2378    pub async fn update_fragment_rate_limit_by_fragment_id(
2379        &self,
2380        fragment_id: FragmentId,
2381        rate_limit: Option<u32>,
2382    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2383        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2384                                 stream_node: &mut PbStreamNode| {
2385            let mut found = false;
2386            if fragment_type_mask.contains_any(
2387                FragmentTypeFlag::dml_rate_limit_fragments()
2388                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2389                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2390                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2391            ) {
2392                visit_stream_node_mut(stream_node, |node| {
2393                    if let PbNodeBody::Dml(node) = node {
2394                        node.rate_limit = rate_limit;
2395                        found = true;
2396                    }
2397                    if let PbNodeBody::Sink(node) = node {
2398                        node.rate_limit = rate_limit;
2399                        found = true;
2400                    }
2401                    if let PbNodeBody::StreamCdcScan(node) = node {
2402                        node.rate_limit = rate_limit;
2403                        found = true;
2404                    }
2405                    if let PbNodeBody::StreamScan(node) = node {
2406                        node.rate_limit = rate_limit;
2407                        found = true;
2408                    }
2409                    if let PbNodeBody::SourceBackfill(node) = node {
2410                        node.rate_limit = rate_limit;
2411                        found = true;
2412                    }
2413                });
2414            }
2415            found
2416        };
2417        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2418            .await
2419    }
2420
2421    /// Note: `FsFetch` nodes created in old versions are not included.
2422    /// Since this is only used for debugging, that should be fine.
2423    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2424        let inner = self.inner.read().await;
2425        let txn = inner.db.begin().await?;
2426
2427        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2428            .select_only()
2429            .columns([
2430                fragment::Column::FragmentId,
2431                fragment::Column::JobId,
2432                fragment::Column::FragmentTypeMask,
2433                fragment::Column::StreamNode,
2434            ])
2435            .filter(FragmentTypeMask::intersects_any(
2436                FragmentTypeFlag::rate_limit_fragments(),
2437            ))
2438            .into_tuple()
2439            .all(&txn)
2440            .await?;
2441
2442        let mut rate_limits = Vec::new();
2443        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2444            let stream_node = stream_node.to_protobuf();
2445            visit_stream_node_body(&stream_node, |node| {
2446                let mut rate_limit = None;
2447                let mut node_name = None;
2448
2449                match node {
2450                    // source rate limit
2451                    PbNodeBody::Source(node) => {
2452                        if let Some(node_inner) = &node.source_inner {
2453                            rate_limit = node_inner.rate_limit;
2454                            node_name = Some("SOURCE");
2455                        }
2456                    }
2457                    PbNodeBody::StreamFsFetch(node) => {
2458                        if let Some(node_inner) = &node.node_inner {
2459                            rate_limit = node_inner.rate_limit;
2460                            node_name = Some("FS_FETCH");
2461                        }
2462                    }
2463                    // backfill rate limit
2464                    PbNodeBody::SourceBackfill(node) => {
2465                        rate_limit = node.rate_limit;
2466                        node_name = Some("SOURCE_BACKFILL");
2467                    }
2468                    PbNodeBody::StreamScan(node) => {
2469                        rate_limit = node.rate_limit;
2470                        node_name = Some("STREAM_SCAN");
2471                    }
2472                    PbNodeBody::StreamCdcScan(node) => {
2473                        rate_limit = node.rate_limit;
2474                        node_name = Some("STREAM_CDC_SCAN");
2475                    }
2476                    PbNodeBody::Sink(node) => {
2477                        rate_limit = node.rate_limit;
2478                        node_name = Some("SINK");
2479                    }
2480                    _ => {}
2481                }
2482
2483                if let Some(rate_limit) = rate_limit {
2484                    rate_limits.push(RateLimitInfo {
2485                        fragment_id,
2486                        job_id,
2487                        fragment_type_mask: fragment_type_mask as u32,
2488                        rate_limit,
2489                        node_name: node_name.unwrap().to_owned(),
2490                    });
2491                }
2492            });
2493        }
2494
2495        Ok(rate_limits)
2496    }
2497}
2498
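/// Check that `props` may be altered on the fly for this sink's connector, then let the
/// connector implementation validate the merged configuration.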
2499fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2500    // Validate that props can be altered
2501    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2502        Some(connector) => {
2503            let connector_type = connector.to_lowercase();
2504            let field_names: Vec<String> = props.keys().cloned().collect();
2505            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2506                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2507
2508            match_sink_name_str!(
2509                connector_type.as_str(),
2510                SinkType,
2511                {
2512                    let mut new_props = sink.properties.0.clone();
2513                    new_props.extend(props.clone());
2514                    SinkType::validate_alter_config(&new_props)
2515                },
2516                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2517            )?
2518        }
2519        None => {
2520            return Err(
2521                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2522            );
2523        }
2524    };
2525    Ok(())
2526}
2527
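/// Merge `props` into a statement's WITH options, overwriting options with the same
/// name in place and appending new ones, while preserving the original option order.
///
/// A sketch of the effect (option names are illustrative):
/// `WITH (connector = 'kafka', topic = 't1')` merged with `{topic: 't2'}` yields
/// `WITH (connector = 'kafka', topic = 't2')`.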
2528fn update_stmt_with_props(
2529    with_properties: &mut Vec<SqlOption>,
2530    props: &BTreeMap<String, String>,
2531) -> MetaResult<()> {
2532    let mut new_sql_options = with_properties
2533        .iter()
2534        .map(|sql_option| (&sql_option.name, sql_option))
2535        .collect::<IndexMap<_, _>>();
2536    let add_sql_options = props
2537        .iter()
2538        .map(|(k, v)| SqlOption::try_from((k, v)))
2539        .collect::<Result<Vec<SqlOption>, ParserError>>()
2540        .map_err(|e| SinkError::Config(anyhow!(e)))?;
2541    new_sql_options.extend(
2542        add_sql_options
2543            .iter()
2544            .map(|sql_option| (&sql_option.name, sql_option)),
2545    );
2546    *with_properties = new_sql_options.into_values().cloned().collect();
2547    Ok(())
2548}
2549
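/// Apply the new `props` to the `SinkDesc` of every sink fragment belonging to
/// `sink_id`, persisting the rewritten stream node of each affected fragment.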
2550async fn update_sink_fragment_props(
2551    txn: &DatabaseTransaction,
2552    sink_id: SinkId,
2553    props: BTreeMap<String, String>,
2554) -> MetaResult<()> {
2555    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2556        .select_only()
2557        .columns([
2558            fragment::Column::FragmentId,
2559            fragment::Column::FragmentTypeMask,
2560            fragment::Column::StreamNode,
2561        ])
2562        .filter(fragment::Column::JobId.eq(sink_id))
2563        .into_tuple()
2564        .all(txn)
2565        .await?;
2566    let fragments = fragments
2567        .into_iter()
2568        .filter(|(_, fragment_type_mask, _)| {
2569            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2570        })
2571        .filter_map(|(id, _, stream_node)| {
2572            let mut stream_node = stream_node.to_protobuf();
2573            let mut found = false;
2574            visit_stream_node_mut(&mut stream_node, |node| {
2575                if let PbNodeBody::Sink(node) = node
2576                    && let Some(sink_desc) = &mut node.sink_desc
2577                    && sink_desc.id == sink_id
2578                {
2579                    sink_desc.properties.extend(props.clone());
2580                    found = true;
2581                }
2582            });
2583            if found { Some((id, stream_node)) } else { None }
2584        })
2585        .collect_vec();
2586    assert!(
2587        !fragments.is_empty(),
2588        "sink id should be used by at least one fragment"
2589    );
2590    for (id, stream_node) in fragments {
2591        fragment::ActiveModel {
2592            fragment_id: Set(id),
2593            stream_node: Set(StreamNode::from(&stream_node)),
2594            ..Default::default()
2595        }
2596        .update(txn)
2597        .await?;
2598    }
2599    Ok(())
2600}
2601
2602pub struct SinkIntoTableContext {
2603    /// For alter table (e.g., add column), this is the list of ids of the existing
2604    /// sinks into the table; otherwise it is empty.
2605    pub updated_sink_catalogs: Vec<SinkId>,
2606}
2607
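/// Context for finishing a sink whose schema was auto-refreshed alongside a replaced
/// table: the temporary sink's fragments replace the original sink's, and the sink
/// (plus its kv log store table, if any) receives the new column catalog.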
2608pub struct FinishAutoRefreshSchemaSinkContext {
2609    pub tmp_sink_id: SinkId,
2610    pub original_sink_id: SinkId,
2611    pub columns: Vec<PbColumnCatalog>,
2612    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
2613}
2614
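/// For each fragment of `job_ids` whose type mask intersects `expect_flag`, run
/// `alter_stream_node_fn` over its stream nodes and persist the fragments that the
/// callback marked as changed via its `&mut bool` flag.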
2615async fn update_connector_props_fragments<F>(
2616    txn: &DatabaseTransaction,
2617    job_ids: Vec<JobId>,
2618    expect_flag: FragmentTypeFlag,
2619    mut alter_stream_node_fn: F,
2620    is_shared_source: bool,
2621) -> MetaResult<()>
2622where
2623    F: FnMut(&mut PbNodeBody, &mut bool),
2624{
2625    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2626        .select_only()
2627        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2628        .filter(
2629            fragment::Column::JobId
2630                .is_in(job_ids.clone())
2631                .and(FragmentTypeMask::intersects(expect_flag)),
2632        )
2633        .into_tuple()
2634        .all(txn)
2635        .await?;
2636    let fragments = fragments
2637        .into_iter()
2638        .filter_map(|(id, stream_node)| {
2639            let mut stream_node = stream_node.to_protobuf();
2640            let mut found = false;
2641            visit_stream_node_mut(&mut stream_node, |node| {
2642                alter_stream_node_fn(node, &mut found);
2643            });
2644            if found { Some((id, stream_node)) } else { None }
2645        })
2646        .collect_vec();
2647    if is_shared_source || job_ids.len() > 1 {
2648        // The first element of `job_ids` is the source_id or the associated table_id.
2649        // If the source is non-shared and not used by any other job, no fragment may be updated;
2650        // `job_ids.len() > 1` means the source is used by other streaming jobs, so at least one fragment should have been updated.
2651        assert!(
2652            !fragments.is_empty(),
2653            "job ids {:?} (type: {:?}) should be used by at least one fragment",
2654            job_ids,
2655            expect_flag
2656        );
2657    }
2658
2659    for (id, stream_node) in fragments {
2660        fragment::ActiveModel {
2661            fragment_id: Set(id),
2662            stream_node: Set(StreamNode::from(&stream_node)),
2663            ..Default::default()
2664        }
2665        .update(txn)
2666        .await?;
2667    }
2668
2669    Ok(())
2670}