risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
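    /// Creates the generic `Object` row and the corresponding `streaming_job` row for a new
    /// streaming job (initialized as `JobStatus::Initial`), returning the allocated job id.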
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will be updated
    /// later in `prepare_streaming_job`; the frontend will be notified again in `finish_streaming_job`.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one created for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

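        // Create the job object plus its type-specific catalog entry (table / sink / source /
        // index), assigning the freshly allocated job id to the catalog's placeholder id fields.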
        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

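                // Refreshable tables additionally get a `refresh_job` row tracking the trigger
                // schedule (if any) and the current refresh state.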
                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // To be compatible with the old implementation, derive the index id and its table id from the job id.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job`; the frontend will be notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

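    /// Convenience wrapper around [`Self::prepare_streaming_job`] that unpacks a
    /// `StreamJobFragmentsToCreate` bundle.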
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

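        // Persist each fragment; for newly created jobs, also backfill the placeholder
        // `fragment_id` / `vnode_count` fields on the corresponding state tables.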
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id);
                    assert_eq!(table.fragment_id, fragment_id);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the stored definition was replaced earlier, but
                        // the original definition is still needed for the notification.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
    /// information is required to build barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or created in `FOREGROUND` mode.
    /// It returns (true, _) if the job is not found or has been aborted.
    /// It returns (_, Some(`database_id`)) with the job's `database_id` if the `job_id` exists.
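    ///
    /// # Example (illustrative sketch, not a doctest)
    /// ```ignore
    /// // Abort a creating job upon user cancellation:
    /// let (aborted, database_id) = catalog_controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ true)
    ///     .await?;
    /// if aborted {
    ///     // The job's catalog objects were deleted and all waiters were notified of the failure.
    /// }
    /// ```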
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating sink into table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We also have notified the frontend about these objects,
            // so we need to notify the frontend to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

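    /// Post-collection bookkeeping for a job's fragments: persists the new fragment relations
    /// and split assignments, and advances the job status to `Creating`.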
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

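    /// Creates a temporary streaming job object used while replacing an existing job, inheriting
    /// the original job's timezone and config override, and records a dependency edge between the
    /// original job and the new one. Returns the new job id.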
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Transactional body of [`finish_streaming_job`]: marks job-related objects as `Created`
    /// and returns the notification operation, objects, and updated user infos to broadcast.
    pub async fn finish_streaming_job_inner(
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];

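        // Collect the job's own catalog object(s) for the notification, handling type-specific
        // extras (associated source for tables, index table and inherited privileges for indexes).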
        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if job_type != ObjectType::Index {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        Ok((notification_op, objects, updated_user_info))
    }

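    /// Finalizes a replace-job operation: swaps in the new catalog and fragments for the original
    /// job, then notifies the frontend of the updated (and, if any, dropped) objects.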
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

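    /// Transactional body of [`finish_replace_streaming_job`]: updates the job's catalog with the
    /// new one and moves the fragments built under the temporary job onto the original job.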
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop table connector, the rest logic is in `drop_table_associated_source`
                    table.optional_associated_source_id = Set(None);
                }

                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

1327        async fn finish_fragments(
1328            txn: &DatabaseTransaction,
1329            tmp_id: JobId,
1330            original_job_id: JobId,
1331            replace_upstream: FragmentReplaceUpstream,
1332        ) -> MetaResult<()> {
1333            // 0. update internal tables
1334            // Fields including `fragment_id` were placeholder values before.
1335            // After table fragments are created, update them for all internal tables.
1336            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1337                .select_only()
1338                .columns([
1339                    fragment::Column::FragmentId,
1340                    fragment::Column::StateTableIds,
1341                ])
1342                .filter(fragment::Column::JobId.eq(tmp_id))
1343                .into_tuple()
1344                .all(txn)
1345                .await?;
1346            for (fragment_id, state_table_ids) in fragment_info {
1347                for state_table_id in state_table_ids.into_inner() {
1348                    let state_table_id = TableId::new(state_table_id as _);
1349                    table::ActiveModel {
1350                        table_id: Set(state_table_id),
1351                        fragment_id: Set(Some(fragment_id)),
1352                        // No need to update `vnode_count` because it must remain the same.
1353                        ..Default::default()
1354                    }
1355                    .update(txn)
1356                    .await?;
1357                }
1358            }
1359
1360            // 1. replace old fragments/actors with new ones.
1361            Fragment::delete_many()
1362                .filter(fragment::Column::JobId.eq(original_job_id))
1363                .exec(txn)
1364                .await?;
1365            Fragment::update_many()
1366                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1367                .filter(fragment::Column::JobId.eq(tmp_id))
1368                .exec(txn)
1369                .await?;
1370
1371            // 2. update merges.
1372            // update downstream fragment's Merge node, and upstream_fragment_id
1373            for (fragment_id, fragment_replace_map) in replace_upstream {
1374                let (fragment_id, mut stream_node) =
1375                    Fragment::find_by_id(fragment_id as FragmentId)
1376                        .select_only()
1377                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1378                        .into_tuple::<(FragmentId, StreamNode)>()
1379                        .one(txn)
1380                        .await?
1381                        .map(|(id, node)| (id, node.to_protobuf()))
1382                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1383
1384                visit_stream_node_mut(&mut stream_node, |body| {
1385                    if let PbNodeBody::Merge(m) = body
1386                        && let Some(new_fragment_id) =
1387                            fragment_replace_map.get(&m.upstream_fragment_id)
1388                    {
1389                        m.upstream_fragment_id = *new_fragment_id;
1390                    }
1391                });
1392                fragment::ActiveModel {
1393                    fragment_id: Set(fragment_id),
1394                    stream_node: Set(StreamNode::from(&stream_node)),
1395                    ..Default::default()
1396                }
1397                .update(txn)
1398                .await?;
1399            }
1400
1401            // 3. remove dummy object.
1402            Object::delete_by_id(tmp_id).exec(txn).await?;
1403
1404            Ok(())
1405        }
1406
1407        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1408
1409        // 4. update catalogs and notify.
1410        let mut objects = vec![];
1411        match job_type {
1412            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1413                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1414                    .find_also_related(Object)
1415                    .one(txn)
1416                    .await?
1417                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1418                objects.push(PbObject {
1419                    object_info: Some(PbObjectInfo::Table(
1420                        ObjectModel(table, table_obj.unwrap()).into(),
1421                    )),
1422                })
1423            }
1424            StreamingJobType::Source => {
1425                let (source, source_obj) =
1426                    Source::find_by_id(original_job_id.as_shared_source_id())
1427                        .find_also_related(Object)
1428                        .one(txn)
1429                        .await?
1430                        .ok_or_else(|| {
1431                            MetaError::catalog_id_not_found("object", original_job_id)
1432                        })?;
1433                objects.push(PbObject {
1434                    object_info: Some(PbObjectInfo::Source(
1435                        ObjectModel(source, source_obj.unwrap()).into(),
1436                    )),
1437                })
1438            }
1439            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1440        }
1441
1442        if let Some(expr_rewriter) = index_item_rewriter {
1443            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1444                .select_only()
1445                .columns([index::Column::IndexId, index::Column::IndexItems])
1446                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1447                .into_tuple()
1448                .all(txn)
1449                .await?;
1450            for (index_id, nodes) in index_items {
1451                let mut pb_nodes = nodes.to_protobuf();
1452                pb_nodes
1453                    .iter_mut()
1454                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1455                let index = index::ActiveModel {
1456                    index_id: Set(index_id),
1457                    index_items: Set(pb_nodes.into()),
1458                    ..Default::default()
1459                }
1460                .update(txn)
1461                .await?;
1462                let index_obj = index
1463                    .find_related(Object)
1464                    .one(txn)
1465                    .await?
1466                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1467                objects.push(PbObject {
1468                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1469                });
1470            }
1471        }
1472
1473        if let Some(sinks) = auto_refresh_schema_sinks {
1474            for finish_sink_context in sinks {
1475                finish_fragments(
1476                    txn,
1477                    finish_sink_context.tmp_sink_id.as_job_id(),
1478                    finish_sink_context.original_sink_id.as_job_id(),
1479                    Default::default(),
1480                )
1481                .await?;
1482                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1483                    .find_also_related(Object)
1484                    .one(txn)
1485                    .await?
1486                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1487                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1488                Sink::update(sink::ActiveModel {
1489                    sink_id: Set(finish_sink_context.original_sink_id),
1490                    columns: Set(columns.clone()),
1491                    ..Default::default()
1492                })
1493                .exec(txn)
1494                .await?;
1495                sink.columns = columns;
1496                objects.push(PbObject {
1497                    object_info: Some(PbObjectInfo::Sink(
1498                        ObjectModel(sink, sink_obj.unwrap()).into(),
1499                    )),
1500                });
1501                if let Some((log_store_table_id, new_log_store_table_columns)) =
1502                    finish_sink_context.new_log_store_table
1503                {
1504                    let new_log_store_table_columns: ColumnCatalogArray =
1505                        new_log_store_table_columns.into();
1506                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1507                        .find_also_related(Object)
1508                        .one(txn)
1509                        .await?
1510                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1511                    Table::update(table::ActiveModel {
1512                        table_id: Set(log_store_table_id),
1513                        columns: Set(new_log_store_table_columns.clone()),
1514                        ..Default::default()
1515                    })
1516                    .exec(txn)
1517                    .await?;
1518                    table.columns = new_log_store_table_columns;
1519                    objects.push(PbObject {
1520                        object_info: Some(PbObjectInfo::Table(
1521                            ObjectModel(table, table_obj.unwrap()).into(),
1522                        )),
1523                    });
1524                }
1525            }
1526        }
1527
1528        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1529        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1530            notification_objs =
1531                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1532        }
1533
1534        Ok((objects, notification_objs))
1535    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    /// Edit the `rate_limit` of the `Source` (and `StreamFsFetch`) nodes in the given
    /// `source_id`'s fragments.
    /// Returns the map of fragment ids to the actor ids the new rate limit applies to.
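    ///
    /// A sketch of the intended call pattern (surrounding names are illustrative, not part
    /// of this API):
    ///
    /// ```ignore
    /// let fragment_actors = catalog_controller
    ///     .update_source_rate_limit_by_source_id(source_id, Some(1000))
    ///     .await?;
    /// // The caller then applies the new rate limit to the running actors, e.g. by
    /// // building a throttle barrier command from `fragment_actors`.
    /// ```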
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id.as_share_source_job_id()]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();

        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

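    /// Look up the currently-running actors of each given fragment from the shared
    /// in-memory actor info. Panics if a fragment id is not present in the running info,
    /// so callers must only pass fragments that are known to be running.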
    fn get_fragment_actors_from_running_info(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
    ) -> HashMap<FragmentId, Vec<ActorId>> {
        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        for fragment_id in fragment_ids {
            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
            fragment_actors
                .entry(fragment_id as _)
                .or_default()
                .extend(actors.keys().copied());
        }

        fragment_actors
    }

    /// Edit the stream nodes of the fragments belonging to the given `job_id`.
    /// Returns the map of fragment ids to the actor ids the mutation applies to.
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        // Returns true if the mutation is applied.
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // Error message used when no relevant fragments are found.
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());

        Ok(fragment_actors)
    }

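    /// Single-fragment counterpart of [`Self::mutate_fragments_by_job_id`]: applies
    /// `fragment_mutation_fn` to one fragment's stream node, persists the mutated node,
    /// and returns the actors of that fragment.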
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated node, not the one originally fetched.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));

        txn.commit().await?;

        Ok(fragment_actors)
    }

    /// Edit the `rate_limit` of the backfill-related nodes (`StreamScan`, `StreamCdcScan`,
    /// `SourceBackfill`, and `Sink`) in the given `job_id`'s fragments.
    /// Returns the map of fragment ids to the actor ids the new rate limit applies to.
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    /// Edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
    /// Returns the map of fragment ids to the actor ids the new rate limit applies to.
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            sink_id.as_job_id(),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

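    /// Edit the `rate_limit` of the `Dml` node in the given `job_id`'s fragments.
    /// Returns the map of fragment ids to the actor ids the new rate limit applies to.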
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

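    /// Alter the connector properties (and secret references) of a source in place:
    /// validates the new properties, rewrites the source's `CREATE` definition and secret
    /// dependencies, updates the catalog, and patches the `Source` nodes embedded in
    /// dependent fragments. Returns the resolved options actually applied.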
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // MVs using a non-shared source hold a copy of the source in their fragments.
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties.
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // Check that the altered props are valid for the connector.
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // `preferred_id` can be the source id or the associated table id:
        // when updating a table's associated source, it is the table id;
        // otherwise, it is the source id.
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved.
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
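            ///
            /// A sketch of the resulting options (values illustrative):
            ///
            /// ```ignore
            /// // plaintext: topic = 'events'
            /// // secret:    properties.sasl.password = SECRET kafka_pwd
            /// ```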
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // Update the associated table's statement accordingly.
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // When updating a table with a connector, the job id is the table id.
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

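    /// Alter the connector properties of a sink in place: validates that the props may be
    /// changed on the fly, rewrites the `CREATE SINK` definition, updates the catalog, and
    /// patches the `SinkDesc` embedded in the sink's fragments. Returns the applied props.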
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;
        active_source.update(&txn).await?;
        active_table.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap()).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| match node {
                    PbNodeBody::Dml(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::Sink(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    _ => {}
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

    /// Note: `FsFetch` nodes created in old versions are not included.
    /// Since this is only used for debugging, it should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

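/// Check that `props` only contains fields the sink connector allows to be altered on the
/// fly, then run the connector-specific validation against the merged configuration.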
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
            );
        }
    };
    Ok(())
}

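/// Merge `props` into a statement's `WITH` options: options sharing a name are overwritten,
/// new ones are appended (insertion order is preserved via `IndexMap`).
///
/// A sketch of the effect (values illustrative):
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 'a')  +  {"topic": "b"}
/// //   => WITH (connector = 'kafka', topic = 'b')
/// ```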
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

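/// Rewrite the `SinkDesc` embedded in every `Sink` fragment of the given sink's job,
/// extending its properties with `props`.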
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids;
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

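/// Visit the stream nodes of all fragments belonging to `job_ids` whose type mask carries
/// `expect_flag`, let `alter_stream_node_fn` patch matching node bodies, and persist only
/// the fragments where the callback reported a change via its `found` flag.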
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the source id (or the associated table id).
        // A non-shared source alone may yield no updated fragments, but a shared source,
        // or a source used by other streaming jobs (`job_ids.len() > 1`), must have
        // updated at least one fragment.
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}