risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{self, FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
    TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
    fetch_target_fragments, get_fragment_actor_ids, get_internal_tables_by_id, get_table_columns,
    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
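    /// Create the `Object` row and the corresponding `streaming_job` row for a new
    /// streaming job within the given transaction, returning the allocated object id.
    /// The job starts in `JobStatus::Initial`.
    ///
    /// A minimal sketch of the call shape (the argument values here are hypothetical):
    ///
    /// ```ignore
    /// let job_id = Self::create_streaming_job_obj(
    ///     &txn,
    ///     ObjectType::Table,
    ///     owner_id,
    ///     Some(database_id),
    ///     Some(schema_id),
    ///     PbCreateType::Foreground,
    ///     None,                            // timezone
    ///     StreamingParallelism::Fixed(4),
    ///     256,                             // max parallelism
    ///     None,                            // specific resource group
    /// )
    /// .await?;
    /// ```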
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        timezone: Option<String>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(timezone),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

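        // Resolve the job's parallelism: an explicitly specified parallelism always wins;
        // otherwise fall back to the cluster default (`Adaptive` for `Full`, or the
        // configured fixed degree).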
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one created for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // To be compatible with the old implementation, the index, its index table
                // and the job share the same id.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

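    /// Convenience wrapper around [`Self::prepare_streaming_job`] that extracts the
    /// fragments, actor statuses and fragment downstreams from the given
    /// [`StreamJobFragmentsToCreate`].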
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id().table_id as _,
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.actor_status,
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: ObjectId,
        get_fragments: impl Fn() -> I + 'a,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        // NOTE: we don't need to pass actor splits for now; the `splits` field will be
        // updated during the `post_collect_job_fragments` stage.
        let empty_actor_splits = Default::default();

        let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
            job_id,
            get_fragments(),
            actor_status,
            &empty_actor_splits,
        )?;
        let inner = self.inner.write().await;

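        // Whether the frontend should be notified about the job's catalogs while the job
        // is still being created (see `StreamingJob::should_notify_creating`).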
        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        // Add fragments.
        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the definition was replaced earlier,
                        // but it is still needed for the notification.
                        if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        // Add actors and actor dispatchers.
        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (into a target table) needs to
    /// be dropped, additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

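        // If the cancelled job is a sink into a table, the barrier mutation additionally
        // needs to know which target fragment the sink fragment writes to, so that the
        // sink can be detached from it.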
        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments
                .all_table_ids()
                .map(catalog::TableId::new)
                .collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial
    /// status or created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or was aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                // The job was created in the background and is still in creating status;
                // do not abort it, and let recovery handle it instead.
                tracing::warn!(
                    id = job_id,
                    "streaming job is created in background and still in creating status"
                );
                return Ok((false, Some(database_id)));
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
        }

        if need_notify {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating sink into table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );
                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

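    /// After the creation barrier of the job has been collected, mark its actors as
    /// running, persist the initial source split assignments, register the new fragment
    /// relations, and advance the job status to `Creating`.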
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let actor_ids = actor_ids.into_iter().map(|id| id as ActorId).collect_vec();

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(actor::Column::ActorId.is_in(actor_ids))
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

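    /// Create a dummy streaming job object to be used as the replacement target of an
    /// existing job (e.g. for schema change). Verifies the job version, rejects
    /// concurrent replaces, and checks that the max parallelism stays unchanged.
    /// Returns the object id of the dummy job.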
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
            StreamingJobModel::find_by_id(id as ObjectId)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };
        let timezone = ctx
            .map(|ctx| ctx.timezone.clone())
            .unwrap_or(original_timezone);

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            timezone,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks the job's related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
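        // Catalogs of background-created jobs (and of materialized views, handled below)
        // were already notified to the frontend with `Add`, so publish an `Update` instead.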
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(&txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(&txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(*state_table_id),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges)
                        .exec(&txn)
                        .await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        &txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if job_type != ObjectType::Index {
            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
        }
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

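    /// Finish a replace-streaming-job operation: swap the new fragments in for the old
    /// ones, update the catalogs, drop the dummy object, and notify the frontend about
    /// all affected objects.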
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

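    /// Transactional body of [`Self::finish_replace_streaming_job`]. Returns the objects
    /// to notify the frontend about with `Update`, plus optional user infos and objects
    /// that require a `Delete` notification.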
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_column_catalogs = get_table_columns(txn, original_job_id).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Drop the table connector; the rest of the logic lives in `drop_table_associated_source`.
                    table.optional_associated_source_id = Set(None);
                }

                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

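        /// Repoint the internal tables at their new fragments, swap the new
        /// fragments/actors in for the old ones, rewrite downstream `Merge` nodes to the
        /// new upstream fragment ids, and finally remove the dummy object.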
1278        async fn finish_fragments(
1279            txn: &DatabaseTransaction,
1280            tmp_id: ObjectId,
1281            original_job_id: ObjectId,
1282            replace_upstream: FragmentReplaceUpstream,
1283        ) -> MetaResult<()> {
1284            // 0. update internal tables
1285            // Fields including `fragment_id` were placeholder values before.
1286            // After table fragments are created, update them for all internal tables.
1287            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1288                .select_only()
1289                .columns([
1290                    fragment::Column::FragmentId,
1291                    fragment::Column::StateTableIds,
1292                ])
1293                .filter(fragment::Column::JobId.eq(tmp_id))
1294                .into_tuple()
1295                .all(txn)
1296                .await?;
1297            for (fragment_id, state_table_ids) in fragment_info {
1298                for state_table_id in state_table_ids.into_inner() {
1299                    table::ActiveModel {
1300                        table_id: Set(state_table_id as _),
1301                        fragment_id: Set(Some(fragment_id)),
1302                        // No need to update `vnode_count` because it must remain the same.
1303                        ..Default::default()
1304                    }
1305                    .update(txn)
1306                    .await?;
1307                }
1308            }
1309
1310            // 1. replace old fragments/actors with new ones.
1311            Fragment::delete_many()
1312                .filter(fragment::Column::JobId.eq(original_job_id))
1313                .exec(txn)
1314                .await?;
1315            Fragment::update_many()
1316                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1317                .filter(fragment::Column::JobId.eq(tmp_id))
1318                .exec(txn)
1319                .await?;
1320
            // 2. update merges.
            // Update each downstream fragment's `Merge` node and its `upstream_fragment_id`.
1323            for (fragment_id, fragment_replace_map) in replace_upstream {
1324                let (fragment_id, mut stream_node) =
1325                    Fragment::find_by_id(fragment_id as FragmentId)
1326                        .select_only()
1327                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1328                        .into_tuple::<(FragmentId, StreamNode)>()
1329                        .one(txn)
1330                        .await?
1331                        .map(|(id, node)| (id, node.to_protobuf()))
1332                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1333
1334                visit_stream_node_mut(&mut stream_node, |body| {
1335                    if let PbNodeBody::Merge(m) = body
1336                        && let Some(new_fragment_id) =
1337                            fragment_replace_map.get(&m.upstream_fragment_id)
1338                    {
1339                        m.upstream_fragment_id = *new_fragment_id;
1340                    }
1341                });
1342                fragment::ActiveModel {
1343                    fragment_id: Set(fragment_id),
1344                    stream_node: Set(StreamNode::from(&stream_node)),
1345                    ..Default::default()
1346                }
1347                .update(txn)
1348                .await?;
1349            }
1350
1351            // 3. remove dummy object.
1352            Object::delete_by_id(tmp_id).exec(txn).await?;
1353
1354            Ok(())
1355        }
1356
1357        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1358
1359        // 4. update catalogs and notify.
1360        let mut objects = vec![];
1361        match job_type {
1362            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1363                let (table, table_obj) = Table::find_by_id(original_job_id)
1364                    .find_also_related(Object)
1365                    .one(txn)
1366                    .await?
1367                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1368                objects.push(PbObject {
1369                    object_info: Some(PbObjectInfo::Table(
1370                        ObjectModel(table, table_obj.unwrap()).into(),
1371                    )),
1372                })
1373            }
1374            StreamingJobType::Source => {
1375                let (source, source_obj) = Source::find_by_id(original_job_id)
1376                    .find_also_related(Object)
1377                    .one(txn)
1378                    .await?
1379                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1380                objects.push(PbObject {
1381                    object_info: Some(PbObjectInfo::Source(
1382                        ObjectModel(source, source_obj.unwrap()).into(),
1383                    )),
1384                })
1385            }
1386            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1387        }
1388
1389        if let Some(expr_rewriter) = index_item_rewriter {
1390            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1391                .select_only()
1392                .columns([index::Column::IndexId, index::Column::IndexItems])
1393                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1394                .into_tuple()
1395                .all(txn)
1396                .await?;
1397            for (index_id, nodes) in index_items {
1398                let mut pb_nodes = nodes.to_protobuf();
1399                pb_nodes
1400                    .iter_mut()
1401                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1402                let index = index::ActiveModel {
1403                    index_id: Set(index_id),
1404                    index_items: Set(pb_nodes.into()),
1405                    ..Default::default()
1406                }
1407                .update(txn)
1408                .await?;
1409                let index_obj = index
1410                    .find_related(Object)
1411                    .one(txn)
1412                    .await?
1413                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1414                objects.push(PbObject {
1415                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1416                });
1417            }
1418        }
1419
1420        if let Some(sinks) = auto_refresh_schema_sinks {
1421            for finish_sink_context in sinks {
1422                finish_fragments(
1423                    txn,
1424                    finish_sink_context.tmp_sink_id,
1425                    finish_sink_context.original_sink_id,
1426                    Default::default(),
1427                )
1428                .await?;
1429                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1430                    .find_also_related(Object)
1431                    .one(txn)
1432                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id)
                    })?;
1434                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1435                Sink::update(sink::ActiveModel {
1436                    sink_id: Set(finish_sink_context.original_sink_id),
1437                    columns: Set(columns.clone()),
1438                    ..Default::default()
1439                })
1440                .exec(txn)
1441                .await?;
1442                sink.columns = columns;
1443                objects.push(PbObject {
1444                    object_info: Some(PbObjectInfo::Sink(
1445                        ObjectModel(sink, sink_obj.unwrap()).into(),
1446                    )),
1447                });
1448                if let Some((log_store_table_id, new_log_store_table_columns)) =
1449                    finish_sink_context.new_log_store_table
1450                {
1451                    let new_log_store_table_columns: ColumnCatalogArray =
1452                        new_log_store_table_columns.into();
1453                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1454                        .find_also_related(Object)
1455                        .one(txn)
1456                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", log_store_table_id)
                        })?;
1458                    Table::update(table::ActiveModel {
1459                        table_id: Set(log_store_table_id),
1460                        columns: Set(new_log_store_table_columns.clone()),
1461                        ..Default::default()
1462                    })
1463                    .exec(txn)
1464                    .await?;
1465                    table.columns = new_log_store_table_columns;
1466                    objects.push(PbObject {
1467                        object_info: Some(PbObjectInfo::Table(
1468                            ObjectModel(table, table_obj.unwrap()).into(),
1469                        )),
1470                    });
1471                }
1472            }
1473        }
1474
1475        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1476        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1477            notification_objs =
1478                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1479        }
1480
1481        Ok((objects, notification_objs))
1482    }
1483
1484    /// Abort the replacing streaming job by deleting the temporary job object.
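    ///
    /// A minimal usage sketch (ids are hypothetical):
    ///
    /// ```ignore
    /// // Roll back a failed replacement: drop the temporary job object and any
    /// // temporary sink objects created for auto schema refresh.
    /// controller
    ///     .try_abort_replacing_streaming_job(tmp_job_id, Some(vec![tmp_sink_id]))
    ///     .await?;
    /// ```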
1485    pub async fn try_abort_replacing_streaming_job(
1486        &self,
1487        tmp_job_id: ObjectId,
1488        tmp_sink_ids: Option<Vec<ObjectId>>,
1489    ) -> MetaResult<()> {
1490        let inner = self.inner.write().await;
1491        let txn = inner.db.begin().await?;
1492        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1493        if let Some(tmp_sink_ids) = tmp_sink_ids {
1494            for tmp_sink_id in tmp_sink_ids {
1495                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1496            }
1497        }
1498        txn.commit().await?;
1499        Ok(())
1500    }
1501
    // Edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments.
    // Returns the actor ids to which the new rate limit should be applied.
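    //
    // A minimal usage sketch (the source id is hypothetical):
    //
    // ```ignore
    // // Throttle source 1001 to 4096 rows/s; passing `None` would remove the limit.
    // let fragment_actors = controller
    //     .update_source_rate_limit_by_source_id(1001, Some(4096))
    //     .await?;
    // // Each entry maps an updated fragment to the actors that must receive the
    // // corresponding throttle barrier command.
    // ```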
1504    pub async fn update_source_rate_limit_by_source_id(
1505        &self,
1506        source_id: SourceId,
1507        rate_limit: Option<u32>,
1508    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1509        let inner = self.inner.read().await;
1510        let txn = inner.db.begin().await?;
1511
1512        {
1513            let active_source = source::ActiveModel {
1514                source_id: Set(source_id),
1515                rate_limit: Set(rate_limit.map(|v| v as i32)),
1516                ..Default::default()
1517            };
1518            active_source.update(&txn).await?;
1519        }
1520
1521        let (source, obj) = Source::find_by_id(source_id)
1522            .find_also_related(Object)
1523            .one(&txn)
1524            .await?
1525            .ok_or_else(|| {
1526                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1527            })?;
1528
1529        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1530        let streaming_job_ids: Vec<ObjectId> =
1531            if let Some(table_id) = source.optional_associated_table_id {
1532                vec![table_id]
1533            } else if let Some(source_info) = &source.source_info
1534                && source_info.to_protobuf().is_shared()
1535            {
1536                vec![source_id]
1537            } else {
1538                ObjectDependency::find()
1539                    .select_only()
1540                    .column(object_dependency::Column::UsedBy)
1541                    .filter(object_dependency::Column::Oid.eq(source_id))
1542                    .into_tuple()
1543                    .all(&txn)
1544                    .await?
1545            };
1546
1547        if streaming_job_ids.is_empty() {
1548            return Err(MetaError::invalid_parameter(format!(
1549                "source id {source_id} not used by any streaming job"
1550            )));
1551        }
1552
1553        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1554            .select_only()
1555            .columns([
1556                fragment::Column::FragmentId,
1557                fragment::Column::FragmentTypeMask,
1558                fragment::Column::StreamNode,
1559            ])
1560            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1561            .into_tuple()
1562            .all(&txn)
1563            .await?;
1564        let mut fragments = fragments
1565            .into_iter()
1566            .map(|(id, mask, stream_node)| {
1567                (
1568                    id,
1569                    FragmentTypeMask::from(mask as u32),
1570                    stream_node.to_protobuf(),
1571                )
1572            })
1573            .collect_vec();
1574
1575        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1576            let mut found = false;
1577            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1578                visit_stream_node_mut(stream_node, |node| {
1579                    if let PbNodeBody::Source(node) = node
1580                        && let Some(node_inner) = &mut node.source_inner
1581                        && node_inner.source_id == source_id as u32
1582                    {
1583                        node_inner.rate_limit = rate_limit;
1584                        found = true;
1585                    }
1586                });
1587            }
1588            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so we scan all fragments for `StreamFsFetch` nodes when a fs connector is used.
1591                visit_stream_node_mut(stream_node, |node| {
1592                    if let PbNodeBody::StreamFsFetch(node) = node {
1593                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1594                        if let Some(node_inner) = &mut node.node_inner
1595                            && node_inner.source_id == source_id as u32
1596                        {
1597                            node_inner.rate_limit = rate_limit;
1598                            found = true;
1599                        }
1600                    }
1601                });
1602            }
1603            found
1604        });
1605
1606        assert!(
1607            !fragments.is_empty(),
1608            "source id should be used by at least one fragment"
1609        );
1610        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1611        for (id, fragment_type_mask, stream_node) in fragments {
1612            fragment::ActiveModel {
1613                fragment_id: Set(id),
1614                fragment_type_mask: Set(fragment_type_mask.into()),
1615                stream_node: Set(StreamNode::from(&stream_node)),
1616                ..Default::default()
1617            }
1618            .update(&txn)
1619            .await?;
1620        }
1621        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1622
1623        txn.commit().await?;
1624
1625        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1626        let _version = self
1627            .notify_frontend(
1628                NotificationOperation::Update,
1629                NotificationInfo::ObjectGroup(PbObjectGroup {
1630                    objects: vec![PbObject {
1631                        object_info: Some(relation_info),
1632                    }],
1633                }),
1634            )
1635            .await;
1636
1637        Ok(fragment_actors)
1638    }
1639
    // Edit the content of the fragments belonging to the given `job_id`.
    // Returns the actor ids to which the mutation should be applied.
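    //
    // A minimal sketch of a `fragments_mutation_fn`, mirroring the rate-limit helpers
    // below (illustrative only):
    //
    // ```ignore
    // let mutate = |mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
    //     let mut found = false;
    //     if mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
    //         visit_stream_node_mut(stream_node, |body| {
    //             if let PbNodeBody::Dml(dml) = body {
    //                 dml.rate_limit = Some(1000);
    //                 found = true;
    //             }
    //         });
    //     }
    //     Ok(found) // `Ok(true)` marks this fragment for persistence
    // };
    // ```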
1642    pub async fn mutate_fragments_by_job_id(
1643        &self,
1644        job_id: ObjectId,
        // the callback returns `Ok(true)` if the mutation was applied to this fragment
1646        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragment is found
1648        err_msg: &'static str,
1649    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1650        let inner = self.inner.read().await;
1651        let txn = inner.db.begin().await?;
1652
1653        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1654            .select_only()
1655            .columns([
1656                fragment::Column::FragmentId,
1657                fragment::Column::FragmentTypeMask,
1658                fragment::Column::StreamNode,
1659            ])
1660            .filter(fragment::Column::JobId.eq(job_id))
1661            .into_tuple()
1662            .all(&txn)
1663            .await?;
1664        let mut fragments = fragments
1665            .into_iter()
1666            .map(|(id, mask, stream_node)| {
1667                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1668            })
1669            .collect_vec();
1670
1671        let fragments = fragments
1672            .iter_mut()
1673            .map(|(_, fragment_type_mask, stream_node)| {
1674                fragments_mutation_fn(*fragment_type_mask, stream_node)
1675            })
1676            .collect::<MetaResult<Vec<bool>>>()?
1677            .into_iter()
1678            .zip_eq_debug(std::mem::take(&mut fragments))
1679            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1680            .collect::<Vec<_>>();
1681
1682        if fragments.is_empty() {
1683            return Err(MetaError::invalid_parameter(format!(
1684                "job id {job_id}: {}",
1685                err_msg
1686            )));
1687        }
1688
1689        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1690        for (id, _, stream_node) in fragments {
1691            fragment::ActiveModel {
1692                fragment_id: Set(id),
1693                stream_node: Set(StreamNode::from(&stream_node)),
1694                ..Default::default()
1695            }
1696            .update(&txn)
1697            .await?;
1698        }
1699        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1700
1701        txn.commit().await?;
1702
1703        Ok(fragment_actors)
1704    }
1705
1706    async fn mutate_fragment_by_fragment_id(
1707        &self,
1708        fragment_id: FragmentId,
1709        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1710        err_msg: &'static str,
1711    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1712        let inner = self.inner.read().await;
1713        let txn = inner.db.begin().await?;
1714
1715        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1716            Fragment::find_by_id(fragment_id)
1717                .select_only()
1718                .columns([
1719                    fragment::Column::FragmentTypeMask,
1720                    fragment::Column::StreamNode,
1721                ])
1722                .into_tuple()
1723                .one(&txn)
1724                .await?
1725                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1726        let mut pb_stream_node = stream_node.to_protobuf();
1727        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1728
1729        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1730            return Err(MetaError::invalid_parameter(format!(
1731                "fragment id {fragment_id}: {}",
1732                err_msg
1733            )));
1734        }
1735
1736        fragment::ActiveModel {
1737            fragment_id: Set(fragment_id),
            // Persist the mutated stream node, not the one originally fetched.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
1739            ..Default::default()
1740        }
1741        .update(&txn)
1742        .await?;
1743
1744        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1745
1746        txn.commit().await?;
1747
1748        Ok(fragment_actors)
1749    }
1750
    // Edit the `rate_limit` of the backfill-related nodes (`StreamScan`, `StreamCdcScan`,
    // `SourceBackfill`, and `Sink`) in the given `job_id`'s fragments.
    // Returns the actor ids to which the new rate limit should be applied.
1753    pub async fn update_backfill_rate_limit_by_job_id(
1754        &self,
1755        job_id: ObjectId,
1756        rate_limit: Option<u32>,
1757    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1758        let update_backfill_rate_limit =
1759            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1760                let mut found = false;
1761                if fragment_type_mask
1762                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1763                {
1764                    visit_stream_node_mut(stream_node, |node| match node {
1765                        PbNodeBody::StreamCdcScan(node) => {
1766                            node.rate_limit = rate_limit;
1767                            found = true;
1768                        }
1769                        PbNodeBody::StreamScan(node) => {
1770                            node.rate_limit = rate_limit;
1771                            found = true;
1772                        }
1773                        PbNodeBody::SourceBackfill(node) => {
1774                            node.rate_limit = rate_limit;
1775                            found = true;
1776                        }
1777                        PbNodeBody::Sink(node) => {
1778                            node.rate_limit = rate_limit;
1779                            found = true;
1780                        }
1781                        _ => {}
1782                    });
1783                }
1784                Ok(found)
1785            };
1786
1787        self.mutate_fragments_by_job_id(
1788            job_id,
1789            update_backfill_rate_limit,
1790            "stream scan node or source node not found",
1791        )
1792        .await
1793    }
1794
    // Edit the `rate_limit` of the `Sink` node in the given `job_id`'s fragments.
    // Returns the actor ids to which the new rate limit should be applied.
1797    pub async fn update_sink_rate_limit_by_job_id(
1798        &self,
1799        job_id: ObjectId,
1800        rate_limit: Option<u32>,
1801    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1802        let update_sink_rate_limit =
1803            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1804                let mut found = Ok(false);
1805                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1806                    visit_stream_node_mut(stream_node, |node| {
1807                        if let PbNodeBody::Sink(node) = node {
1808                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1809                                found = Err(MetaError::invalid_parameter(
1810                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1811                                ));
1812                                return;
1813                            }
1814                            node.rate_limit = rate_limit;
1815                            found = Ok(true);
1816                        }
1817                    });
1818                }
1819                found
1820            };
1821
1822        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1823            .await
1824    }
1825
1826    pub async fn update_dml_rate_limit_by_job_id(
1827        &self,
1828        job_id: ObjectId,
1829        rate_limit: Option<u32>,
1830    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1831        let update_dml_rate_limit =
1832            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1833                let mut found = false;
1834                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1835                    visit_stream_node_mut(stream_node, |node| {
1836                        if let PbNodeBody::Dml(node) = node {
1837                            node.rate_limit = rate_limit;
1838                            found = true;
1839                        }
1840                    });
1841                }
1842                Ok(found)
1843            };
1844
1845        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1846            .await
1847    }
1848
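    /// Alter a source's connector properties and secret references on the fly.
    /// The merged options are validated against the connector's allow-list, the
    /// `CREATE SOURCE` / `CREATE TABLE` definition is rewritten, secret dependencies
    /// are adjusted, and all affected source fragments are updated in place.
    ///
    /// A minimal usage sketch (the id and property key are hypothetical; keys must
    /// pass `check_source_allow_alter_on_fly_fields` for the connector):
    ///
    /// ```ignore
    /// let resolved = controller
    ///     .update_source_props_by_source_id(
    ///         1001,
    ///         BTreeMap::from([("some.alterable.prop".to_owned(), "5s".to_owned())]),
    ///         BTreeMap::new(), // no secret refs changed
    ///     )
    ///     .await?;
    /// ```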
1849    pub async fn update_source_props_by_source_id(
1850        &self,
1851        source_id: SourceId,
1852        alter_props: BTreeMap<String, String>,
1853        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1854    ) -> MetaResult<WithOptionsSecResolved> {
1855        let inner = self.inner.read().await;
1856        let txn = inner.db.begin().await?;
1857
1858        let (source, _obj) = Source::find_by_id(source_id)
1859            .find_also_related(Object)
1860            .one(&txn)
1861            .await?
1862            .ok_or_else(|| {
1863                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1864            })?;
1865        let connector = source.with_properties.0.get_connector().unwrap();
1866        let is_shared_source = source.is_shared();
1867
1868        let mut dep_source_job_ids: Vec<ObjectId> = Vec::new();
1869        if !is_shared_source {
            // MVs on a non-shared source hold a copy of the source in their fragments
1871            dep_source_job_ids = ObjectDependency::find()
1872                .select_only()
1873                .column(object_dependency::Column::UsedBy)
1874                .filter(object_dependency::Column::Oid.eq(source_id))
1875                .into_tuple()
1876                .all(&txn)
1877                .await?;
1878        }
1879
1880        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
1881        let prop_keys: Vec<String> = alter_props
1882            .keys()
1883            .chain(alter_secret_refs.keys())
1884            .cloned()
1885            .collect();
1886        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1887            &connector, &prop_keys,
1888        )?;
1889
1890        let mut options_with_secret = WithOptionsSecResolved::new(
1891            source.with_properties.0.clone(),
1892            source
1893                .secret_ref
1894                .map(|secret_ref| secret_ref.to_protobuf())
1895                .unwrap_or_default(),
1896        );
1897        let (to_add_secret_dep, to_remove_secret_dep) =
1898            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1899
1900        tracing::info!(
1901            "applying new properties to source: source_id={}, options_with_secret={:?}",
1902            source_id,
1903            options_with_secret
1904        );
        // check that the altered props are still valid for the connector
1906        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1907        // todo: validate via source manager
1908
1909        let mut associate_table_id = None;
1910
        // `preferred_id` is either the source id or the associated table id:
        // when updating a source that has an associated table, it is the table id;
        // otherwise it is the source id.
1914        let mut preferred_id: i32 = source_id;
1915        let rewrite_sql = {
1916            let definition = source.definition.clone();
1917
1918            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1919                .map_err(|e| {
1920                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1921                        anyhow!(e).context("Failed to parse source definition SQL"),
1922                    )))
1923                })?
1924                .try_into()
1925                .unwrap();
1926
1927            /// Formats SQL options with secret values properly resolved
1928            ///
1929            /// This function processes configuration options that may contain sensitive data:
1930            /// - Plaintext options are directly converted to `SqlOption`
1931            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
1932            ///   without exposing the actual secret value
1933            ///
1934            /// # Arguments
1935            /// * `txn` - Database transaction for retrieving secrets
1936            /// * `options_with_secret` - Container of options with both plaintext and secret values
1937            ///
1938            /// # Returns
1939            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
1940            async fn format_with_option_secret_resolved(
1941                txn: &DatabaseTransaction,
1942                options_with_secret: &WithOptionsSecResolved,
1943            ) -> MetaResult<Vec<SqlOption>> {
1944                let mut options = Vec::new();
1945                for (k, v) in options_with_secret.as_plaintext() {
1946                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1947                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1948                    options.push(sql_option);
1949                }
1950                for (k, v) in options_with_secret.as_secret() {
1951                    if let Some(secret_model) =
1952                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
1953                    {
1954                        let sql_option =
1955                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1956                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1957                        options.push(sql_option);
1958                    } else {
1959                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1960                    }
1961                }
1962                Ok(options)
1963            }
1964
1965            match &mut stmt {
1966                Statement::CreateSource { stmt } => {
1967                    stmt.with_properties.0 =
1968                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1969                }
1970                Statement::CreateTable { with_options, .. } => {
1971                    *with_options =
1972                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1973                    associate_table_id = source.optional_associated_table_id;
1974                    preferred_id = associate_table_id.unwrap();
1975                }
1976                _ => unreachable!(),
1977            }
1978
1979            stmt.to_string()
1980        };
1981
1982        {
1983            // update secret dependencies
1984            if !to_add_secret_dep.is_empty() {
1985                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
1986                    object_dependency::ActiveModel {
1987                        oid: Set(secret_id as _),
1988                        used_by: Set(preferred_id as _),
1989                        ..Default::default()
1990                    }
1991                }))
1992                .exec(&txn)
1993                .await?;
1994            }
1995            if !to_remove_secret_dep.is_empty() {
1996                // todo: fix the filter logic
1997                let _ = ObjectDependency::delete_many()
1998                    .filter(
1999                        object_dependency::Column::Oid
2000                            .is_in(to_remove_secret_dep)
2001                            .and(
2002                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
2003                            ),
2004                    )
2005                    .exec(&txn)
2006                    .await?;
2007            }
2008        }
2009
2010        let active_source_model = source::ActiveModel {
2011            source_id: Set(source_id),
2012            definition: Set(rewrite_sql.clone()),
2013            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2014            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2015                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2016            ..Default::default()
2017        };
2018        active_source_model.update(&txn).await?;
2019
2020        if let Some(associate_table_id) = associate_table_id {
            // update the associated table's definition accordingly
2022            let active_table_model = table::ActiveModel {
2023                table_id: Set(associate_table_id),
2024                definition: Set(rewrite_sql),
2025                ..Default::default()
2026            };
2027            active_table_model.update(&txn).await?;
2028        }
2029
2030        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating a table with a connector, the job id is the table id
2032            associate_table_id
2033        } else {
2034            source_id
2035        }]
2036        .into_iter()
2037        .chain(dep_source_job_ids.into_iter())
2038        .collect_vec();
2039
2040        // update fragments
2041        update_connector_props_fragments(
2042            &txn,
2043            to_check_job_ids,
2044            FragmentTypeFlag::Source,
2045            |node, found| {
2046                if let PbNodeBody::Source(node) = node
2047                    && let Some(source_inner) = &mut node.source_inner
2048                {
2049                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2050                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2051                    *found = true;
2052                }
2053            },
2054            is_shared_source,
2055        )
2056        .await?;
2057
2058        let mut to_update_objs = Vec::with_capacity(2);
2059        let (source, obj) = Source::find_by_id(source_id)
2060            .find_also_related(Object)
2061            .one(&txn)
2062            .await?
2063            .ok_or_else(|| {
2064                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2065            })?;
2066        to_update_objs.push(PbObject {
2067            object_info: Some(PbObjectInfo::Source(
2068                ObjectModel(source, obj.unwrap()).into(),
2069            )),
2070        });
2071
2072        if let Some(associate_table_id) = associate_table_id {
2073            let (table, obj) = Table::find_by_id(associate_table_id)
2074                .find_also_related(Object)
2075                .one(&txn)
2076                .await?
2077                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2078            to_update_objs.push(PbObject {
2079                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2080            });
2081        }
2082
2083        txn.commit().await?;
2084
2085        self.notify_frontend(
2086            NotificationOperation::Update,
2087            NotificationInfo::ObjectGroup(PbObjectGroup {
2088                objects: to_update_objs,
2089            }),
2090        )
2091        .await;
2092
2093        Ok(options_with_secret)
2094    }
2095
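    /// Alter a sink's connector properties: validate them, rewrite the `CREATE SINK`
    /// definition, persist the merged config, and patch the `SinkDesc` carried by the
    /// sink's fragments before notifying frontends.
    ///
    /// A minimal usage sketch (the sink id and property key are hypothetical):
    ///
    /// ```ignore
    /// let applied = controller
    ///     .update_sink_props_by_sink_id(
    ///         2001,
    ///         BTreeMap::from([("some.alterable.prop".to_owned(), "60s".to_owned())]),
    ///     )
    ///     .await?;
    /// ```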
2096    pub async fn update_sink_props_by_sink_id(
2097        &self,
2098        sink_id: SinkId,
2099        props: BTreeMap<String, String>,
2100    ) -> MetaResult<HashMap<String, String>> {
2101        let inner = self.inner.read().await;
2102        let txn = inner.db.begin().await?;
2103
2104        let (sink, _obj) = Sink::find_by_id(sink_id)
2105            .find_also_related(Object)
2106            .one(&txn)
2107            .await?
2108            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2109        validate_sink_props(&sink, &props)?;
2110        let definition = sink.definition.clone();
2111        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2112            .map_err(|e| SinkError::Config(anyhow!(e)))?
2113            .try_into()
2114            .unwrap();
2115        if let Statement::CreateSink { stmt } = &mut stmt {
2116            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2117        } else {
2118            panic!("definition is not a create sink statement")
2119        }
2120        let mut new_config = sink.properties.clone().into_inner();
2121        new_config.extend(props.clone());
2122
2123        let definition = stmt.to_string();
2124        let active_sink = sink::ActiveModel {
2125            sink_id: Set(sink_id),
2126            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2127            definition: Set(definition),
2128            ..Default::default()
2129        };
2130        active_sink.update(&txn).await?;
2131
2132        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2133        let (sink, obj) = Sink::find_by_id(sink_id)
2134            .find_also_related(Object)
2135            .one(&txn)
2136            .await?
2137            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2138        txn.commit().await?;
2139        let relation_infos = vec![PbObject {
2140            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2141        }];
2142
2143        let _version = self
2144            .notify_frontend(
2145                NotificationOperation::Update,
2146                NotificationInfo::ObjectGroup(PbObjectGroup {
2147                    objects: relation_infos,
2148                }),
2149            )
2150            .await;
2151
2152        Ok(props.into_iter().collect())
2153    }
2154
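    /// Alter the connector properties of a table created with `ENGINE = iceberg`.
    /// The table, its hidden sink, and its hidden source share a single
    /// `CREATE TABLE` definition, so all three catalog entries are rewritten together.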
2155    pub async fn update_iceberg_table_props_by_table_id(
2156        &self,
2157        table_id: TableId,
2158        props: BTreeMap<String, String>,
2159        alter_iceberg_table_props: Option<
2160            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2161        >,
2162    ) -> MetaResult<(HashMap<String, String>, u32)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2165        let inner = self.inner.read().await;
2166        let txn = inner.db.begin().await?;
2167
2168        let (sink, _obj) = Sink::find_by_id(sink_id)
2169            .find_also_related(Object)
2170            .one(&txn)
2171            .await?
2172            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2173        validate_sink_props(&sink, &props)?;
2174
2175        let definition = sink.definition.clone();
2176        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2177            .map_err(|e| SinkError::Config(anyhow!(e)))?
2178            .try_into()
2179            .unwrap();
2180        if let Statement::CreateTable {
2181            with_options,
2182            engine,
2183            ..
2184        } = &mut stmt
2185        {
2186            if !matches!(engine, Engine::Iceberg) {
2187                return Err(SinkError::Config(anyhow!(
2188                    "only iceberg table can be altered as sink"
2189                ))
2190                .into());
2191            }
2192            update_stmt_with_props(with_options, &props)?;
2193        } else {
2194            panic!("definition is not a create iceberg table statement")
2195        }
2196        let mut new_config = sink.properties.clone().into_inner();
2197        new_config.extend(props.clone());
2198
2199        let definition = stmt.to_string();
2200        let active_sink = sink::ActiveModel {
2201            sink_id: Set(sink_id),
2202            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2203            definition: Set(definition.clone()),
2204            ..Default::default()
2205        };
2206        let active_source = source::ActiveModel {
2207            source_id: Set(source_id),
2208            definition: Set(definition.clone()),
2209            ..Default::default()
2210        };
2211        let active_table = table::ActiveModel {
2212            table_id: Set(table_id),
2213            definition: Set(definition),
2214            ..Default::default()
2215        };
2216        active_sink.update(&txn).await?;
2217        active_source.update(&txn).await?;
2218        active_table.update(&txn).await?;
2219
2220        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2221
2222        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2223            .find_also_related(Object)
2224            .one(&txn)
2225            .await?
2226            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2227        let (source, source_obj) = Source::find_by_id(source_id)
2228            .find_also_related(Object)
2229            .one(&txn)
2230            .await?
2231            .ok_or_else(|| {
2232                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2233            })?;
2234        let (table, table_obj) = Table::find_by_id(table_id)
2235            .find_also_related(Object)
2236            .one(&txn)
2237            .await?
2238            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2239        txn.commit().await?;
2240        let relation_infos = vec![
2241            PbObject {
2242                object_info: Some(PbObjectInfo::Sink(
2243                    ObjectModel(sink, sink_obj.unwrap()).into(),
2244                )),
2245            },
2246            PbObject {
2247                object_info: Some(PbObjectInfo::Source(
2248                    ObjectModel(source, source_obj.unwrap()).into(),
2249                )),
2250            },
2251            PbObject {
2252                object_info: Some(PbObjectInfo::Table(
2253                    ObjectModel(table, table_obj.unwrap()).into(),
2254                )),
2255            },
2256        ];
2257        let _version = self
2258            .notify_frontend(
2259                NotificationOperation::Update,
2260                NotificationInfo::ObjectGroup(PbObjectGroup {
2261                    objects: relation_infos,
2262                }),
2263            )
2264            .await;
2265
2266        Ok((props.into_iter().collect(), sink_id as u32))
2267    }
2268
2269    pub async fn update_fragment_rate_limit_by_fragment_id(
2270        &self,
2271        fragment_id: FragmentId,
2272        rate_limit: Option<u32>,
2273    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2274        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2275                                 stream_node: &mut PbStreamNode| {
2276            let mut found = false;
2277            if fragment_type_mask.contains_any(
2278                FragmentTypeFlag::dml_rate_limit_fragments()
2279                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2280                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2281                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2282            ) {
2283                visit_stream_node_mut(stream_node, |node| {
2284                    if let PbNodeBody::Dml(node) = node {
2285                        node.rate_limit = rate_limit;
2286                        found = true;
2287                    }
2288                    if let PbNodeBody::Sink(node) = node {
2289                        node.rate_limit = rate_limit;
2290                        found = true;
2291                    }
2292                    if let PbNodeBody::StreamCdcScan(node) = node {
2293                        node.rate_limit = rate_limit;
2294                        found = true;
2295                    }
2296                    if let PbNodeBody::StreamScan(node) = node {
2297                        node.rate_limit = rate_limit;
2298                        found = true;
2299                    }
2300                    if let PbNodeBody::SourceBackfill(node) = node {
2301                        node.rate_limit = rate_limit;
2302                        found = true;
2303                    }
2304                });
2305            }
2306            found
2307        };
2308        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2309            .await
2310    }
2311
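    /// Persist the outcome of a reschedule: delete removed actors, insert newly
    /// created ones, rewrite vnode bitmaps and connector splits of surviving actors,
    /// and record the new parallelism / resource group of each affected job.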
2312    pub async fn post_apply_reschedules(
2313        &self,
2314        reschedules: HashMap<FragmentId, Reschedule>,
2315        post_updates: &JobReschedulePostUpdates,
2316    ) -> MetaResult<()> {
2317        let new_created_actors: HashSet<_> = reschedules
2318            .values()
2319            .flat_map(|reschedule| {
2320                reschedule
2321                    .added_actors
2322                    .values()
2323                    .flatten()
2324                    .map(|actor_id| *actor_id as ActorId)
2325            })
2326            .collect();
2327
2328        let inner = self.inner.write().await;
2329
2330        let txn = inner.db.begin().await?;
2331
2332        for Reschedule {
2333            removed_actors,
2334            vnode_bitmap_updates,
2335            actor_splits,
2336            newly_created_actors,
2337            ..
2338        } in reschedules.into_values()
2339        {
2340            // drop removed actors
2341            Actor::delete_many()
2342                .filter(
2343                    actor::Column::ActorId
2344                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
2345                )
2346                .exec(&txn)
2347                .await?;
2348
2349            // add new actors
2350            for (
2351                (
2352                    StreamActor {
2353                        actor_id,
2354                        fragment_id,
2355                        vnode_bitmap,
2356                        expr_context,
2357                        ..
2358                    },
2359                    _,
2360                ),
2361                worker_id,
2362            ) in newly_created_actors.into_values()
2363            {
2364                let splits = actor_splits
2365                    .get(&actor_id)
2366                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
2367
2368                Actor::insert(actor::ActiveModel {
2369                    actor_id: Set(actor_id as _),
2370                    fragment_id: Set(fragment_id as _),
2371                    status: Set(ActorStatus::Running),
2372                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
2373                    worker_id: Set(worker_id),
2374                    upstream_actor_ids: Set(Default::default()),
2375                    vnode_bitmap: Set(vnode_bitmap
2376                        .as_ref()
2377                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
2378                    expr_context: Set(expr_context.as_ref().unwrap().into()),
2379                })
2380                .exec(&txn)
2381                .await?;
2382            }
2383
            // update vnode bitmaps for existing actors
2385            for (actor_id, bitmap) in vnode_bitmap_updates {
2386                let actor = Actor::find_by_id(actor_id as ActorId)
2387                    .one(&txn)
2388                    .await?
2389                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2390
2391                let mut actor = actor.into_active_model();
2392                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
2393                actor.update(&txn).await?;
2394            }
2395
2396            // Update actor_splits for existing actors
2397            for (actor_id, splits) in actor_splits {
2398                if new_created_actors.contains(&(actor_id as ActorId)) {
2399                    continue;
2400                }
2401
2402                let actor = Actor::find_by_id(actor_id as ActorId)
2403                    .one(&txn)
2404                    .await?
2405                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2406
2407                let mut actor = actor.into_active_model();
2408                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
2409                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
2410                actor.update(&txn).await?;
2411            }
2412        }
2413
2414        let JobReschedulePostUpdates {
2415            parallelism_updates,
2416            resource_group_updates,
2417        } = post_updates;
2418
2419        for (table_id, parallelism) in parallelism_updates {
2420            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
2421                .one(&txn)
2422                .await?
2423                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
2424                .into_active_model();
2425
2426            streaming_job.parallelism = Set(match parallelism {
2427                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
2428                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
2429                TableParallelism::Custom => StreamingParallelism::Custom,
2430            });
2431
2432            if let Some(resource_group) =
2433                resource_group_updates.get(&(table_id.table_id() as ObjectId))
2434            {
2435                streaming_job.specific_resource_group = Set(resource_group.to_owned());
2436            }
2437
2438            streaming_job.update(&txn).await?;
2439        }
2440
2441        txn.commit().await?;
2442
2443        Ok(())
2444    }
2445
2446    /// Note: `FsFetch` created in old versions are not included.
2447    /// Since this is only used for debugging, it should be fine.
2448    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2449        let inner = self.inner.read().await;
2450        let txn = inner.db.begin().await?;
2451
2452        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
2453            .select_only()
2454            .columns([
2455                fragment::Column::FragmentId,
2456                fragment::Column::JobId,
2457                fragment::Column::FragmentTypeMask,
2458                fragment::Column::StreamNode,
2459            ])
2460            .filter(FragmentTypeMask::intersects_any(
2461                FragmentTypeFlag::rate_limit_fragments(),
2462            ))
2463            .into_tuple()
2464            .all(&txn)
2465            .await?;
2466
2467        let mut rate_limits = Vec::new();
2468        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2469            let stream_node = stream_node.to_protobuf();
2470            visit_stream_node_body(&stream_node, |node| {
2471                let mut rate_limit = None;
2472                let mut node_name = None;
2473
2474                match node {
2475                    // source rate limit
2476                    PbNodeBody::Source(node) => {
2477                        if let Some(node_inner) = &node.source_inner {
2478                            rate_limit = node_inner.rate_limit;
2479                            node_name = Some("SOURCE");
2480                        }
2481                    }
2482                    PbNodeBody::StreamFsFetch(node) => {
2483                        if let Some(node_inner) = &node.node_inner {
2484                            rate_limit = node_inner.rate_limit;
2485                            node_name = Some("FS_FETCH");
2486                        }
2487                    }
2488                    // backfill rate limit
2489                    PbNodeBody::SourceBackfill(node) => {
2490                        rate_limit = node.rate_limit;
2491                        node_name = Some("SOURCE_BACKFILL");
2492                    }
2493                    PbNodeBody::StreamScan(node) => {
2494                        rate_limit = node.rate_limit;
2495                        node_name = Some("STREAM_SCAN");
2496                    }
2497                    PbNodeBody::StreamCdcScan(node) => {
2498                        rate_limit = node.rate_limit;
2499                        node_name = Some("STREAM_CDC_SCAN");
2500                    }
2501                    PbNodeBody::Sink(node) => {
2502                        rate_limit = node.rate_limit;
2503                        node_name = Some("SINK");
2504                    }
2505                    _ => {}
2506                }
2507
2508                if let Some(rate_limit) = rate_limit {
2509                    rate_limits.push(RateLimitInfo {
2510                        fragment_id: fragment_id as u32,
2511                        job_id: job_id as u32,
2512                        fragment_type_mask: fragment_type_mask as u32,
2513                        rate_limit,
2514                        node_name: node_name.unwrap().to_owned(),
2515                    });
2516                }
2517            });
2518        }
2519
2520        Ok(rate_limits)
2521    }
2522}
2523
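/// Validates that `props` may be altered on the fly for this sink's connector:
/// every key must be in the connector's allow-list, and the merged configuration
/// must pass the connector's `validate_alter_config` check.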
2524fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2525    // Validate that props can be altered
2526    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2527        Some(connector) => {
2528            let connector_type = connector.to_lowercase();
2529            let field_names: Vec<String> = props.keys().cloned().collect();
2530            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2531                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2532
2533            match_sink_name_str!(
2534                connector_type.as_str(),
2535                SinkType,
2536                {
2537                    let mut new_props = sink.properties.0.clone();
2538                    new_props.extend(props.clone());
2539                    SinkType::validate_alter_config(&new_props)
2540                },
2541                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2542            )?
2543        }
2544        None => {
2545            return Err(
2546                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2547            );
2548        }
2549    };
2550    Ok(())
2551}
2552
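/// Merges `props` into a statement's WITH clause, overwriting options that share a
/// name and appending new ones while preserving the original option order.
///
/// A minimal sketch of the effect (values are illustrative):
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 't1')  +  { topic: 't2' }
/// //   => WITH (connector = 'kafka', topic = 't2')
/// update_stmt_with_props(
///     &mut stmt.with_properties.0,
///     &BTreeMap::from([("topic".to_owned(), "t2".to_owned())]),
/// )?;
/// ```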
2553fn update_stmt_with_props(
2554    with_properties: &mut Vec<SqlOption>,
2555    props: &BTreeMap<String, String>,
2556) -> MetaResult<()> {
2557    let mut new_sql_options = with_properties
2558        .iter()
2559        .map(|sql_option| (&sql_option.name, sql_option))
2560        .collect::<IndexMap<_, _>>();
2561    let add_sql_options = props
2562        .iter()
2563        .map(|(k, v)| SqlOption::try_from((k, v)))
2564        .collect::<Result<Vec<SqlOption>, ParserError>>()
2565        .map_err(|e| SinkError::Config(anyhow!(e)))?;
2566    new_sql_options.extend(
2567        add_sql_options
2568            .iter()
2569            .map(|sql_option| (&sql_option.name, sql_option)),
2570    );
2571    *with_properties = new_sql_options.into_values().cloned().collect();
2572    Ok(())
2573}
2574
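/// Rewrites the `SinkDesc` properties embedded in every `Sink` fragment of the given
/// sink job so that newly spawned actors observe the altered configuration.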
2575async fn update_sink_fragment_props(
2576    txn: &DatabaseTransaction,
2577    sink_id: SinkId,
2578    props: BTreeMap<String, String>,
2579) -> MetaResult<()> {
2580    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2581        .select_only()
2582        .columns([
2583            fragment::Column::FragmentId,
2584            fragment::Column::FragmentTypeMask,
2585            fragment::Column::StreamNode,
2586        ])
2587        .filter(fragment::Column::JobId.eq(sink_id))
2588        .into_tuple()
2589        .all(txn)
2590        .await?;
2591    let fragments = fragments
2592        .into_iter()
2593        .filter(|(_, fragment_type_mask, _)| {
2594            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2595        })
2596        .filter_map(|(id, _, stream_node)| {
2597            let mut stream_node = stream_node.to_protobuf();
2598            let mut found = false;
2599            visit_stream_node_mut(&mut stream_node, |node| {
2600                if let PbNodeBody::Sink(node) = node
2601                    && let Some(sink_desc) = &mut node.sink_desc
2602                    && sink_desc.id == sink_id as u32
2603                {
2604                    sink_desc.properties.extend(props.clone());
2605                    found = true;
2606                }
2607            });
2608            if found { Some((id, stream_node)) } else { None }
2609        })
2610        .collect_vec();
2611    assert!(
2612        !fragments.is_empty(),
2613        "sink id should be used by at least one fragment"
2614    );
2615    for (id, stream_node) in fragments {
2616        fragment::ActiveModel {
2617            fragment_id: Set(id),
2618            stream_node: Set(StreamNode::from(&stream_node)),
2619            ..Default::default()
2620        }
2621        .update(txn)
2622        .await?;
2623    }
2624    Ok(())
2625}
2626
2627pub struct SinkIntoTableContext {
    /// For ALTER TABLE (e.g., ADD COLUMN), the ids of the existing sinks into this
    /// table whose `original_target_columns` needs to be filled in (sinks created by
    /// earlier versions); otherwise empty.
2630    pub updated_sink_catalogs: Vec<SinkId>,
2631}
2632
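/// Context for finalizing a sink whose schema was auto-refreshed alongside a table
/// replacement: the temporary sink fragments replace the original ones, and the sink
/// (plus its kv log store table, if any) receives the new column catalog.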
2633pub struct FinishAutoRefreshSchemaSinkContext {
2634    pub tmp_sink_id: ObjectId,
2635    pub original_sink_id: ObjectId,
2636    pub columns: Vec<PbColumnCatalog>,
2637    pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
2638}
2639
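/// Applies `alter_stream_node_fn` to the stream nodes of all fragments of `job_ids`
/// whose fragment type mask intersects `expect_flag`, persisting only the fragments
/// for which the callback reported a change.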
2640async fn update_connector_props_fragments<F>(
2641    txn: &DatabaseTransaction,
2642    job_ids: Vec<i32>,
2643    expect_flag: FragmentTypeFlag,
2644    mut alter_stream_node_fn: F,
2645    is_shared_source: bool,
2646) -> MetaResult<()>
2647where
2648    F: FnMut(&mut PbNodeBody, &mut bool),
2649{
2650    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2651        .select_only()
2652        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2653        .filter(
2654            fragment::Column::JobId
2655                .is_in(job_ids.clone())
2656                .and(FragmentTypeMask::intersects(expect_flag)),
2657        )
2658        .into_tuple()
2659        .all(txn)
2660        .await?;
2661    let fragments = fragments
2662        .into_iter()
2663        .filter_map(|(id, stream_node)| {
2664            let mut stream_node = stream_node.to_protobuf();
2665            let mut found = false;
2666            visit_stream_node_mut(&mut stream_node, |node| {
2667                alter_stream_node_fn(node, &mut found);
2668            });
2669            if found { Some((id, stream_node)) } else { None }
2670        })
2671        .collect_vec();
2672    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the source id (or the associated table id).
        // A non-shared source with no dependent jobs may end up with no updated fragment;
        // `job_ids.len() > 1` means the source is used by other streaming jobs, so at
        // least one fragment should have been updated.
2676        assert!(
2677            !fragments.is_empty(),
2678            "job ids {:?} (type: {:?}) should be used by at least one fragment",
2679            job_ids,
2680            expect_flag
2681        );
2682    }
2683
2684    for (id, stream_node) in fragments {
2685        fragment::ActiveModel {
2686            fragment_id: Set(id),
2687            stream_node: Set(StreamNode::from(&stream_node)),
2688            ..Default::default()
2689        }
2690        .update(txn)
2691        .await?;
2692    }
2693
2694    Ok(())
2695}