// risingwave_meta/controller/streaming_job.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{
    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_object_dependencies_by_object_id, list_user_info_by_ids,
    try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
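///
/// A minimal construction sketch (the values are illustrative, not from any real
/// catalog):
///
/// ```ignore
/// let update = DependentSourceFragmentUpdate {
///     job_ids: vec![],
///     with_properties: BTreeMap::from([("topic".to_owned(), "t1".to_owned())]),
///     secret_refs: BTreeMap::new(),
///     is_shared_source: true,
/// };
/// ```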
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            adaptive_parallelism_strategy: Set(ctx
                .adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string)),
            parallelism: Set(streaming_parallelism),
            backfill_parallelism: Set(backfill_parallelism),
            backfill_orders: Set(None),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
            is_serverless_backfill: Set(is_serverless_backfill),
        };
        StreamingJobModel::insert(job).exec(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
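    ///
    /// A hedged sketch of where this fits in the create flow (the exact ordering
    /// and the surrounding driver code are assumptions; all three methods are
    /// defined in this file):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(&mut job, &ctx, &parallelism, max_parallelism,
    ///         dependencies, resource_type, &backfill_parallelism)
    ///     .await?;
    /// // ... build the fragment graph, then persist fragments and actors:
    /// controller
    ///     .prepare_stream_job_fragments(&fragments, &job, false, backfill_orders)
    ///     .await?;
    /// // ... once the job is fully created:
    /// controller.finish_streaming_job(job_id).await?;
    /// ```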
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
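        // For example (illustrative): with no user-specified parallelism and the
        // meta default set to `Full`, the match above yields adaptive scheduling;
        // a session setting like `SET streaming_parallelism = 4` arrives here as
        // `Some(..)` and always wins, yielding `Fixed(4)`.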
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                // To be compatible with the old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
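    ///
    /// A minimal sketch (with made-up variable names) of consuming the returned
    /// mapping, e.g. to rewrite placeholder state table ids elsewhere in the
    /// fragment graph:
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// for id in state_table_ids.iter_mut() {
    ///     *id = table_id_map[id];
    /// }
    /// ```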
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
        // After table fragments are created, update them for all tables.
        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // Table's vnode count is not always the fragment's vnode count, so we have to
                // look up the table from `TableFragments`.
                // See `ActorGraphBuilder::new`.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // In production builds, the definition was replaced earlier but is still
                    // needed for the notification, so restore it from the job.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // the frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with a target table) needs to be dropped,
    /// additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or in `FOREGROUND` mode.
    /// It returns (true, _) if the job is not found or has been aborted.
    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
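    ///
    /// A minimal usage sketch (illustrative):
    ///
    /// ```ignore
    /// // `is_cancelled = true` means the job was cancelled by the user rather than failed.
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, true)
    ///     .await?;
    /// ```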
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job was created in the background and is still in the creating status,
                    // we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating a sink into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark the job as CREATING.
        StreamingJobModel::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();
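            // The mapping built above flattens per-actor splits into one list per
            // fragment, e.g. (illustrative): fragment 1 with actor splits
            // {a: [s0], b: [s1]} becomes (1, [s0, s1]).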

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (
            original_max_parallelism,
            original_timezone,
            original_config_override,
            original_adaptive_strategy,
        ): (i32, Option<String>, Option<String>, Option<String>) =
            StreamingJobModel::find_by_id(id)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .column(streaming_job::Column::ConfigOverride)
                .column(streaming_job::Column::AdaptiveParallelismStrategy)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };
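        // Note: unlike `create_job_catalog` above, which consults the meta
        // `default_parallelism` when none is specified, the replace path simply
        // defaults to adaptive parallelism here.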

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
            adaptive_parallelism_strategy: original_adaptive_strategy.as_deref().map(|s| {
                parse_strategy(s).expect("strategy should be validated before persisting")
            }),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            streaming_job_resource_type::ResourceType::Regular(true),
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }
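
    // A hedged sketch of the replace flow (the ordering and the driver code are
    // assumptions; both methods are defined in this file):
    //
    //     let tmp_id = controller
    //         .create_job_catalog_for_replace(&job, Some(&ctx), parallelism, expected_max)
    //         .await?;
    //     // ... build and prepare the new fragments under `tmp_id`, then:
    //     controller
    //         .finish_replace_streaming_job(tmp_id, job, replace_upstream, sink_ctx, None, None)
    //         .await?;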

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info, dependencies) =
            self.finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies,
                }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Transactional inner part of `finish_streaming_job`: marks job-related objects as
    /// `Created` and returns the notification payload for the caller to send.
    pub async fn finish_streaming_job_inner(
        &self,
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(
        Operation,
        Vec<risingwave_pb::meta::Object>,
        Vec<PbUserInfo>,
        Vec<PbObjectDependency>,
    )> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (aligning with MV behavior).
1215                notification_op = NotificationOperation::Update;
1216                objects.push(PbObject {
1217                    object_info: Some(PbObjectInfo::Sink(
1218                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1219                    )),
1220                });
1221            }
1222            ObjectType::Index => {
1223                need_grant_default_privileges = false;
1224                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1225                    .find_also_related(Object)
1226                    .one(txn)
1227                    .await?
1228                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1229                {
1230                    let (table, obj) = Table::find_by_id(index.index_table_id)
1231                        .find_also_related(Object)
1232                        .one(txn)
1233                        .await?
1234                        .ok_or_else(|| {
1235                            MetaError::catalog_id_not_found("table", index.index_table_id)
1236                        })?;
1237                    objects.push(PbObject {
1238                        object_info: Some(PbObjectInfo::Table(
1239                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1240                        )),
1241                    });
1242                }
1243
1244                // If the index is created on a table with privileges, we should also
1245                // grant the privileges for the index and its state tables.
1246                let primary_table_privileges = UserPrivilege::find()
1247                    .filter(
1248                        user_privilege::Column::Oid
1249                            .eq(index.primary_table_id)
1250                            .and(user_privilege::Column::Action.eq(Action::Select)),
1251                    )
1252                    .all(txn)
1253                    .await?;
1254                if !primary_table_privileges.is_empty() {
1255                    let index_state_table_ids: Vec<TableId> = Table::find()
1256                        .select_only()
1257                        .column(table::Column::TableId)
1258                        .filter(
1259                            table::Column::BelongsToJobId
1260                                .eq(job_id)
1261                                .or(table::Column::TableId.eq(index.index_table_id)),
1262                        )
1263                        .into_tuple()
1264                        .all(txn)
1265                        .await?;
1266                    let mut new_privileges = vec![];
1267                    for privilege in &primary_table_privileges {
1268                        for state_table_id in &index_state_table_ids {
1269                            new_privileges.push(user_privilege::ActiveModel {
1270                                id: Default::default(),
1271                                oid: Set(state_table_id.as_object_id()),
1272                                user_id: Set(privilege.user_id),
1273                                action: Set(Action::Select),
1274                                dependent_id: Set(privilege.dependent_id),
1275                                granted_by: Set(privilege.granted_by),
1276                                with_grant_option: Set(privilege.with_grant_option),
1277                            });
1278                        }
1279                    }
1280                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1281
1282                    updated_user_info = list_user_info_by_ids(
1283                        primary_table_privileges.into_iter().map(|p| p.user_id),
1284                        txn,
1285                    )
1286                    .await?;
1287                }
1288
1289                objects.push(PbObject {
1290                    object_info: Some(PbObjectInfo::Index(
1291                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
1292                    )),
1293                });
1294            }
1295            ObjectType::Source => {
1296                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1297                    .find_also_related(Object)
1298                    .one(txn)
1299                    .await?
1300                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1301                objects.push(PbObject {
1302                    object_info: Some(PbObjectInfo::Source(
1303                        ObjectModel(source, obj.unwrap(), None).into(),
1304                    )),
1305                });
1306            }
1307            _ => unreachable!("invalid job type: {:?}", job_type),
1308        }
1309
1310        if need_grant_default_privileges {
1311            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1312        }
1313
1314        let dependencies =
1315            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;
1316
1317        Ok((notification_op, objects, updated_user_info, dependencies))
1318    }
1319
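    /// Finish a replace-streaming-job DDL: within a single transaction, swap the
    /// temporary job's fragments in for the original job's and update the catalogs,
    /// then notify frontends of the updated (and possibly dropped) objects.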
1320    pub async fn finish_replace_streaming_job(
1321        &self,
1322        tmp_id: JobId,
1323        streaming_job: StreamingJob,
1324        replace_upstream: FragmentReplaceUpstream,
1325        sink_into_table_context: SinkIntoTableContext,
1326        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1327        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1328    ) -> MetaResult<NotificationVersion> {
1329        let inner = self.inner.write().await;
1330        let txn = inner.db.begin().await?;
1331
1332        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1333            tmp_id,
1334            replace_upstream,
1335            sink_into_table_context,
1336            &txn,
1337            streaming_job,
1338            drop_table_connector_ctx,
1339            auto_refresh_schema_sinks,
1340        )
1341        .await?;
1342
1343        txn.commit().await?;
1344
1345        let mut version = self
1346            .notify_frontend(
1347                NotificationOperation::Update,
1348                NotificationInfo::ObjectGroup(PbObjectGroup {
1349                    objects,
1350                    dependencies: vec![],
1351                }),
1352            )
1353            .await;
1354
1355        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1356            self.notify_users_update(user_infos).await;
1357            version = self
1358                .notify_frontend(
1359                    NotificationOperation::Delete,
1360                    build_object_group_for_delete(to_drop_objects),
1361                )
1362                .await;
1363        }
1364
1365        Ok(version)
1366    }
1367
1368    pub async fn finish_replace_streaming_job_inner(
1369        tmp_id: JobId,
1370        replace_upstream: FragmentReplaceUpstream,
1371        SinkIntoTableContext {
1372            updated_sink_catalogs,
1373        }: SinkIntoTableContext,
1374        txn: &DatabaseTransaction,
1375        streaming_job: StreamingJob,
1376        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1377        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1378    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1379        let original_job_id = streaming_job.id();
1380        let job_type = streaming_job.job_type();
1381
1382        let mut index_item_rewriter = None;
1383
1384        // Update catalog
1385        match streaming_job {
1386            StreamingJob::Table(_source, table, _table_job_type) => {
1387                // The source catalog should remain unchanged
1388
1389                let original_column_catalogs =
1390                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1391
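                // Indexes on this table store expressions over its columns; capture
                // the old and new column lists so the index items can be rewritten
                // after the table catalog is replaced (see below).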
1392                index_item_rewriter = Some({
1393                    let original_columns = original_column_catalogs
1394                        .to_protobuf()
1395                        .into_iter()
1396                        .map(|c| c.column_desc.unwrap())
1397                        .collect_vec();
1398                    let new_columns = table
1399                        .columns
1400                        .iter()
1401                        .map(|c| c.column_desc.clone().unwrap())
1402                        .collect_vec();
1403
1404                    IndexItemRewriter {
1405                        original_columns,
1406                        new_columns,
1407                    }
1408                });
1409
1410                // For sinks created in earlier versions, we need to set the original_target_columns.
1411                for sink_id in updated_sink_catalogs {
1412                    Sink::update(sink::ActiveModel {
1413                        sink_id: Set(sink_id as _),
1414                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1415                        ..Default::default()
1416                    })
1417                    .exec(txn)
1418                    .await?;
1419                }
1420                // Update the table catalog with the new one. (column catalog is also updated here)
1421                let mut table = table::ActiveModel::from(table);
1422                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1423                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1424                {
1425                    // drop the table's connector; the rest of the logic is in `drop_table_associated_source`
1426                    table.optional_associated_source_id = Set(None);
1427                }
1428
1429                Table::update(table).exec(txn).await?;
1430            }
1431            StreamingJob::Source(source) => {
1432                // Update the source catalog with the new one.
1433                let source = source::ActiveModel::from(source);
1434                Source::update(source).exec(txn).await?;
1435            }
1436            StreamingJob::MaterializedView(table) => {
1437                // Update the table catalog with the new one.
1438                let table = table::ActiveModel::from(table);
1439                Table::update(table).exec(txn).await?;
1440            }
1441            _ => unreachable!(
1442                "invalid streaming job type: {:?}",
1443                streaming_job.job_type_str()
1444            ),
1445        }
1446
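        // Helper reused for auto-refresh-schema sinks below: rebinds the internal
        // tables and fragments of `tmp_id` to `original_job_id`, patches downstream
        // `Merge` nodes according to `replace_upstream`, and drops the temporary
        // job object.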
1447        async fn finish_fragments(
1448            txn: &DatabaseTransaction,
1449            tmp_id: JobId,
1450            original_job_id: JobId,
1451            replace_upstream: FragmentReplaceUpstream,
1452        ) -> MetaResult<()> {
1453            // 0. update internal tables
1454            // Fields including `fragment_id` were placeholder values before.
1455            // After table fragments are created, update them for all internal tables.
1456            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1457                .select_only()
1458                .columns([
1459                    fragment::Column::FragmentId,
1460                    fragment::Column::StateTableIds,
1461                ])
1462                .filter(fragment::Column::JobId.eq(tmp_id))
1463                .into_tuple()
1464                .all(txn)
1465                .await?;
1466            for (fragment_id, state_table_ids) in fragment_info {
1467                for state_table_id in state_table_ids.into_inner() {
1468                    let state_table_id = TableId::new(state_table_id as _);
1469                    Table::update(table::ActiveModel {
1470                        table_id: Set(state_table_id),
1471                        fragment_id: Set(Some(fragment_id)),
1472                        // No need to update `vnode_count` because it must remain the same.
1473                        ..Default::default()
1474                    })
1475                    .exec(txn)
1476                    .await?;
1477                }
1478            }
1479
1480            // 1. replace old fragments/actors with new ones.
1481            Fragment::delete_many()
1482                .filter(fragment::Column::JobId.eq(original_job_id))
1483                .exec(txn)
1484                .await?;
1485            Fragment::update_many()
1486                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1487                .filter(fragment::Column::JobId.eq(tmp_id))
1488                .exec(txn)
1489                .await?;
1490
1491            // 2. update merges.
1492            // update each downstream fragment's `Merge` node and its `upstream_fragment_id`
1493            for (fragment_id, fragment_replace_map) in replace_upstream {
1494                let (fragment_id, mut stream_node) =
1495                    Fragment::find_by_id(fragment_id as FragmentId)
1496                        .select_only()
1497                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1498                        .into_tuple::<(FragmentId, StreamNode)>()
1499                        .one(txn)
1500                        .await?
1501                        .map(|(id, node)| (id, node.to_protobuf()))
1502                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1503
1504                visit_stream_node_mut(&mut stream_node, |body| {
1505                    if let PbNodeBody::Merge(m) = body
1506                        && let Some(new_fragment_id) =
1507                            fragment_replace_map.get(&m.upstream_fragment_id)
1508                    {
1509                        m.upstream_fragment_id = *new_fragment_id;
1510                    }
1511                });
1512                Fragment::update(fragment::ActiveModel {
1513                    fragment_id: Set(fragment_id),
1514                    stream_node: Set(StreamNode::from(&stream_node)),
1515                    ..Default::default()
1516                })
1517                .exec(txn)
1518                .await?;
1519            }
1520
1521            // 3. remove dummy object.
1522            Object::delete_by_id(tmp_id).exec(txn).await?;
1523
1524            Ok(())
1525        }
1526
1527        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1528
1529        // 4. update catalogs and notify.
1530        let mut objects = vec![];
1531        match job_type {
1532            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1533                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1534                    .find_also_related(Object)
1535                    .one(txn)
1536                    .await?
1537                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1538                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1539                    .one(txn)
1540                    .await?;
1541                objects.push(PbObject {
1542                    object_info: Some(PbObjectInfo::Table(
1543                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1544                    )),
1545                })
1546            }
1547            StreamingJobType::Source => {
1548                let (source, source_obj) =
1549                    Source::find_by_id(original_job_id.as_shared_source_id())
1550                        .find_also_related(Object)
1551                        .one(txn)
1552                        .await?
1553                        .ok_or_else(|| {
1554                            MetaError::catalog_id_not_found("object", original_job_id)
1555                        })?;
1556                objects.push(PbObject {
1557                    object_info: Some(PbObjectInfo::Source(
1558                        ObjectModel(source, source_obj.unwrap(), None).into(),
1559                    )),
1560                })
1561            }
1562            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1563        }
1564
1565        if let Some(expr_rewriter) = index_item_rewriter {
1566            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1567                .select_only()
1568                .columns([index::Column::IndexId, index::Column::IndexItems])
1569                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1570                .into_tuple()
1571                .all(txn)
1572                .await?;
1573            for (index_id, nodes) in index_items {
1574                let mut pb_nodes = nodes.to_protobuf();
1575                pb_nodes
1576                    .iter_mut()
1577                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1578                let index = index::ActiveModel {
1579                    index_id: Set(index_id),
1580                    index_items: Set(pb_nodes.into()),
1581                    ..Default::default()
1582                }
1583                .update(txn)
1584                .await?;
1585                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1586                    .find_also_related(streaming_job::Entity)
1587                    .one(txn)
1588                    .await?
1589                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1590                objects.push(PbObject {
1591                    object_info: Some(PbObjectInfo::Index(
1592                        ObjectModel(index, index_obj, streaming_job).into(),
1593                    )),
1594                });
1595            }
1596        }
1597
1598        if let Some(sinks) = auto_refresh_schema_sinks {
1599            for finish_sink_context in sinks {
1600                finish_fragments(
1601                    txn,
1602                    finish_sink_context.tmp_sink_id.as_job_id(),
1603                    finish_sink_context.original_sink_id.as_job_id(),
1604                    Default::default(),
1605                )
1606                .await?;
1607                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1608                    .find_also_related(Object)
1609                    .one(txn)
1610                    .await?
1611                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id)
                    })?;
1612                let sink_streaming_job =
1613                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1614                        .one(txn)
1615                        .await?;
1616                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1617                Sink::update(sink::ActiveModel {
1618                    sink_id: Set(finish_sink_context.original_sink_id),
1619                    columns: Set(columns.clone()),
1620                    ..Default::default()
1621                })
1622                .exec(txn)
1623                .await?;
1624                sink.columns = columns;
1625                objects.push(PbObject {
1626                    object_info: Some(PbObjectInfo::Sink(
1627                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1628                    )),
1629                });
1630                if let Some((log_store_table_id, new_log_store_table_columns)) =
1631                    finish_sink_context.new_log_store_table
1632                {
1633                    let new_log_store_table_columns: ColumnCatalogArray =
1634                        new_log_store_table_columns.into();
1635                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1636                        .find_also_related(Object)
1637                        .one(txn)
1638                        .await?
1639                        .ok_or_else(|| MetaError::catalog_id_not_found("table", log_store_table_id))?;
1640                    Table::update(table::ActiveModel {
1641                        table_id: Set(log_store_table_id),
1642                        columns: Set(new_log_store_table_columns.clone()),
1643                        ..Default::default()
1644                    })
1645                    .exec(txn)
1646                    .await?;
1647                    table.columns = new_log_store_table_columns;
1648                    objects.push(PbObject {
1649                        object_info: Some(PbObjectInfo::Table(
1650                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1651                                .into(),
1652                        )),
1653                    });
1654                }
1655            }
1656        }
1657
1658        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1659        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1660            notification_objs =
1661                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1662        }
1663
1664        Ok((objects, notification_objs))
1665    }
1666
1667    /// Abort the replacing streaming job by deleting the temporary job object.
1668    pub async fn try_abort_replacing_streaming_job(
1669        &self,
1670        tmp_job_id: JobId,
1671        tmp_sink_ids: Option<Vec<ObjectId>>,
1672    ) -> MetaResult<()> {
1673        let inner = self.inner.write().await;
1674        let txn = inner.db.begin().await?;
1675        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1676        if let Some(tmp_sink_ids) = tmp_sink_ids {
1677            for tmp_sink_id in tmp_sink_ids {
1678                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1679            }
1680        }
1681        txn.commit().await?;
1682        Ok(())
1683    }
1684
1685    // edit the `rate_limit` of the `Source` node in the given `source_id`'s fragments;
1686    // return the job ids and fragment ids the new rate limit applies to
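    //
    // typically reached via throttling DDL, e.g. (a sketch assuming the documented
    // `ALTER SOURCE` syntax; `my_source` is a placeholder):
    //
    //     ALTER SOURCE my_source SET source_rate_limit TO 1000;
    //     ALTER SOURCE my_source SET source_rate_limit TO DEFAULT; -- remove the limit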
1687    pub async fn update_source_rate_limit_by_source_id(
1688        &self,
1689        source_id: SourceId,
1690        rate_limit: Option<u32>,
1691    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1692        let inner = self.inner.read().await;
1693        let txn = inner.db.begin().await?;
1694
1695        {
1696            let active_source = source::ActiveModel {
1697                source_id: Set(source_id),
1698                rate_limit: Set(rate_limit.map(|v| v as i32)),
1699                ..Default::default()
1700            };
1701            Source::update(active_source).exec(&txn).await?;
1702        }
1703
1704        let (source, obj) = Source::find_by_id(source_id)
1705            .find_also_related(Object)
1706            .one(&txn)
1707            .await?
1708            .ok_or_else(|| {
1709                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1710            })?;
1711
1712        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
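        // resolve the jobs whose fragments embed this source: the owning table for an
        // associated source, the shared-source job itself, or, for a non-shared
        // standalone source, every job that depends on it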
1713        let streaming_job_ids: Vec<JobId> =
1714            if let Some(table_id) = source.optional_associated_table_id {
1715                vec![table_id.as_job_id()]
1716            } else if let Some(source_info) = &source.source_info
1717                && source_info.to_protobuf().is_shared()
1718            {
1719                vec![source_id.as_share_source_job_id()]
1720            } else {
1721                ObjectDependency::find()
1722                    .select_only()
1723                    .column(object_dependency::Column::UsedBy)
1724                    .filter(object_dependency::Column::Oid.eq(source_id))
1725                    .into_tuple()
1726                    .all(&txn)
1727                    .await?
1728            };
1729
1730        if streaming_job_ids.is_empty() {
1731            return Err(MetaError::invalid_parameter(format!(
1732                "source id {source_id} not used by any streaming job"
1733            )));
1734        }
1735
1736        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1737            .select_only()
1738            .columns([
1739                fragment::Column::FragmentId,
1740                fragment::Column::JobId,
1741                fragment::Column::FragmentTypeMask,
1742                fragment::Column::StreamNode,
1743            ])
1744            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1745            .into_tuple()
1746            .all(&txn)
1747            .await?;
1748        let mut fragments = fragments
1749            .into_iter()
1750            .map(|(id, job_id, mask, stream_node)| {
1751                (
1752                    id,
1753                    job_id,
1754                    FragmentTypeMask::from(mask as u32),
1755                    stream_node.to_protobuf(),
1756                )
1757            })
1758            .collect_vec();
1759
1760        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1761            let mut found = false;
1762            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1763                visit_stream_node_mut(stream_node, |node| {
1764                    if let PbNodeBody::Source(node) = node
1765                        && let Some(node_inner) = &mut node.source_inner
1766                        && node_inner.source_id == source_id
1767                    {
1768                        node_inner.rate_limit = rate_limit;
1769                        found = true;
1770                    }
1771                });
1772            }
1773            if is_fs_source {
1774                // in older versions, there was no fragment type flag for the `FsFetch` node,
1775                // so for fs connectors we scan all fragments for a `StreamFsFetch` node
1776                visit_stream_node_mut(stream_node, |node| {
1777                    if let PbNodeBody::StreamFsFetch(node) = node {
1778                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1779                        if let Some(node_inner) = &mut node.node_inner
1780                            && node_inner.source_id == source_id
1781                        {
1782                            node_inner.rate_limit = rate_limit;
1783                            found = true;
1784                        }
1785                    }
1786                });
1787            }
1788            found
1789        });
1790
1791        assert!(
1792            !fragments.is_empty(),
1793            "source id should be used by at least one fragment"
1794        );
1795
1796        let (fragment_ids, job_ids) = fragments
1797            .iter()
1798            .map(|(fragment_id, job_id, _, _)| (fragment_id, job_id))
1799            .unzip();
1800
1801        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1802            Fragment::update(fragment::ActiveModel {
1803                fragment_id: Set(fragment_id),
1804                fragment_type_mask: Set(fragment_type_mask.into()),
1805                stream_node: Set(StreamNode::from(&stream_node)),
1806                ..Default::default()
1807            })
1808            .exec(&txn)
1809            .await?;
1810        }
1811
1812        txn.commit().await?;
1813
1814        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
1815        let _version = self
1816            .notify_frontend(
1817                NotificationOperation::Update,
1818                NotificationInfo::ObjectGroup(PbObjectGroup {
1819                    objects: vec![PbObject {
1820                        object_info: Some(relation_info),
1821                    }],
1822                    dependencies: vec![],
1823                }),
1824            )
1825            .await;
1826
1827        Ok((job_ids, fragment_ids))
1828    }
1829
1830    // edit the content of fragments in the given `job_id`'s streaming job
1831    // return the ids of the fragments that were mutated
1832    pub async fn mutate_fragments_by_job_id(
1833        &self,
1834        job_id: JobId,
1835        // returns true if the mutation is applied
1836        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1837        // error message when no relevant fragment is found
1838        err_msg: &'static str,
1839    ) -> MetaResult<HashSet<FragmentId>> {
1840        let inner = self.inner.read().await;
1841        let txn = inner.db.begin().await?;
1842
1843        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1844            .select_only()
1845            .columns([
1846                fragment::Column::FragmentId,
1847                fragment::Column::FragmentTypeMask,
1848                fragment::Column::StreamNode,
1849            ])
1850            .filter(fragment::Column::JobId.eq(job_id))
1851            .into_tuple()
1852            .all(&txn)
1853            .await?;
1854        let mut fragments = fragments
1855            .into_iter()
1856            .map(|(id, mask, stream_node)| {
1857                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1858            })
1859            .collect_vec();
1860
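        // run the mutation over every fragment, then keep only those the closure
        // reports as mutated; unchanged fragments need no DB update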
1861        let fragments = fragments
1862            .iter_mut()
1863            .map(|(_, fragment_type_mask, stream_node)| {
1864                fragments_mutation_fn(*fragment_type_mask, stream_node)
1865            })
1866            .collect::<MetaResult<Vec<bool>>>()?
1867            .into_iter()
1868            .zip_eq_debug(std::mem::take(&mut fragments))
1869            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1870            .collect::<Vec<_>>();
1871
1872        if fragments.is_empty() {
1873            return Err(MetaError::invalid_parameter(format!(
1874                "job id {job_id}: {}",
1875                err_msg
1876            )));
1877        }
1878
1879        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1880        for (id, _, stream_node) in fragments {
1881            Fragment::update(fragment::ActiveModel {
1882                fragment_id: Set(id),
1883                stream_node: Set(StreamNode::from(&stream_node)),
1884                ..Default::default()
1885            })
1886            .exec(&txn)
1887            .await?;
1888        }
1889
1890        txn.commit().await?;
1891
1892        Ok(fragment_ids)
1893    }
1894
1895    async fn mutate_fragment_by_fragment_id(
1896        &self,
1897        fragment_id: FragmentId,
1898        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1899        err_msg: &'static str,
1900    ) -> MetaResult<()> {
1901        let inner = self.inner.read().await;
1902        let txn = inner.db.begin().await?;
1903
1904        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1905            Fragment::find_by_id(fragment_id)
1906                .select_only()
1907                .columns([
1908                    fragment::Column::FragmentTypeMask,
1909                    fragment::Column::StreamNode,
1910                ])
1911                .into_tuple()
1912                .one(&txn)
1913                .await?
1914                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1915        let mut pb_stream_node = stream_node.to_protobuf();
1916        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1917
1918        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1919            return Err(MetaError::invalid_parameter(format!(
1920                "fragment id {fragment_id}: {}",
1921                err_msg
1922            )));
1923        }
1924
1925        Fragment::update(fragment::ActiveModel {
1926            fragment_id: Set(fragment_id),
1927            stream_node: Set(StreamNode::from(&pb_stream_node)),
1928            ..Default::default()
1929        })
1930        .exec(&txn)
1931        .await?;
1932
1933        txn.commit().await?;
1934
1935        Ok(())
1936    }
1937
1938    pub async fn update_backfill_orders_by_job_id(
1939        &self,
1940        job_id: JobId,
1941        backfill_orders: Option<BackfillOrders>,
1942    ) -> MetaResult<()> {
1943        let inner = self.inner.write().await;
1944        let txn = inner.db.begin().await?;
1945
1946        ensure_job_not_canceled(job_id, &txn).await?;
1947
1948        streaming_job::ActiveModel {
1949            job_id: Set(job_id),
1950            backfill_orders: Set(backfill_orders),
1951            ..Default::default()
1952        }
1953        .update(&txn)
1954        .await?;
1955
1956        txn.commit().await?;
1957
1958        Ok(())
1959    }
1960
1961    // edit the `rate_limit` of the backfill nodes (`StreamScan`/`StreamCdcScan`/`SourceBackfill`)
1962    // in the given `job_id`'s fragments; return the fragment ids to be applied
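    //
    // typically reached via throttling DDL, e.g. (a sketch assuming the documented
    // syntax; `my_mv` is a placeholder):
    //
    //     ALTER MATERIALIZED VIEW my_mv SET backfill_rate_limit = 512;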
1963    pub async fn update_backfill_rate_limit_by_job_id(
1964        &self,
1965        job_id: JobId,
1966        rate_limit: Option<u32>,
1967    ) -> MetaResult<HashSet<FragmentId>> {
1968        let update_backfill_rate_limit =
1969            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1970                let mut found = false;
1971                if fragment_type_mask
1972                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1973                {
1974                    visit_stream_node_mut(stream_node, |node| match node {
1975                        PbNodeBody::StreamCdcScan(node) => {
1976                            node.rate_limit = rate_limit;
1977                            found = true;
1978                        }
1979                        PbNodeBody::StreamScan(node) => {
1980                            node.rate_limit = rate_limit;
1981                            found = true;
1982                        }
1983                        PbNodeBody::SourceBackfill(node) => {
1984                            node.rate_limit = rate_limit;
1985                            found = true;
1986                        }
1987                        _ => {}
1988                    });
1989                }
1990                Ok(found)
1991            };
1992
1993        self.mutate_fragments_by_job_id(
1994            job_id,
1995            update_backfill_rate_limit,
1996            "stream scan node or source node not found",
1997        )
1998        .await
1999    }
2000
2001    // edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments;
2002    // return the fragment ids to be applied
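    //
    // typically reached via throttling DDL, e.g. (a sketch assuming the documented
    // syntax; `my_sink` is a placeholder):
    //
    //     ALTER SINK my_sink SET sink_rate_limit TO 256;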
2003    pub async fn update_sink_rate_limit_by_job_id(
2004        &self,
2005        sink_id: SinkId,
2006        rate_limit: Option<u32>,
2007    ) -> MetaResult<HashSet<FragmentId>> {
2008        let update_sink_rate_limit =
2009            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2010                let mut found = Ok(false);
2011                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2012                    visit_stream_node_mut(stream_node, |node| {
2013                        if let PbNodeBody::Sink(node) = node {
2014                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2015                                found = Err(MetaError::invalid_parameter(
2016                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2017                                ));
2018                                return;
2019                            }
2020                            node.rate_limit = rate_limit;
2021                            found = Ok(true);
2022                        }
2023                    });
2024                }
2025                found
2026            };
2027
2028        self.mutate_fragments_by_job_id(
2029            sink_id.as_job_id(),
2030            update_sink_rate_limit,
2031            "sink node not found",
2032        )
2033        .await
2034    }
2035
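    // edit the `rate_limit` of the `Dml` node in the given `job_id`'s fragments
    // (throttles user INSERT/UPDATE/DELETE traffic into the table);
    // return the fragment ids to be applied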
2036    pub async fn update_dml_rate_limit_by_job_id(
2037        &self,
2038        job_id: JobId,
2039        rate_limit: Option<u32>,
2040    ) -> MetaResult<HashSet<FragmentId>> {
2041        let update_dml_rate_limit =
2042            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2043                let mut found = false;
2044                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2045                    visit_stream_node_mut(stream_node, |node| {
2046                        if let PbNodeBody::Dml(node) = node {
2047                            node.rate_limit = rate_limit;
2048                            found = true;
2049                        }
2050                    });
2051                }
2052                Ok(found)
2053            };
2054
2055        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2056            .await
2057    }
2058
2059    pub async fn update_source_props_by_source_id(
2060        &self,
2061        source_id: SourceId,
2062        alter_props: BTreeMap<String, String>,
2063        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2064        skip_alter_on_fly_check: bool,
2065    ) -> MetaResult<WithOptionsSecResolved> {
2066        let inner = self.inner.read().await;
2067        let txn = inner.db.begin().await?;
2068
2069        let (source, _obj) = Source::find_by_id(source_id)
2070            .find_also_related(Object)
2071            .one(&txn)
2072            .await?
2073            .ok_or_else(|| {
2074                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2075            })?;
2076        let connector = source.with_properties.0.get_connector().unwrap();
2077        let is_shared_source = source.is_shared();
2078
2079        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2080        if !is_shared_source {
2081            // MVs on a non-shared source hold a copy of the source node in their fragments, so they must be updated too
2082            dep_source_job_ids = ObjectDependency::find()
2083                .select_only()
2084                .column(object_dependency::Column::UsedBy)
2085                .filter(object_dependency::Column::Oid.eq(source_id))
2086                .into_tuple()
2087                .all(&txn)
2088                .await?;
2089        }
2090
2091        // Validate that connector type is not being changed
2092        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
2093            && new_connector != &connector
2094        {
2095            return Err(MetaError::invalid_parameter(format!(
2096                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
2097                connector, new_connector
2098            )));
2099        }
2100
2101        // Only check alter-on-fly restrictions for SQL ALTER SOURCE, not for admin risectl operations
2102        if !skip_alter_on_fly_check {
2103            let prop_keys: Vec<String> = alter_props
2104                .keys()
2105                .chain(alter_secret_refs.keys())
2106                .cloned()
2107                .collect();
2108            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2109                &connector, &prop_keys,
2110            )?;
2111        }
2112
2113        let mut options_with_secret = WithOptionsSecResolved::new(
2114            source.with_properties.0.clone(),
2115            source
2116                .secret_ref
2117                .map(|secret_ref| secret_ref.to_protobuf())
2118                .unwrap_or_default(),
2119        );
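        // merge the altered props/secret refs into the existing options; the returned
        // sets are the secret ids that became newly referenced or unreferenced, used
        // below to keep the object-dependency records in sync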
2120        let (to_add_secret_dep, to_remove_secret_dep) =
2121            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2122
2123        tracing::info!(
2124            "applying new properties to source: source_id={}, options_with_secret={:?}",
2125            source_id,
2126            options_with_secret
2127        );
2128        // check that the altered props still parse into valid connector properties
2129        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2130        // todo: validate via source manager
2131
2132        let mut associate_table_id = None;
2133
2134        // `preferred_id` is the object that owns the secret dependencies:
2135        // the associated table id when updating a table's embedded source,
2136        // otherwise the source id itself
2137        let mut preferred_id = source_id.as_object_id();
2138        let rewrite_sql = {
2139            let definition = source.definition.clone();
2140
2141            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2142                .map_err(|e| {
2143                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2144                        anyhow!(e).context("Failed to parse source definition SQL"),
2145                    )))
2146                })?
2147                .try_into()
2148                .unwrap();
2149
2150            /// Formats SQL options with secret values properly resolved
2151            ///
2152            /// This function processes configuration options that may contain sensitive data:
2153            /// - Plaintext options are directly converted to `SqlOption`
2154            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2155            ///   without exposing the actual secret value
2156            ///
2157            /// # Arguments
2158            /// * `txn` - Database transaction for retrieving secrets
2159            /// * `options_with_secret` - Container of options with both plaintext and secret values
2160            ///
2161            /// # Returns
2162            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
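            ///
            /// # Example (sketch)
            ///
            /// A plaintext option `user -> bob` is rendered as `user = 'bob'`, while a
            /// reference to a secret named `pw` under the key `password` is rendered
            /// as `password = SECRET pw`, never exposing the secret value.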
2163            async fn format_with_option_secret_resolved(
2164                txn: &DatabaseTransaction,
2165                options_with_secret: &WithOptionsSecResolved,
2166            ) -> MetaResult<Vec<SqlOption>> {
2167                let mut options = Vec::new();
2168                for (k, v) in options_with_secret.as_plaintext() {
2169                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2170                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2171                    options.push(sql_option);
2172                }
2173                for (k, v) in options_with_secret.as_secret() {
2174                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2175                        let sql_option =
2176                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2177                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2178                        options.push(sql_option);
2179                    } else {
2180                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2181                    }
2182                }
2183                Ok(options)
2184            }
2185
2186            match &mut stmt {
2187                Statement::CreateSource { stmt } => {
2188                    stmt.with_properties.0 =
2189                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2190                }
2191                Statement::CreateTable { with_options, .. } => {
2192                    *with_options =
2193                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2194                    associate_table_id = source.optional_associated_table_id;
2195                    preferred_id = associate_table_id.unwrap().as_object_id();
2196                }
2197                _ => unreachable!(),
2198            }
2199
2200            stmt.to_string()
2201        };
2202
2203        {
2204            // Update secret dependencies atomically within the transaction.
2205            // Add new dependencies for secrets that are newly referenced.
2206            if !to_add_secret_dep.is_empty() {
2207                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2208                    object_dependency::ActiveModel {
2209                        oid: Set(secret_id.into()),
2210                        used_by: Set(preferred_id),
2211                        ..Default::default()
2212                    }
2213                }))
2214                .exec(&txn)
2215                .await?;
2216            }
2217            // Remove dependencies for secrets that are no longer referenced.
2218            // This allows the secrets to be deleted after this source no longer uses them.
2219            if !to_remove_secret_dep.is_empty() {
2220                let _ = ObjectDependency::delete_many()
2221                    .filter(
2222                        object_dependency::Column::Oid
2223                            .is_in(to_remove_secret_dep)
2224                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2225                    )
2226                    .exec(&txn)
2227                    .await?;
2228            }
2229        }
2230
2231        let active_source_model = source::ActiveModel {
2232            source_id: Set(source_id),
2233            definition: Set(rewrite_sql.clone()),
2234            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2235            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2236                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2237            ..Default::default()
2238        };
2239        Source::update(active_source_model).exec(&txn).await?;
2240
2241        if let Some(associate_table_id) = associate_table_id {
2242            // update the associated table's definition accordingly
2243            let active_table_model = table::ActiveModel {
2244                table_id: Set(associate_table_id),
2245                definition: Set(rewrite_sql),
2246                ..Default::default()
2247            };
2248            Table::update(active_table_model).exec(&txn).await?;
2249        }
2250
2251        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2252            // when updating a table with an associated connector, the job id is the table id
2253            associate_table_id.as_job_id()
2254        } else {
2255            source_id.as_share_source_job_id()
2256        }]
2257        .into_iter()
2258        .chain(dep_source_job_ids.into_iter())
2259        .collect_vec();
2260
2261        // push the new props into the `Source` nodes of all affected fragments
2262        update_connector_props_fragments(
2263            &txn,
2264            to_check_job_ids,
2265            FragmentTypeFlag::Source,
2266            |node, found| {
2267                if let PbNodeBody::Source(node) = node
2268                    && let Some(source_inner) = &mut node.source_inner
2269                {
2270                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2271                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2272                    *found = true;
2273                }
2274            },
2275            is_shared_source,
2276        )
2277        .await?;
2278
2279        let mut to_update_objs = Vec::with_capacity(2);
2280        let (source, obj) = Source::find_by_id(source_id)
2281            .find_also_related(Object)
2282            .one(&txn)
2283            .await?
2284            .ok_or_else(|| {
2285                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2286            })?;
2287        to_update_objs.push(PbObject {
2288            object_info: Some(PbObjectInfo::Source(
2289                ObjectModel(source, obj.unwrap(), None).into(),
2290            )),
2291        });
2292
2293        if let Some(associate_table_id) = associate_table_id {
2294            let (table, obj) = Table::find_by_id(associate_table_id)
2295                .find_also_related(Object)
2296                .one(&txn)
2297                .await?
2298                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2299            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2300                .one(&txn)
2301                .await?;
2302            to_update_objs.push(PbObject {
2303                object_info: Some(PbObjectInfo::Table(
2304                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
2305                )),
2306            });
2307        }
2308
2309        txn.commit().await?;
2310
2311        self.notify_frontend(
2312            NotificationOperation::Update,
2313            NotificationInfo::ObjectGroup(PbObjectGroup {
2314                objects: to_update_objs,
2315                dependencies: vec![],
2316            }),
2317        )
2318        .await;
2319
2320        Ok(options_with_secret)
2321    }
2322
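    // Update a sink's connector properties: validate the new props, rewrite the
    // stored `CREATE SINK` definition, persist the merged property map, and push
    // the new config into the running sink fragments before notifying frontends.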
2323    pub async fn update_sink_props_by_sink_id(
2324        &self,
2325        sink_id: SinkId,
2326        props: BTreeMap<String, String>,
2327    ) -> MetaResult<HashMap<String, String>> {
2328        let inner = self.inner.read().await;
2329        let txn = inner.db.begin().await?;
2330
2331        let (sink, _obj) = Sink::find_by_id(sink_id)
2332            .find_also_related(Object)
2333            .one(&txn)
2334            .await?
2335            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2336        validate_sink_props(&sink, &props)?;
2337        let definition = sink.definition.clone();
2338        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2339            .map_err(|e| SinkError::Config(anyhow!(e)))?
2340            .try_into()
2341            .unwrap();
2342        if let Statement::CreateSink { stmt } = &mut stmt {
2343            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2344        } else {
2345            panic!("definition is not a create sink statement")
2346        }
2347        let mut new_config = sink.properties.clone().into_inner();
2348        new_config.extend(props.clone());
2349
2350        let definition = stmt.to_string();
2351        let active_sink = sink::ActiveModel {
2352            sink_id: Set(sink_id),
2353            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2354            definition: Set(definition),
2355            ..Default::default()
2356        };
2357        Sink::update(active_sink).exec(&txn).await?;
2358
2359        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2360        let (sink, obj) = Sink::find_by_id(sink_id)
2361            .find_also_related(Object)
2362            .one(&txn)
2363            .await?
2364            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2365        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2366            .one(&txn)
2367            .await?;
2368        txn.commit().await?;
2369        let relation_infos = vec![PbObject {
2370            object_info: Some(PbObjectInfo::Sink(
2371                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2372            )),
2373        }];
2374
2375        let _version = self
2376            .notify_frontend(
2377                NotificationOperation::Update,
2378                NotificationInfo::ObjectGroup(PbObjectGroup {
2379                    objects: relation_infos,
2380                    dependencies: vec![],
2381                }),
2382            )
2383            .await;
2384
2385        Ok(props.into_iter().collect())
2386    }
2387
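    // An iceberg-engine table is backed by an internal sink/source pair, so altering
    // its connector props must rewrite the shared definition on all three catalogs
    // (sink, source, and table) as well as the running sink fragments.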
2388    pub async fn update_iceberg_table_props_by_table_id(
2389        &self,
2390        table_id: TableId,
2391        props: BTreeMap<String, String>,
2392        alter_iceberg_table_props: Option<
2393            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2394        >,
2395    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2396        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2397            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2398        let inner = self.inner.read().await;
2399        let txn = inner.db.begin().await?;
2400
2401        let (sink, _obj) = Sink::find_by_id(sink_id)
2402            .find_also_related(Object)
2403            .one(&txn)
2404            .await?
2405            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2406        validate_sink_props(&sink, &props)?;
2407
2408        let definition = sink.definition.clone();
2409        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2410            .map_err(|e| SinkError::Config(anyhow!(e)))?
2411            .try_into()
2412            .unwrap();
2413        if let Statement::CreateTable {
2414            with_options,
2415            engine,
2416            ..
2417        } = &mut stmt
2418        {
2419            if !matches!(engine, Engine::Iceberg) {
2420                return Err(SinkError::Config(anyhow!(
2421                    "only iceberg table can be altered as sink"
2422                ))
2423                .into());
2424            }
2425            update_stmt_with_props(with_options, &props)?;
2426        } else {
2427            panic!("definition is not a create iceberg table statement")
2428        }
2429        let mut new_config = sink.properties.clone().into_inner();
2430        new_config.extend(props.clone());
2431
2432        let definition = stmt.to_string();
2433        let active_sink = sink::ActiveModel {
2434            sink_id: Set(sink_id),
2435            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2436            definition: Set(definition.clone()),
2437            ..Default::default()
2438        };
2439        let active_source = source::ActiveModel {
2440            source_id: Set(source_id),
2441            definition: Set(definition.clone()),
2442            ..Default::default()
2443        };
2444        let active_table = table::ActiveModel {
2445            table_id: Set(table_id),
2446            definition: Set(definition),
2447            ..Default::default()
2448        };
2449        Sink::update(active_sink).exec(&txn).await?;
2450        Source::update(active_source).exec(&txn).await?;
2451        Table::update(active_table).exec(&txn).await?;
2452
2453        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2454
2455        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2456            .find_also_related(Object)
2457            .one(&txn)
2458            .await?
2459            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2460        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2461            .one(&txn)
2462            .await?;
2463        let (source, source_obj) = Source::find_by_id(source_id)
2464            .find_also_related(Object)
2465            .one(&txn)
2466            .await?
2467            .ok_or_else(|| {
2468                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2469            })?;
2470        let (table, table_obj) = Table::find_by_id(table_id)
2471            .find_also_related(Object)
2472            .one(&txn)
2473            .await?
2474            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2475        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2476            .one(&txn)
2477            .await?;
2478        txn.commit().await?;
2479        let relation_infos = vec![
2480            PbObject {
2481                object_info: Some(PbObjectInfo::Sink(
2482                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2483                )),
2484            },
2485            PbObject {
2486                object_info: Some(PbObjectInfo::Source(
2487                    ObjectModel(source, source_obj.unwrap(), None).into(),
2488                )),
2489            },
2490            PbObject {
2491                object_info: Some(PbObjectInfo::Table(
2492                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2493                )),
2494            },
2495        ];
2496        let _version = self
2497            .notify_frontend(
2498                NotificationOperation::Update,
2499                NotificationInfo::ObjectGroup(PbObjectGroup {
2500                    objects: relation_infos,
2501                    dependencies: vec![],
2502                }),
2503            )
2504            .await;
2505
2506        Ok((props.into_iter().collect(), sink_id))
2507    }
2508
2509    /// Update connection properties and all dependent sources/sinks in a single transaction
2510    pub async fn update_connection_and_dependent_objects_props(
2511        &self,
2512        connection_id: ConnectionId,
2513        alter_props: BTreeMap<String, String>,
2514        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2515    ) -> MetaResult<(
2516        WithOptionsSecResolved,                   // Connection's new properties
2517        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2518        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2519    )> {
2520        let inner = self.inner.read().await;
2521        let txn = inner.db.begin().await?;
2522
2523        // Find all dependent sources and sinks first
2524        let dependent_sources: Vec<SourceId> = Source::find()
2525            .select_only()
2526            .column(source::Column::SourceId)
2527            .filter(source::Column::ConnectionId.eq(connection_id))
2528            .into_tuple()
2529            .all(&txn)
2530            .await?;
2531
2532        let dependent_sinks: Vec<SinkId> = Sink::find()
2533            .select_only()
2534            .column(sink::Column::SinkId)
2535            .filter(sink::Column::ConnectionId.eq(connection_id))
2536            .into_tuple()
2537            .all(&txn)
2538            .await?;
2539
2540        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2541            .find_also_related(Object)
2542            .one(&txn)
2543            .await?
2544            .ok_or_else(|| {
2545                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2546            })?;
2547
2548        // Validate that props can be altered
2549        let prop_keys: Vec<String> = alter_props
2550            .keys()
2551            .chain(alter_secret_refs.keys())
2552            .cloned()
2553            .collect();
2554
2555        // Map the connection type enum to the string name expected by the validation function
2556        let connection_type_str = pb_connection_type_to_connection_type(
2557            &connection_catalog.params.to_protobuf().connection_type(),
2558        )
2559        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2560
2561        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2562            connection_type_str, &prop_keys,
2563        )?;
2564
2565        let connection_pb = connection_catalog.params.to_protobuf();
2566        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2567            connection_pb.properties.into_iter().collect(),
2568            connection_pb.secret_refs.into_iter().collect(),
2569        );
2570
2571        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2572            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2573
2574        tracing::debug!(
2575            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2576            connection_id,
2577            dependent_sources,
2578            dependent_sinks
2579        );
2580
2581        // Validate connection
2582        {
2583            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2584                connection_type: connection_pb.connection_type,
2585                properties: connection_options_with_secret
2586                    .as_plaintext()
2587                    .clone()
2588                    .into_iter()
2589                    .collect(),
2590                secret_refs: connection_options_with_secret
2591                    .as_secret()
2592                    .clone()
2593                    .into_iter()
2594                    .collect(),
2595            };
2596            let connection = PbConnection {
2597                id: connection_id as _,
2598                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2599                    conn_params_pb,
2600                )),
2601                ..Default::default()
2602            };
2603            validate_connection(&connection).await?;
2604        }
2605
2606        // Update connection secret dependencies
2607        if !to_add_secret_dep.is_empty() {
2608            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2609                object_dependency::ActiveModel {
2610                    oid: Set(secret_id.into()),
2611                    used_by: Set(connection_id.as_object_id()),
2612                    ..Default::default()
2613                }
2614            }))
2615            .exec(&txn)
2616            .await?;
2617        }
2618        if !to_remove_secret_dep.is_empty() {
2619            let _ = ObjectDependency::delete_many()
2620                .filter(
2621                    object_dependency::Column::Oid
2622                        .is_in(to_remove_secret_dep)
2623                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2624                )
2625                .exec(&txn)
2626                .await?;
2627        }
2628
2629        // Update the connection with new properties
2630        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2631            connection_type: connection_pb.connection_type,
2632            properties: connection_options_with_secret
2633                .as_plaintext()
2634                .clone()
2635                .into_iter()
2636                .collect(),
2637            secret_refs: connection_options_with_secret
2638                .as_secret()
2639                .clone()
2640                .into_iter()
2641                .collect(),
2642        };
2643        let active_connection_model = connection::ActiveModel {
2644            connection_id: Set(connection_id),
2645            params: Set(ConnectionParams::from(&updated_connection_params)),
2646            ..Default::default()
2647        };
2648        Connection::update(active_connection_model)
2649            .exec(&txn)
2650            .await?;
2651
2652        // Batch update dependent sources and collect their complete properties
2653        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2654
2655        if !dependent_sources.is_empty() {
2656            // Batch fetch all dependent sources
2657            let sources_with_objs = Source::find()
2658                .find_also_related(Object)
2659                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2660                .all(&txn)
2661                .await?;
2662
2663            // Prepare batch updates
2664            let mut source_updates = Vec::new();
2665            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2666
2667            for (source, _obj) in sources_with_objs {
2668                let source_id = source.source_id;
2669
2670                let mut source_options_with_secret = WithOptionsSecResolved::new(
2671                    source.with_properties.0.clone(),
2672                    source
2673                        .secret_ref
2674                        .clone()
2675                        .map(|secret_ref| secret_ref.to_protobuf())
2676                        .unwrap_or_default(),
2677                );
2678                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2679                    source_options_with_secret
2680                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2681
2682                // Validate the updated source properties
2683                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2684
2685                // Prepare source update
2686                let active_source = source::ActiveModel {
2687                    source_id: Set(source_id),
2688                    with_properties: Set(Property(
2689                        source_options_with_secret.as_plaintext().clone(),
2690                    )),
2691                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2692                        || {
2693                            risingwave_meta_model::SecretRef::from(
2694                                source_options_with_secret.as_secret().clone(),
2695                            )
2696                        },
2697                    )),
2698                    ..Default::default()
2699                };
2700                source_updates.push(active_source);
2701
2702                // Prepare fragment update:
2703                // - If the source is associated with a table, update fragments of the table job.
2704                // - Otherwise, update fragments of the source's own job.
2705                // - For non-shared sources, also update dependent streaming jobs that embed a copy of the source node.
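                // Illustrative (hypothetical ids): a source backing table 100 targets job 100;
                // a standalone shared source 7 targets its own source job 7; a non-shared
                // source consumed by materialized views 8 and 9 also targets jobs 8 and 9.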
2706                let is_shared_source = source.is_shared();
2707                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2708                if !is_shared_source {
2709                    dep_source_job_ids = ObjectDependency::find()
2710                        .select_only()
2711                        .column(object_dependency::Column::UsedBy)
2712                        .filter(object_dependency::Column::Oid.eq(source_id))
2713                        .into_tuple()
2714                        .all(&txn)
2715                        .await?;
2716                }
2717
2718                let base_job_id =
2719                    if let Some(associate_table_id) = source.optional_associated_table_id {
2720                        associate_table_id.as_job_id()
2721                    } else {
2722                        source_id.as_share_source_job_id()
2723                    };
2724                let job_ids = vec![base_job_id]
2725                    .into_iter()
2726                    .chain(dep_source_job_ids.into_iter())
2727                    .collect_vec();
2728
2729                fragment_updates.push(DependentSourceFragmentUpdate {
2730                    job_ids,
2731                    with_properties: source_options_with_secret.as_plaintext().clone(),
2732                    secret_refs: source_options_with_secret.as_secret().clone(),
2733                    is_shared_source,
2734                });
2735
2736                // Collect the complete properties for runtime broadcast
2737                let complete_source_props = LocalSecretManager::global()
2738                    .fill_secrets(
2739                        source_options_with_secret.as_plaintext().clone(),
2740                        source_options_with_secret.as_secret().clone(),
2741                    )
2742                    .map_err(MetaError::from)?
2743                    .into_iter()
2744                    .collect::<HashMap<String, String>>();
2745                updated_sources_with_props.push((source_id, complete_source_props));
2746            }
2747
2748            for source_update in source_updates {
2749                Source::update(source_update).exec(&txn).await?;
2750            }
2751
2752            // Batch execute fragment updates
2753            for DependentSourceFragmentUpdate {
2754                job_ids,
2755                with_properties,
2756                secret_refs,
2757                is_shared_source,
2758            } in fragment_updates
2759            {
2760                update_connector_props_fragments(
2761                    &txn,
2762                    job_ids,
2763                    FragmentTypeFlag::Source,
2764                    |node, found| {
2765                        if let PbNodeBody::Source(node) = node
2766                            && let Some(source_inner) = &mut node.source_inner
2767                        {
2768                            source_inner.with_properties = with_properties.clone();
2769                            source_inner.secret_refs = secret_refs.clone();
2770                            *found = true;
2771                        }
2772                    },
2773                    is_shared_source,
2774                )
2775                .await?;
2776            }
2777        }
2778
2779        // Batch update dependent sinks and collect their complete properties
2780        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
2781
2782        if !dependent_sinks.is_empty() {
2783            // Batch fetch all dependent sinks
2784            let sinks_with_objs = Sink::find()
2785                .find_also_related(Object)
2786                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
2787                .all(&txn)
2788                .await?;
2789
2790            // Prepare batch updates
2791            let mut sink_updates = Vec::new();
2792            let mut sink_fragment_updates = Vec::new();
2793
2794            for (sink, _obj) in sinks_with_objs {
2795                let sink_id = sink.sink_id;
2796
2797                // Validate that sink props can be altered
2798                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2799                    Some(connector) => {
2800                        let connector_type = connector.to_lowercase();
2801                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
2802                            .map_err(|e| SinkError::Config(anyhow!(e)))?;
2803
2804                        match_sink_name_str!(
2805                            connector_type.as_str(),
2806                            SinkType,
2807                            {
2808                                let mut new_sink_props = sink.properties.0.clone();
2809                                new_sink_props.extend(alter_props.clone());
2810                                SinkType::validate_alter_config(&new_sink_props)
2811                            },
2812                            |sink: &str| Err(SinkError::Config(anyhow!(
2813                                "unsupported sink type {}",
2814                                sink
2815                            )))
2816                        )?
2817                    }
2818                    None => {
2819                        return Err(SinkError::Config(anyhow!(
2820                            "connector not specified when altering sink"
2821                        ))
2822                        .into());
2823                    }
2824                };
2825
2826                let mut new_sink_props = sink.properties.0.clone();
2827                new_sink_props.extend(alter_props.clone());
2828
2829                // Prepare sink update
2830                let active_sink = sink::ActiveModel {
2831                    sink_id: Set(sink_id),
2832                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
2833                    ..Default::default()
2834                };
2835                sink_updates.push(active_sink);
2836
2837                // Prepare fragment updates for this sink
2838                sink_fragment_updates.push((sink_id, new_sink_props.clone()));
2839
2840                // Collect the complete properties for runtime broadcast
2841                let complete_sink_props: HashMap<String, String> =
2842                    new_sink_props.into_iter().collect();
2843                updated_sinks_with_props.push((sink_id, complete_sink_props));
2844            }
2845
2846            // Batch execute sink updates
2847            for sink_update in sink_updates {
2848                Sink::update(sink_update).exec(&txn).await?;
2849            }
2850
2851        // Batch execute sink fragment updates via the shared `update_connector_props_fragments` helper
2852            for (sink_id, new_sink_props) in sink_fragment_updates {
2853                update_connector_props_fragments(
2854                    &txn,
2855                    vec![sink_id.as_job_id()],
2856                    FragmentTypeFlag::Sink,
2857                    |node, found| {
2858                        if let PbNodeBody::Sink(node) = node
2859                            && let Some(sink_desc) = &mut node.sink_desc
2860                            && sink_desc.id == sink_id.as_raw_id()
2861                        {
2862                            sink_desc.properties = new_sink_props.clone();
2863                            *found = true;
2864                        }
2865                    },
2866                    true,
2867                )
2868                .await?;
2869            }
2870        }
2871
2872        // Collect all updated objects for frontend notification
2873        let mut updated_objects = Vec::new();
2874
2875        // Add connection
2876        let (connection, obj) = Connection::find_by_id(connection_id)
2877            .find_also_related(Object)
2878            .one(&txn)
2879            .await?
2880            .ok_or_else(|| {
2881                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2882            })?;
2883        updated_objects.push(PbObject {
2884            object_info: Some(PbObjectInfo::Connection(
2885                ObjectModel(connection, obj.unwrap(), None).into(),
2886            )),
2887        });
2888
2889        // Add sources
2890        for source_id in &dependent_sources {
2891            let (source, obj) = Source::find_by_id(*source_id)
2892                .find_also_related(Object)
2893                .one(&txn)
2894                .await?
2895                .ok_or_else(|| {
2896                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
2897                })?;
2898            updated_objects.push(PbObject {
2899                object_info: Some(PbObjectInfo::Source(
2900                    ObjectModel(source, obj.unwrap(), None).into(),
2901                )),
2902            });
2903        }
2904
2905        // Add sinks
2906        for sink_id in &dependent_sinks {
2907            let (sink, obj) = Sink::find_by_id(*sink_id)
2908                .find_also_related(Object)
2909                .one(&txn)
2910                .await?
2911                .ok_or_else(|| {
2912                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
2913                })?;
2914            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2915                .one(&txn)
2916                .await?;
2917            updated_objects.push(PbObject {
2918                object_info: Some(PbObjectInfo::Sink(
2919                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2920                )),
2921            });
2922        }
2923
2924        // Commit the transaction
2925        txn.commit().await?;
2926
2927        // Notify frontend about all updated objects
2928        if !updated_objects.is_empty() {
2929            self.notify_frontend(
2930                NotificationOperation::Update,
2931                NotificationInfo::ObjectGroup(PbObjectGroup {
2932                    objects: updated_objects,
2933                    dependencies: vec![],
2934                }),
2935            )
2936            .await;
2937        }
2938
2939        Ok((
2940            connection_options_with_secret,
2941            updated_sources_with_props,
2942            updated_sinks_with_props,
2943        ))
2944    }
2945
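    /// Set (or clear, when `rate_limit` is `None`) the rate limit on every
    /// rate-limitable node (DML, sink, stream scan, CDC scan, source backfill)
    /// in the given fragment.
    ///
    /// A sketch of the call shape (hypothetical values):
    /// ```ignore
    /// controller.update_fragment_rate_limit_by_fragment_id(fragment_id, Some(1000)).await?; // throttle
    /// controller.update_fragment_rate_limit_by_fragment_id(fragment_id, None).await?; // remove the limit
    /// ```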
2946    pub async fn update_fragment_rate_limit_by_fragment_id(
2947        &self,
2948        fragment_id: FragmentId,
2949        rate_limit: Option<u32>,
2950    ) -> MetaResult<()> {
2951        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2952                                 stream_node: &mut PbStreamNode| {
2953            let mut found = false;
2954            if fragment_type_mask.contains_any(
2955                FragmentTypeFlag::dml_rate_limit_fragments()
2956                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2957                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2958                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2959            ) {
2960                visit_stream_node_mut(stream_node, |node| {
2961                    if let PbNodeBody::Dml(node) = node {
2962                        node.rate_limit = rate_limit;
2963                        found = true;
2964                    }
2965                    if let PbNodeBody::Sink(node) = node {
2966                        node.rate_limit = rate_limit;
2967                        found = true;
2968                    }
2969                    if let PbNodeBody::StreamCdcScan(node) = node {
2970                        node.rate_limit = rate_limit;
2971                        found = true;
2972                    }
2973                    if let PbNodeBody::StreamScan(node) = node {
2974                        node.rate_limit = rate_limit;
2975                        found = true;
2976                    }
2977                    if let PbNodeBody::SourceBackfill(node) = node {
2978                        node.rate_limit = rate_limit;
2979                        found = true;
2980                    }
2981                });
2982            }
2983            found
2984        };
2985        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2986            .await
2987    }
2988
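    /// List the rate limits currently set on rate-limitable stream nodes across all fragments.
    ///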
2989    /// Note: `FsFetch` nodes created in old versions are not included.
2990    /// Since this is only used for debugging, it should be fine.
2991    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2992        let inner = self.inner.read().await;
2993        let txn = inner.db.begin().await?;
2994
2995        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2996            .select_only()
2997            .columns([
2998                fragment::Column::FragmentId,
2999                fragment::Column::JobId,
3000                fragment::Column::FragmentTypeMask,
3001                fragment::Column::StreamNode,
3002            ])
3003            .filter(FragmentTypeMask::intersects_any(
3004                FragmentTypeFlag::rate_limit_fragments(),
3005            ))
3006            .into_tuple()
3007            .all(&txn)
3008            .await?;
3009
3010        let mut rate_limits = Vec::new();
3011        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3012            let stream_node = stream_node.to_protobuf();
3013            visit_stream_node_body(&stream_node, |node| {
3014                let mut rate_limit = None;
3015                let mut node_name = None;
3016
3017                match node {
3018                    // source rate limit
3019                    PbNodeBody::Source(node) => {
3020                        if let Some(node_inner) = &node.source_inner {
3021                            rate_limit = node_inner.rate_limit;
3022                            node_name = Some("SOURCE");
3023                        }
3024                    }
3025                    PbNodeBody::StreamFsFetch(node) => {
3026                        if let Some(node_inner) = &node.node_inner {
3027                            rate_limit = node_inner.rate_limit;
3028                            node_name = Some("FS_FETCH");
3029                        }
3030                    }
3031                    // backfill rate limit
3032                    PbNodeBody::SourceBackfill(node) => {
3033                        rate_limit = node.rate_limit;
3034                        node_name = Some("SOURCE_BACKFILL");
3035                    }
3036                    PbNodeBody::StreamScan(node) => {
3037                        rate_limit = node.rate_limit;
3038                        node_name = Some("STREAM_SCAN");
3039                    }
3040                    PbNodeBody::StreamCdcScan(node) => {
3041                        rate_limit = node.rate_limit;
3042                        node_name = Some("STREAM_CDC_SCAN");
3043                    }
3044                    PbNodeBody::Sink(node) => {
3045                        rate_limit = node.rate_limit;
3046                        node_name = Some("SINK");
3047                    }
3048                    _ => {}
3049                }
3050
3051                if let Some(rate_limit) = rate_limit {
3052                    rate_limits.push(RateLimitInfo {
3053                        fragment_id,
3054                        job_id,
3055                        fragment_type_mask: fragment_type_mask as u32,
3056                        rate_limit,
3057                        node_name: node_name.unwrap().to_owned(),
3058                    });
3059                }
3060            });
3061        }
3062
3063        Ok(rate_limits)
3064    }
3065}
3066
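/// Validate that `props` can be applied to `sink` on the fly: every key must be
/// alterable for the sink's connector, and the merged property map must pass the
/// connector-specific `validate_alter_config`.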
3067fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3068    // Validate that props can be altered
3069    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3070        Some(connector) => {
3071            let connector_type = connector.to_lowercase();
3072            let field_names: Vec<String> = props.keys().cloned().collect();
3073            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3074                .map_err(|e| SinkError::Config(anyhow!(e)))?;
3075
3076            match_sink_name_str!(
3077                connector_type.as_str(),
3078                SinkType,
3079                {
3080                    let mut new_props = sink.properties.0.clone();
3081                    new_props.extend(props.clone());
3082                    SinkType::validate_alter_config(&new_props)
3083                },
3084                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3085            )?
3086        }
3087        None => {
3088            return Err(
3089                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
3090            );
3091        }
3092    };
3093    Ok(())
3094}
3095
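/// Merge `props` into a statement's `WITH (...)` options, overwriting options of
/// the same name while preserving the original option order (hence the `IndexMap`).
///
/// Sketch of the effect (hypothetical options):
/// ```ignore
/// // before: WITH (connector = 'kafka', topic = 't1')
/// // props:  {"topic": "t2"}
/// // after:  WITH (connector = 'kafka', topic = 't2')
/// ```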
3096fn update_stmt_with_props(
3097    with_properties: &mut Vec<SqlOption>,
3098    props: &BTreeMap<String, String>,
3099) -> MetaResult<()> {
3100    let mut new_sql_options = with_properties
3101        .iter()
3102        .map(|sql_option| (&sql_option.name, sql_option))
3103        .collect::<IndexMap<_, _>>();
3104    let add_sql_options = props
3105        .iter()
3106        .map(|(k, v)| SqlOption::try_from((k, v)))
3107        .collect::<Result<Vec<SqlOption>, ParserError>>()
3108        .map_err(|e| SinkError::Config(anyhow!(e)))?;
3109    new_sql_options.extend(
3110        add_sql_options
3111            .iter()
3112            .map(|sql_option| (&sql_option.name, sql_option)),
3113    );
3114    *with_properties = new_sql_options.into_values().cloned().collect();
3115    Ok(())
3116}
3117
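/// Persist `props` into the `SinkDesc` of every sink fragment belonging to the
/// sink's streaming job, so the stored stream graph matches the altered catalog.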
3118async fn update_sink_fragment_props(
3119    txn: &DatabaseTransaction,
3120    sink_id: SinkId,
3121    props: BTreeMap<String, String>,
3122) -> MetaResult<()> {
3123    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3124        .select_only()
3125        .columns([
3126            fragment::Column::FragmentId,
3127            fragment::Column::FragmentTypeMask,
3128            fragment::Column::StreamNode,
3129        ])
3130        .filter(fragment::Column::JobId.eq(sink_id))
3131        .into_tuple()
3132        .all(txn)
3133        .await?;
3134    let fragments = fragments
3135        .into_iter()
3136        .filter(|(_, fragment_type_mask, _)| {
3137            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3138        })
3139        .filter_map(|(id, _, stream_node)| {
3140            let mut stream_node = stream_node.to_protobuf();
3141            let mut found = false;
3142            visit_stream_node_mut(&mut stream_node, |node| {
3143                if let PbNodeBody::Sink(node) = node
3144                    && let Some(sink_desc) = &mut node.sink_desc
3145                    && sink_desc.id == sink_id
3146                {
3147                    sink_desc.properties.extend(props.clone());
3148                    found = true;
3149                }
3150            });
3151            if found { Some((id, stream_node)) } else { None }
3152        })
3153        .collect_vec();
3154    assert!(
3155        !fragments.is_empty(),
3156        "sink id should be used by at least one fragment"
3157    );
3158    for (id, stream_node) in fragments {
3159        Fragment::update(fragment::ActiveModel {
3160            fragment_id: Set(id),
3161            stream_node: Set(StreamNode::from(&stream_node)),
3162            ..Default::default()
3163        })
3164        .exec(txn)
3165        .await?;
3166    }
3167    Ok(())
3168}
3169
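/// Context about sinks that write into a table (`CREATE SINK ... INTO TABLE`),
/// passed along when the target table's job is created or replaced.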
3170pub struct SinkIntoTableContext {
3171    /// For ALTER TABLE (e.g., ADD COLUMN), this is the list of existing sink ids;
3172    /// otherwise it is empty.
3173    pub updated_sink_catalogs: Vec<SinkId>,
3174}
3175
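/// Context for finishing an auto-refresh-schema sink replacement: the temporary
/// sink that supersedes `original_sink_id`, its new columns, and optionally a
/// new log store table with its columns.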
3176pub struct FinishAutoRefreshSchemaSinkContext {
3177    pub tmp_sink_id: SinkId,
3178    pub original_sink_id: SinkId,
3179    pub columns: Vec<PbColumnCatalog>,
3180    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
3181}
3182
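/// For every fragment of `job_ids` whose type mask intersects `expect_flag`,
/// run `alter_stream_node_fn` over its stream nodes and persist the fragment
/// back if the callback reported a match via `found`.
///
/// Sketch of a callback (mirrors the sink-alter call site above; `new_props` is
/// assumed to be captured by the caller):
/// ```ignore
/// |node, found| {
///     if let PbNodeBody::Sink(node) = node
///         && let Some(sink_desc) = &mut node.sink_desc
///     {
///         sink_desc.properties = new_props.clone();
///         *found = true;
///     }
/// }
/// ```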
3183async fn update_connector_props_fragments<F>(
3184    txn: &DatabaseTransaction,
3185    job_ids: Vec<JobId>,
3186    expect_flag: FragmentTypeFlag,
3187    mut alter_stream_node_fn: F,
3188    is_shared_source: bool,
3189) -> MetaResult<()>
3190where
3191    F: FnMut(&mut PbNodeBody, &mut bool),
3192{
3193    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3194        .select_only()
3195        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3196        .filter(
3197            fragment::Column::JobId
3198                .is_in(job_ids.clone())
3199                .and(FragmentTypeMask::intersects(expect_flag)),
3200        )
3201        .into_tuple()
3202        .all(txn)
3203        .await?;
3204    let fragments = fragments
3205        .into_iter()
3206        .filter_map(|(id, stream_node)| {
3207            let mut stream_node = stream_node.to_protobuf();
3208            let mut found = false;
3209            visit_stream_node_mut(&mut stream_node, |node| {
3210                alter_stream_node_fn(node, &mut found);
3211            });
3212            if found { Some((id, stream_node)) } else { None }
3213        })
3214        .collect_vec();
3215    if is_shared_source || job_ids.len() > 1 {
3216        // The first element of `job_ids` is the source's own job id (or its associated table's).
3217        // A non-shared source alone may yield no updated fragment, but `job_ids.len() > 1`
3218        // means the source is embedded in other streaming jobs, so at least one fragment must be updated.
3219        assert!(
3220            !fragments.is_empty(),
3221            "job ids {:?} (type: {:?}) should be used by at least one fragment",
3222            job_ids,
3223            expect_flag
3224        );
3225    }
3226
3227    for (id, stream_node) in fragments {
3228        Fragment::update(fragment::ActiveModel {
3229            fragment_id: Set(id),
3230            stream_node: Set(StreamNode::from(&stream_node)),
3231            ..Default::default()
3232        })
3233        .exec(txn)
3234        .await?;
3235    }
3236
3237    Ok(())
3238}