risingwave_meta/controller/streaming_job.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{
    ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{
    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::table::PbEngine;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_object_dependencies_by_object_id, list_user_info_by_ids,
    try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

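/// Settings snapshotted from the original job when creating its replacement, so the new job
/// inherits parallelism, timezone, config override, and resource group from the job it replaces.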
#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    max_parallelism: i32,
    timezone: Option<String>,
    config_override: Option<String>,
    adaptive_parallelism_strategy: Option<String>,
    parallelism: StreamingParallelism,
    specific_resource_group: Option<String>,
}

impl ReplaceOriginalJobInfo {
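    /// Uses the parallelism explicitly specified for the replacement if any; otherwise keeps
    /// the original job's parallelism.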
    fn resolved_parallelism(
        &self,
        specified_parallelism: Option<&NonZeroUsize>,
    ) -> StreamingParallelism {
        specified_parallelism
            .map(|n| StreamingParallelism::Fixed(n.get() as _))
            .unwrap_or_else(|| self.parallelism.clone())
    }

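    /// Builds the `StreamContext` for the replacement job, preferring the timezone from the
    /// override context but always keeping the original config override.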
    fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
        StreamContext {
            timezone: ctx_override
                .and_then(|ctx| ctx.timezone.clone())
                .or_else(|| self.timezone.clone()),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: self.config_override.clone().unwrap_or_default().into(),
        }
    }

    fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
        match &self.specific_resource_group {
            Some(group) => {
                streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
            }
            None => streaming_job_resource_type::ResourceType::Regular(true),
        }
    }
}

impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
    fn from(model: streaming_job::Model) -> Self {
        Self {
            max_parallelism: model.max_parallelism,
            timezone: model.timezone,
            config_override: model.config_override,
            adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
            parallelism: model.parallelism,
            specific_resource_group: model.specific_resource_group,
        }
    }
}

impl CatalogController {
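    /// Creates the `object` and `streaming_job` records for a new streaming job within the given
    /// transaction and returns the inserted model. The job starts in `JobStatus::Initial`.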
    #[expect(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    ) -> MetaResult<streaming_job::Model> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let model = streaming_job::Model {
            job_id,
            job_status: JobStatus::Initial,
            create_type: create_type.into(),
            timezone: ctx.timezone,
            config_override: Some(ctx.config_override.to_string()),
            adaptive_parallelism_strategy: adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string),
            parallelism: streaming_parallelism,
            backfill_parallelism,
            backfill_adaptive_parallelism_strategy: backfill_adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string),
            backfill_orders: None,
            max_parallelism: max_parallelism as _,
            specific_resource_group,
            is_serverless_backfill,
        };
        let job = model.clone().into_active_model();
        StreamingJobModel::insert(job).exec(txn).await?;

        Ok(model)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders; they will be updated later
    /// in `prepare_streaming_job`, and the frontend will be notified again in `finish_streaming_job`.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
    ) -> MetaResult<streaming_job::Model> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _))
            .or_else(|| {
                backfill_adaptive_parallelism_strategy
                    .as_ref()
                    .map(|_| StreamingParallelism::Adaptive)
            });

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

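        // Create the job object and its type-specific catalog entry (table/sink/index/source).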
        let streaming_job_model = match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                table.id = streaming_job_model.job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                sink.id = streaming_job_model.job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Table(src, table, _) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
                streaming_job_model
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                // To be compatible with the old implementation.
                let job_id = streaming_job_model.job_id;
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Source(src) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    adaptive_parallelism_strategy,
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                    backfill_adaptive_parallelism_strategy,
                )
                .await?;
                src.id = streaming_job_model.job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
                streaming_job_model
            }
        };

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(streaming_job_model)
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders; they will be
    /// updated later in `prepare_streaming_job`, and the frontend will be notified again in
    /// `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

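    /// Convenience wrapper around [`Self::prepare_streaming_job`] for newly built jobs: persists
    /// the job's fragments and their downstream relations in the meta store.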
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for
    /// replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        // Collect fragment IDs before consuming `fragments` for serving mapping notification.
        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
            .iter()
            .map(|f| f.fragment_id as crate::model::FragmentId)
            .collect();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
        // After table fragments are created, update them for all tables.
        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // Table's vnode count is not always the fragment's vnode count, so we have to
                // look up the table from `TableFragments`.
                // See `ActorGraphBuilder::new`.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // In release builds, the table's definition was replaced earlier but is still
                    // needed for the notification, so restore it from the creating job.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // Notify serving module about newly inserted fragments so it can establish
        // serving vnode mappings. This is driven by the fragment model insertion,
        // decoupled from the barrier-driven streaming mapping notifications.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_update(inserted_fragment_ids);

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If a sink (into a target table) needs to be
    /// dropped, additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial
    /// status or created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job was created in the background and is still in the creating
                    // status, we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

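        // Remember the catalogs of the cancelled job's tables; they are deleted from the
        // metastore below but may still be needed for lookup afterwards.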
        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Query fragment IDs before cascade-deleting them, for serving mapping cleanup.
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        // Notify serving module about deleted fragments from the aborted job.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

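    /// Called after the job's fragments are collected: inserts the new upstream/downstream
    /// fragment relations, marks the job as `Creating`, and records the initial source split
    /// assignment if any.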
    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        StreamingJobModel::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

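    /// Creates a temporary streaming job catalog used to replace an existing job (e.g. for
    /// `ALTER`): verifies the job version, rejects concurrent replacements, inherits the original
    /// job's settings, and records a dependency from the original job to the temporary one.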
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<streaming_job::Model> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let original_job = StreamingJobModel::find_by_id(id)
            .one(&txn)
            .await?
            .map(ReplaceOriginalJobInfo::from)
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_job.max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_job.max_parallelism,
                max_parallelism
            );
        }

        let parallelism = original_job.resolved_parallelism(specified_parallelism);
        let ctx = original_job.stream_context(ctx);
        let adaptive_parallelism_strategy = original_job
            .adaptive_parallelism_strategy
            .as_deref()
            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
        let resource_type = original_job.resource_type();

        // 4. create streaming object for new replace table.
        let tmp_model = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            adaptive_parallelism_strategy,
            parallelism,
            original_job.max_parallelism as _,
            resource_type,
            // `backfill_parallelism` is intentionally NOT inherited from the original job.
            // Replace has no "backfill finish -> restore parallelism" phase, so inheriting it
            // would leave actors stuck at the backfill parallelism, never recovering to the
            // steady-state parallelism.
            None,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(tmp_model.job_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(tmp_model)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info, dependencies) =
            self.finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects,
                    dependencies,
                }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Inner implementation of [`Self::finish_streaming_job`]: marks job-related objects as
    /// `Created` within the given transaction and returns the notification operation, objects,
    /// updated user infos, and object dependencies for the frontend notification.
1169    pub async fn finish_streaming_job_inner(
1170        &self,
1171        txn: &DatabaseTransaction,
1172        job_id: JobId,
1173    ) -> MetaResult<(
1174        Operation,
1175        Vec<risingwave_pb::meta::Object>,
1176        Vec<PbUserInfo>,
1177        Vec<PbObjectDependency>,
1178    )> {
1179        let job_type = Object::find_by_id(job_id)
1180            .select_only()
1181            .column(object::Column::ObjType)
1182            .into_tuple()
1183            .one(txn)
1184            .await?
1185            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1186
1187        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1188            .select_only()
1189            .column(streaming_job::Column::CreateType)
1190            .into_tuple()
1191            .one(txn)
1192            .await?
1193            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1194
1195        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
1196        let res = Object::update_many()
1197            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1198            .col_expr(
1199                object::Column::CreatedAtClusterVersion,
1200                current_cluster_version().into(),
1201            )
1202            .filter(object::Column::Oid.eq(job_id))
1203            .exec(txn)
1204            .await?;
1205        if res.rows_affected == 0 {
1206            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1207        }
1208
1209        // mark the target stream job as `Created`.
1210        let job = streaming_job::ActiveModel {
1211            job_id: Set(job_id),
1212            job_status: Set(JobStatus::Created),
1213            ..Default::default()
1214        };
1215        let streaming_job = Some(job.update(txn).await?);
1216
1217        // notify frontend: job, internal tables.
1218        let internal_table_objs = Table::find()
1219            .find_also_related(Object)
1220            .filter(table::Column::BelongsToJobId.eq(job_id))
1221            .all(txn)
1222            .await?;
1223        let mut objects = internal_table_objs
1224            .iter()
1225            .map(|(table, obj)| PbObject {
1226                object_info: Some(PbObjectInfo::Table(
1227                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
1228                )),
1229            })
1230            .collect_vec();
1231        let mut notification_op = if create_type == CreateType::Background {
1232            NotificationOperation::Update
1233        } else {
1234            NotificationOperation::Add
1235        };
1236        let mut updated_user_info = vec![];
1237        let mut need_grant_default_privileges = true;
1238
1239        match job_type {
1240            ObjectType::Table => {
1241                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1242                    .find_also_related(Object)
1243                    .one(txn)
1244                    .await?
1245                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1246                if table.table_type == TableType::MaterializedView {
1247                    notification_op = NotificationOperation::Update;
1248                }
1249
1250                if let Some(source_id) = table.optional_associated_source_id {
1251                    let (src, obj) = Source::find_by_id(source_id)
1252                        .find_also_related(Object)
1253                        .one(txn)
1254                        .await?
1255                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1256                    objects.push(PbObject {
1257                        object_info: Some(PbObjectInfo::Source(
1258                            ObjectModel(src, obj.unwrap(), None).into(),
1259                        )),
1260                    });
1261                }
1262                objects.push(PbObject {
1263                    object_info: Some(PbObjectInfo::Table(
1264                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
1265                    )),
1266                });
1267            }
1268            ObjectType::Sink => {
1269                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1270                    .find_also_related(Object)
1271                    .one(txn)
1272                    .await?
1273                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1274                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1275                    need_grant_default_privileges = false;
1276                }
1277                // If sinks were pre-notified during CREATING, we should use Update at finish
1278                // to avoid duplicate Add notifications (align with MV behavior).
1279                notification_op = NotificationOperation::Update;
1280                objects.push(PbObject {
1281                    object_info: Some(PbObjectInfo::Sink(
1282                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1283                    )),
1284                });
1285            }
1286            ObjectType::Index => {
1287                need_grant_default_privileges = false;
1288                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1289                    .find_also_related(Object)
1290                    .one(txn)
1291                    .await?
1292                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1293                {
1294                    let (table, obj) = Table::find_by_id(index.index_table_id)
1295                        .find_also_related(Object)
1296                        .one(txn)
1297                        .await?
1298                        .ok_or_else(|| {
1299                            MetaError::catalog_id_not_found("table", index.index_table_id)
1300                        })?;
1301                    objects.push(PbObject {
1302                        object_info: Some(PbObjectInfo::Table(
1303                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
1304                        )),
1305                    });
1306                }
1307
1308                // If the index is created on a table with privileges, we should also
1309                // grant the privileges for the index and its state tables.
1310                let primary_table_privileges = UserPrivilege::find()
1311                    .filter(
1312                        user_privilege::Column::Oid
1313                            .eq(index.primary_table_id)
1314                            .and(user_privilege::Column::Action.eq(Action::Select)),
1315                    )
1316                    .all(txn)
1317                    .await?;
1318                if !primary_table_privileges.is_empty() {
1319                    let index_state_table_ids: Vec<TableId> = Table::find()
1320                        .select_only()
1321                        .column(table::Column::TableId)
1322                        .filter(
1323                            table::Column::BelongsToJobId
1324                                .eq(job_id)
1325                                .or(table::Column::TableId.eq(index.index_table_id)),
1326                        )
1327                        .into_tuple()
1328                        .all(txn)
1329                        .await?;
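                    // Mirror every SELECT privilege on the primary table onto each
                    // state table of the index (a privilege x state-table cross product).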
1330                    let mut new_privileges = vec![];
1331                    for privilege in &primary_table_privileges {
1332                        for state_table_id in &index_state_table_ids {
1333                            new_privileges.push(user_privilege::ActiveModel {
1334                                id: Default::default(),
1335                                oid: Set(state_table_id.as_object_id()),
1336                                user_id: Set(privilege.user_id),
1337                                action: Set(Action::Select),
1338                                dependent_id: Set(privilege.dependent_id),
1339                                granted_by: Set(privilege.granted_by),
1340                                with_grant_option: Set(privilege.with_grant_option),
1341                            });
1342                        }
1343                    }
1344                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1345
1346                    updated_user_info = list_user_info_by_ids(
1347                        primary_table_privileges.into_iter().map(|p| p.user_id),
1348                        txn,
1349                    )
1350                    .await?;
1351                }
1352
1353                objects.push(PbObject {
1354                    object_info: Some(PbObjectInfo::Index(
1355                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
1356                    )),
1357                });
1358            }
1359            ObjectType::Source => {
1360                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1361                    .find_also_related(Object)
1362                    .one(txn)
1363                    .await?
1364                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1365                objects.push(PbObject {
1366                    object_info: Some(PbObjectInfo::Source(
1367                        ObjectModel(source, obj.unwrap(), None).into(),
1368                    )),
1369                });
1370            }
1371            _ => unreachable!("invalid job type: {:?}", job_type),
1372        }
1373
1374        if need_grant_default_privileges {
1375            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1376        }
1377
1378        let dependencies =
1379            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;
1380
1381        Ok((notification_op, objects, updated_user_info, dependencies))
1382    }
1383
1384    pub async fn finish_replace_streaming_job(
1385        &self,
1386        tmp_id: JobId,
1387        streaming_job: StreamingJob,
1388        replace_upstream: FragmentReplaceUpstream,
1389        sink_into_table_context: SinkIntoTableContext,
1390        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1391        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1392    ) -> MetaResult<NotificationVersion> {
1393        let inner = self.inner.write().await;
1394        let txn = inner.db.begin().await?;
1395
1396        let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
1397            Self::finish_replace_streaming_job_inner(
1398                tmp_id,
1399                replace_upstream,
1400                sink_into_table_context,
1401                &txn,
1402                streaming_job,
1403                drop_table_connector_ctx,
1404                auto_refresh_schema_sinks,
1405            )
1406            .await?;
1407
1408        txn.commit().await?;
1409
1410        // Notify serving module: delete old fragment mappings, upsert new ones.
1411        let notification_manager = self.env.notification_manager();
1412        notification_manager.notify_serving_fragment_mapping_delete(
1413            old_fragment_ids.iter().map(|id| *id as _).collect(),
1414        );
1415        notification_manager.notify_serving_fragment_mapping_update(
1416            new_fragment_ids.iter().map(|id| *id as _).collect(),
1417        );
1418
1419        let mut version = self
1420            .notify_frontend(
1421                NotificationOperation::Update,
1422                NotificationInfo::ObjectGroup(PbObjectGroup {
1423                    objects,
1424                    dependencies: vec![],
1425                }),
1426            )
1427            .await;
1428
1429        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1430            self.notify_users_update(user_infos).await;
1431            version = self
1432                .notify_frontend(
1433                    NotificationOperation::Delete,
1434                    build_object_group_for_delete(to_drop_objects),
1435                )
1436                .await;
1437        }
1438
1439        Ok(version)
1440    }
1441
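    /// Rebuild the companion iceberg source's column list after the table's schema
    /// has changed. Non-system table columns are taken in table order: a name that
    /// already exists on the source keeps its old catalog entry (and column id), a
    /// new name gets the next free column id, and the table's row-id column is
    /// renamed to `RISINGWAVE_ICEBERG_ROW_ID`. Iceberg hidden columns from the old
    /// source are re-appended if missing, and the row-id index is recomputed by name.
    ///
    /// An illustrative sketch (column names only): old source `[id, v]` plus new
    /// table columns `[id, v, w]` yields `[id, v, w]`, with `w` assigned a fresh
    /// column id.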
1442    fn update_iceberg_source_columns(
1443        original_source_columns: &[ColumnCatalog],
1444        original_row_id_index: Option<usize>,
1445        new_table_columns: &[ColumnCatalog],
1446    ) -> (Vec<ColumnCatalog>, Option<usize>) {
1447        let row_id_column_name = original_row_id_index
1448            .and_then(|idx| original_source_columns.get(idx))
1449            .map(|col| col.name().to_owned());
1450        let mut next_column_id = max_column_id(original_source_columns).next();
1451        let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
1452            .iter()
1453            .cloned()
1454            .map(|col| (col.name().to_owned(), col))
1455            .collect();
1456
1457        let mut new_columns = Vec::new();
1458        for table_col in new_table_columns
1459            .iter()
1460            .filter(|col| !col.is_rw_sys_column())
1461        {
1462            let mut source_col_name = table_col.name().to_owned();
1463            if source_col_name == ROW_ID_COLUMN_NAME {
1464                source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
1465            }
1466
1467            if let Some(existing) = existing_columns.get(&source_col_name) {
1468                new_columns.push(existing.clone());
1469            } else {
1470                let mut new_col = table_col.clone();
1471                new_col.column_desc.name = source_col_name;
1472                new_col.column_desc.column_id = next_column_id;
1473                next_column_id = next_column_id.next();
1474                new_columns.push(new_col);
1475            }
1476        }
1477
1478        let mut seen_names: HashSet<String> = new_columns
1479            .iter()
1480            .map(|col| col.name().to_owned())
1481            .collect();
1482        for col in original_source_columns
1483            .iter()
1484            .filter(|col| col.is_iceberg_hidden_column())
1485        {
1486            if seen_names.insert(col.name().to_owned()) {
1487                new_columns.push(col.clone());
1488            }
1489        }
1490
1491        let new_row_id_index = row_id_column_name
1492            .as_ref()
1493            .and_then(|name| new_columns.iter().position(|col| col.name() == name));
1494
1495        (new_columns, new_row_id_index)
1496    }
1497
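    /// Transaction body for finishing a replace: updates the replaced job's catalog
    /// (and, for iceberg tables, its companion source), swaps the temporary job's
    /// fragments in for the original ones, rewrites downstream `Merge` nodes and
    /// index expressions, and returns the catalog objects to notify together with
    /// the old/new fragment ids for the serving mappings.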
1498    pub async fn finish_replace_streaming_job_inner(
1499        tmp_id: JobId,
1500        replace_upstream: FragmentReplaceUpstream,
1501        SinkIntoTableContext {
1502            updated_sink_catalogs,
1503        }: SinkIntoTableContext,
1504        txn: &DatabaseTransaction,
1505        streaming_job: StreamingJob,
1506        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1507        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1508    ) -> MetaResult<(
1509        Vec<PbObject>,
1510        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1511        Vec<FragmentId>,
1512        Vec<FragmentId>,
1513    )> {
1514        let original_job_id = streaming_job.id();
1515        let job_type = streaming_job.job_type();
1516
1517        // Query old fragment IDs (will be deleted) and new fragment IDs (will be reassigned).
1518        let old_fragment_ids: Vec<FragmentId> = Fragment::find()
1519            .select_only()
1520            .column(fragment::Column::FragmentId)
1521            .filter(fragment::Column::JobId.eq(original_job_id))
1522            .into_tuple()
1523            .all(txn)
1524            .await?;
1525        let new_fragment_ids: Vec<FragmentId> = Fragment::find()
1526            .select_only()
1527            .column(fragment::Column::FragmentId)
1528            .filter(fragment::Column::JobId.eq(tmp_id))
1529            .into_tuple()
1530            .all(txn)
1531            .await?;
1532
1533        let mut index_item_rewriter = None;
1534        let mut updated_iceberg_source_id: Option<SourceId> = None;
1535
1536        // Update catalog
1537        match streaming_job {
1538            StreamingJob::Table(_, table, _table_job_type) => {
1539                let original_column_catalogs =
1540                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1541                let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
1542                let is_iceberg = table
1543                    .engine
1544                    .and_then(|engine| PbEngine::try_from(engine).ok())
1545                    == Some(PbEngine::Iceberg);
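                // A table with the iceberg engine has a companion source named
                // `{ICEBERG_SOURCE_PREFIX}{table.name}`; keep its schema in sync
                // whenever the table's columns change.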
1546                if is_iceberg && schema_changed {
1547                    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1548                    let source = Source::find()
1549                        .inner_join(Object)
1550                        .filter(
1551                            object::Column::DatabaseId
1552                                .eq(table.database_id)
1553                                .and(object::Column::SchemaId.eq(table.schema_id))
1554                                .and(source::Column::Name.eq(&iceberg_source_name)),
1555                        )
1556                        .one(txn)
1557                        .await?
1558                        .ok_or_else(|| {
1559                            MetaError::catalog_id_not_found("source", iceberg_source_name)
1560                        })?;
1561
1562                    let source_id = source.source_id;
1563                    let source_version = source.version;
1564                    let source_columns: Vec<ColumnCatalog> = source
1565                        .columns
1566                        .to_protobuf()
1567                        .into_iter()
1568                        .map(ColumnCatalog::from)
1569                        .collect();
1570                    let source_row_id_index = source
1571                        .row_id_index
1572                        .and_then(|idx| usize::try_from(idx).ok());
1573                    let table_columns: Vec<ColumnCatalog> = table
1574                        .columns
1575                        .iter()
1576                        .cloned()
1577                        .map(ColumnCatalog::from)
1578                        .collect();
1579                    let (updated_columns, updated_row_id_index) =
1580                        Self::update_iceberg_source_columns(
1581                            &source_columns,
1582                            source_row_id_index,
1583                            &table_columns,
1584                        );
1585                    let updated_columns: Vec<PbColumnCatalog> = updated_columns
1586                        .into_iter()
1587                        .map(|col| col.to_protobuf())
1588                        .collect();
1589
1590                    let mut source_active = source.into_active_model();
1591                    source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
1592                    source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
1593                    source_active.version = Set(source_version + 1);
1594                    source_active.update(txn).await?;
1595                    updated_iceberg_source_id = Some(source_id);
1596                }
1597
1598                index_item_rewriter = Some({
1599                    let original_columns = original_column_catalogs
1600                        .to_protobuf()
1601                        .into_iter()
1602                        .map(|c| c.column_desc.unwrap())
1603                        .collect_vec();
1604                    let new_columns = table
1605                        .columns
1606                        .iter()
1607                        .map(|c| c.column_desc.clone().unwrap())
1608                        .collect_vec();
1609
1610                    IndexItemRewriter {
1611                        original_columns,
1612                        new_columns,
1613                    }
1614                });
1615
1616                // For sinks created in earlier versions, we need to set the original_target_columns.
1617                for sink_id in updated_sink_catalogs {
1618                    Sink::update(sink::ActiveModel {
1619                        sink_id: Set(sink_id as _),
1620                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1621                        ..Default::default()
1622                    })
1623                    .exec(txn)
1624                    .await?;
1625                }
1626                // Update the table catalog with the new one. (column catalog is also updated here)
1627                let mut table = table::ActiveModel::from(table);
1628                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1629                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1630                {
1631                    // drop the table's connector; the rest of the logic is in `drop_table_associated_source`
1632                    table.optional_associated_source_id = Set(None);
1633                }
1634
1635                Table::update(table).exec(txn).await?;
1636            }
1637            StreamingJob::Source(source) => {
1638                // Update the source catalog with the new one.
1639                let source = source::ActiveModel::from(source);
1640                Source::update(source).exec(txn).await?;
1641            }
1642            StreamingJob::MaterializedView(table) => {
1643                // Update the table catalog with the new one.
1644                let table = table::ActiveModel::from(table);
1645                Table::update(table).exec(txn).await?;
1646            }
1647            _ => unreachable!(
1648                "invalid streaming job type: {:?}",
1649                streaming_job.job_type_str()
1650            ),
1651        }
1652
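        // Helper reused for auto-refresh-schema sinks below: re-points internal
        // tables, swaps the temp job's fragments in for the original job's,
        // rewrites Merge edges, and removes the temporary object.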
1653        async fn finish_fragments(
1654            txn: &DatabaseTransaction,
1655            tmp_id: JobId,
1656            original_job_id: JobId,
1657            replace_upstream: FragmentReplaceUpstream,
1658        ) -> MetaResult<()> {
1659            // 0. update internal tables
1660            // Fields including `fragment_id` were placeholder values before.
1661            // After table fragments are created, update them for all internal tables.
1662            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1663                .select_only()
1664                .columns([
1665                    fragment::Column::FragmentId,
1666                    fragment::Column::StateTableIds,
1667                ])
1668                .filter(fragment::Column::JobId.eq(tmp_id))
1669                .into_tuple()
1670                .all(txn)
1671                .await?;
1672            for (fragment_id, state_table_ids) in fragment_info {
1673                for state_table_id in state_table_ids.into_inner() {
1674                    let state_table_id = TableId::new(state_table_id as _);
1675                    Table::update(table::ActiveModel {
1676                        table_id: Set(state_table_id),
1677                        fragment_id: Set(Some(fragment_id)),
1678                        // No need to update `vnode_count` because it must remain the same.
1679                        ..Default::default()
1680                    })
1681                    .exec(txn)
1682                    .await?;
1683                }
1684            }
1685
1686            // 1. replace old fragments/actors with new ones.
1687            Fragment::delete_many()
1688                .filter(fragment::Column::JobId.eq(original_job_id))
1689                .exec(txn)
1690                .await?;
1691            Fragment::update_many()
1692                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1693                .filter(fragment::Column::JobId.eq(tmp_id))
1694                .exec(txn)
1695                .await?;
1696
1697            // 2. update merges.
1698            // Rewrite downstream fragments' Merge nodes (and their `upstream_fragment_id`) to point at the new fragments.
1699            for (fragment_id, fragment_replace_map) in replace_upstream {
1700                let (fragment_id, mut stream_node) =
1701                    Fragment::find_by_id(fragment_id as FragmentId)
1702                        .select_only()
1703                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1704                        .into_tuple::<(FragmentId, StreamNode)>()
1705                        .one(txn)
1706                        .await?
1707                        .map(|(id, node)| (id, node.to_protobuf()))
1708                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1709
1710                visit_stream_node_mut(&mut stream_node, |body| {
1711                    if let PbNodeBody::Merge(m) = body
1712                        && let Some(new_fragment_id) =
1713                            fragment_replace_map.get(&m.upstream_fragment_id)
1714                    {
1715                        m.upstream_fragment_id = *new_fragment_id;
1716                    }
1717                });
1718                Fragment::update(fragment::ActiveModel {
1719                    fragment_id: Set(fragment_id),
1720                    stream_node: Set(StreamNode::from(&stream_node)),
1721                    ..Default::default()
1722                })
1723                .exec(txn)
1724                .await?;
1725            }
1726
1727            // 3. remove dummy object.
1728            Object::delete_by_id(tmp_id).exec(txn).await?;
1729
1730            Ok(())
1731        }
1732
1733        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1734
1735        // 4. update catalogs and notify.
1736        let mut objects = vec![];
1737        match job_type {
1738            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1739                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1740                    .find_also_related(Object)
1741                    .one(txn)
1742                    .await?
1743                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1744                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1745                    .one(txn)
1746                    .await?;
1747                objects.push(PbObject {
1748                    object_info: Some(PbObjectInfo::Table(
1749                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1750                    )),
1751                })
1752            }
1753            StreamingJobType::Source => {
1754                let (source, source_obj) =
1755                    Source::find_by_id(original_job_id.as_shared_source_id())
1756                        .find_also_related(Object)
1757                        .one(txn)
1758                        .await?
1759                        .ok_or_else(|| {
1760                            MetaError::catalog_id_not_found("object", original_job_id)
1761                        })?;
1762                objects.push(PbObject {
1763                    object_info: Some(PbObjectInfo::Source(
1764                        ObjectModel(source, source_obj.unwrap(), None).into(),
1765                    )),
1766                })
1767            }
1768            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1769        }
1770
1771        if let Some(source_id) = updated_iceberg_source_id {
1772            let (source, source_obj) = Source::find_by_id(source_id)
1773                .find_also_related(Object)
1774                .one(txn)
1775                .await?
1776                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1777            objects.push(PbObject {
1778                object_info: Some(PbObjectInfo::Source(
1779                    ObjectModel(source, source_obj.unwrap(), None).into(),
1780                )),
1781            });
1782        }
1783
1784        if let Some(expr_rewriter) = index_item_rewriter {
1785            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1786                .select_only()
1787                .columns([index::Column::IndexId, index::Column::IndexItems])
1788                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1789                .into_tuple()
1790                .all(txn)
1791                .await?;
1792            for (index_id, nodes) in index_items {
1793                let mut pb_nodes = nodes.to_protobuf();
1794                pb_nodes
1795                    .iter_mut()
1796                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1797                let index = index::ActiveModel {
1798                    index_id: Set(index_id),
1799                    index_items: Set(pb_nodes.into()),
1800                    ..Default::default()
1801                }
1802                .update(txn)
1803                .await?;
1804                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1805                    .find_also_related(streaming_job::Entity)
1806                    .one(txn)
1807                    .await?
1808                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1809                objects.push(PbObject {
1810                    object_info: Some(PbObjectInfo::Index(
1811                        ObjectModel(index, index_obj, streaming_job).into(),
1812                    )),
1813                });
1814            }
1815        }
1816
1817        if let Some(sinks) = auto_refresh_schema_sinks {
1818            for finish_sink_context in sinks {
1819                finish_fragments(
1820                    txn,
1821                    finish_sink_context.tmp_sink_id.as_job_id(),
1822                    finish_sink_context.original_sink_id.as_job_id(),
1823                    Default::default(),
1824                )
1825                .await?;
1826                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1827                    .find_also_related(Object)
1828                    .one(txn)
1829                    .await?
1830                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id))?;
1831                let sink_streaming_job =
1832                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1833                        .one(txn)
1834                        .await?;
1835                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1836                Sink::update(sink::ActiveModel {
1837                    sink_id: Set(finish_sink_context.original_sink_id),
1838                    columns: Set(columns.clone()),
1839                    ..Default::default()
1840                })
1841                .exec(txn)
1842                .await?;
1843                sink.columns = columns;
1844                objects.push(PbObject {
1845                    object_info: Some(PbObjectInfo::Sink(
1846                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1847                    )),
1848                });
1849                if let Some(new_log_store_table) = finish_sink_context.new_log_store_table {
1850                    let log_store_table_id = new_log_store_table.id;
1851                    let new_log_store_table_columns: ColumnCatalogArray =
1852                        new_log_store_table.columns.clone().into();
1853                    let new_log_store_table_value_indices =
1854                        new_log_store_table.value_indices.clone();
1855                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1856                        .find_also_related(Object)
1857                        .one(txn)
1858                        .await?
1859                        .ok_or_else(|| MetaError::catalog_id_not_found("table", log_store_table_id))?;
1860                    Table::update(table::ActiveModel {
1861                        table_id: Set(log_store_table_id),
1862                        columns: Set(new_log_store_table_columns.clone()),
1863                        value_indices: Set(new_log_store_table_value_indices.clone().into()),
1864                        ..Default::default()
1865                    })
1866                    .exec(txn)
1867                    .await?;
1868                    table.columns = new_log_store_table_columns;
1869                    table.value_indices = new_log_store_table_value_indices.into();
1870                    objects.push(PbObject {
1871                        object_info: Some(PbObjectInfo::Table(
1872                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1873                                .into(),
1874                        )),
1875                    });
1876                }
1877            }
1878        }
1879
1880        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1881        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1882            notification_objs =
1883                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1884        }
1885
1886        Ok((
1887            objects,
1888            notification_objs,
1889            old_fragment_ids,
1890            new_fragment_ids,
1891        ))
1892    }
1893
1894    /// Abort the replacing streaming job by deleting the temporary job object.
1895    pub async fn try_abort_replacing_streaming_job(
1896        &self,
1897        tmp_job_id: JobId,
1898        tmp_sink_ids: Option<Vec<ObjectId>>,
1899    ) -> MetaResult<()> {
1900        let inner = self.inner.write().await;
1901        let txn = inner.db.begin().await?;
1902
1903        // Query fragment IDs of the temp job and temp sinks before cascade-deleting them.
1904        let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
1905        if let Some(ref sink_ids) = tmp_sink_ids {
1906            all_job_ids.extend(sink_ids.iter().copied());
1907        }
1908        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
1909            .select_only()
1910            .column(fragment::Column::FragmentId)
1911            .filter(fragment::Column::JobId.is_in(all_job_ids))
1912            .into_tuple()
1913            .all(&txn)
1914            .await?;
1915
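        // Deleting the objects cascade-deletes their fragments, hence the snapshot above.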
1916        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1917        if let Some(tmp_sink_ids) = tmp_sink_ids {
1918            for tmp_sink_id in tmp_sink_ids {
1919                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1920            }
1921        }
1922        txn.commit().await?;
1923
1924        // Notify serving module about deleted fragments from the aborted replace job.
1925        self.env
1926            .notification_manager()
1927            .notify_serving_fragment_mapping_delete(
1928                abort_fragment_ids.iter().map(|id| *id as _).collect(),
1929            );
1930
1931        Ok(())
1932    }
1933
1934    // edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments
1935    // return the job ids and fragment ids that the new rate limit applies to
1936    pub async fn update_source_rate_limit_by_source_id(
1937        &self,
1938        source_id: SourceId,
1939        rate_limit: Option<u32>,
1940    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1941        let inner = self.inner.read().await;
1942        let txn = inner.db.begin().await?;
1943
1944        {
1945            let active_source = source::ActiveModel {
1946                source_id: Set(source_id),
1947                rate_limit: Set(rate_limit.map(|v| v as i32)),
1948                ..Default::default()
1949            };
1950            Source::update(active_source).exec(&txn).await?;
1951        }
1952
1953        let (source, obj) = Source::find_by_id(source_id)
1954            .find_also_related(Object)
1955            .one(&txn)
1956            .await?
1957            .ok_or_else(|| {
1958                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1959            })?;
1960
1961        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1962        let streaming_job_ids: Vec<JobId> =
1963            if let Some(table_id) = source.optional_associated_table_id {
1964                vec![table_id.as_job_id()]
1965            } else if let Some(source_info) = &source.source_info
1966                && source_info.to_protobuf().is_shared()
1967            {
1968                vec![source_id.as_share_source_job_id()]
1969            } else {
1970                ObjectDependency::find()
1971                    .select_only()
1972                    .column(object_dependency::Column::UsedBy)
1973                    .filter(object_dependency::Column::Oid.eq(source_id))
1974                    .into_tuple()
1975                    .all(&txn)
1976                    .await?
1977            };
1978
1979        if streaming_job_ids.is_empty() {
1980            return Err(MetaError::invalid_parameter(format!(
1981                "source id {source_id} not used by any streaming job"
1982            )));
1983        }
1984
1985        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1986            .select_only()
1987            .columns([
1988                fragment::Column::FragmentId,
1989                fragment::Column::JobId,
1990                fragment::Column::FragmentTypeMask,
1991                fragment::Column::StreamNode,
1992            ])
1993            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1994            .into_tuple()
1995            .all(&txn)
1996            .await?;
1997        let mut fragments = fragments
1998            .into_iter()
1999            .map(|(id, job_id, mask, stream_node)| {
2000                (
2001                    id,
2002                    job_id,
2003                    FragmentTypeMask::from(mask as u32),
2004                    stream_node.to_protobuf(),
2005                )
2006            })
2007            .collect_vec();
2008
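        // Keep only the fragments whose stream node actually references this
        // source id; the rate limit is patched into the node bodies in place.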
2009        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
2010            let mut found = false;
2011            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
2012                visit_stream_node_mut(stream_node, |node| {
2013                    if let PbNodeBody::Source(node) = node
2014                        && let Some(node_inner) = &mut node.source_inner
2015                        && node_inner.source_id == source_id
2016                    {
2017                        node_inner.rate_limit = rate_limit;
2018                        found = true;
2019                    }
2020                });
2021            }
2022            if is_fs_source {
2023                // In older versions there was no fragment type flag for the `FsFetch` node,
2024                // so for fs connectors we scan all fragments for a `StreamFsFetch` node.
2025                visit_stream_node_mut(stream_node, |node| {
2026                    if let PbNodeBody::StreamFsFetch(node) = node {
2027                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
2028                        if let Some(node_inner) = &mut node.node_inner
2029                            && node_inner.source_id == source_id
2030                        {
2031                            node_inner.rate_limit = rate_limit;
2032                            found = true;
2033                        }
2034                    }
2035                });
2036            }
2037            found
2038        });
2039
2040        assert!(
2041            !fragments.is_empty(),
2042            "source id should be used by at least one fragment"
2043        );
2044
2045        let (fragment_ids, job_ids) = fragments
2046            .iter()
2047            .map(|(fragment_id, job_id, _, _)| (*fragment_id, *job_id))
2048            .unzip();
2049
2050        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
2051            Fragment::update(fragment::ActiveModel {
2052                fragment_id: Set(fragment_id),
2053                fragment_type_mask: Set(fragment_type_mask.into()),
2054                stream_node: Set(StreamNode::from(&stream_node)),
2055                ..Default::default()
2056            })
2057            .exec(&txn)
2058            .await?;
2059        }
2060
2061        txn.commit().await?;
2062
2063        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
2064        let _version = self
2065            .notify_frontend(
2066                NotificationOperation::Update,
2067                NotificationInfo::ObjectGroup(PbObjectGroup {
2068                    objects: vec![PbObject {
2069                        object_info: Some(relation_info),
2070                    }],
2071                    dependencies: vec![],
2072                }),
2073            )
2074            .await;
2075
2076        Ok((job_ids, fragment_ids))
2077    }
2078
2079    // edit the stream nodes of the fragments belonging to the given `job_id`
2080    // return the ids of the fragments that were mutated
2081    pub async fn mutate_fragments_by_job_id(
2082        &self,
2083        job_id: JobId,
2084        // returns true if the mutation is applied
2085        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
2086        // error message when no relevant fragments are found
2087        err_msg: &'static str,
2088    ) -> MetaResult<HashSet<FragmentId>> {
2089        let inner = self.inner.read().await;
2090        let txn = inner.db.begin().await?;
2091
2092        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2093            .select_only()
2094            .columns([
2095                fragment::Column::FragmentId,
2096                fragment::Column::FragmentTypeMask,
2097                fragment::Column::StreamNode,
2098            ])
2099            .filter(fragment::Column::JobId.eq(job_id))
2100            .into_tuple()
2101            .all(&txn)
2102            .await?;
2103        let mut fragments = fragments
2104            .into_iter()
2105            .map(|(id, mask, stream_node)| {
2106                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
2107            })
2108            .collect_vec();
2109
2110        let fragments = fragments
2111            .iter_mut()
2112            .map(|(_, fragment_type_mask, stream_node)| {
2113                fragments_mutation_fn(*fragment_type_mask, stream_node)
2114            })
2115            .collect::<MetaResult<Vec<bool>>>()?
2116            .into_iter()
2117            .zip_eq_debug(std::mem::take(&mut fragments))
2118            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
2119            .collect::<Vec<_>>();
2120
2121        if fragments.is_empty() {
2122            return Err(MetaError::invalid_parameter(format!(
2123                "job id {job_id}: {}",
2124                err_msg
2125            )));
2126        }
2127
2128        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
2129        for (id, _, stream_node) in fragments {
2130            Fragment::update(fragment::ActiveModel {
2131                fragment_id: Set(id),
2132                stream_node: Set(StreamNode::from(&stream_node)),
2133                ..Default::default()
2134            })
2135            .exec(&txn)
2136            .await?;
2137        }
2138
2139        txn.commit().await?;
2140
2141        Ok(fragment_ids)
2142    }
2143
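    /// Single-fragment variant of `mutate_fragments_by_job_id`: loads the fragment's
    /// stream node, applies `fragment_mutation_fn`, and persists the mutated node.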
2144    async fn mutate_fragment_by_fragment_id(
2145        &self,
2146        fragment_id: FragmentId,
2147        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
2148        err_msg: &'static str,
2149    ) -> MetaResult<()> {
2150        let inner = self.inner.read().await;
2151        let txn = inner.db.begin().await?;
2152
2153        let (fragment_type_mask, stream_node): (i32, StreamNode) =
2154            Fragment::find_by_id(fragment_id)
2155                .select_only()
2156                .columns([
2157                    fragment::Column::FragmentTypeMask,
2158                    fragment::Column::StreamNode,
2159                ])
2160                .into_tuple()
2161                .one(&txn)
2162                .await?
2163                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
2164        let mut pb_stream_node = stream_node.to_protobuf();
2165        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
2166
2167        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
2168            return Err(MetaError::invalid_parameter(format!(
2169                "fragment id {fragment_id}: {}",
2170                err_msg
2171            )));
2172        }
2173
2174        Fragment::update(fragment::ActiveModel {
2175            fragment_id: Set(fragment_id),
2176            stream_node: Set(StreamNode::from(&pb_stream_node)),
2177            ..Default::default()
2178        })
2179        .exec(&txn)
2180        .await?;
2181
2182        txn.commit().await?;
2183
2184        Ok(())
2185    }
2186
2187    pub async fn update_backfill_orders_by_job_id(
2188        &self,
2189        job_id: JobId,
2190        backfill_orders: Option<BackfillOrders>,
2191    ) -> MetaResult<()> {
2192        let inner = self.inner.write().await;
2193        let txn = inner.db.begin().await?;
2194
2195        ensure_job_not_canceled(job_id, &txn).await?;
2196
2197        streaming_job::ActiveModel {
2198            job_id: Set(job_id),
2199            backfill_orders: Set(backfill_orders),
2200            ..Default::default()
2201        }
2202        .update(&txn)
2203        .await?;
2204
2205        txn.commit().await?;
2206
2207        Ok(())
2208    }
2209
2210    // edit the `rate_limit` of the backfill nodes (`StreamScan`/`StreamCdcScan`/`SourceBackfill`) in the given `job_id`'s fragments
2211    // return the ids of the fragments that were updated
2212    pub async fn update_backfill_rate_limit_by_job_id(
2213        &self,
2214        job_id: JobId,
2215        rate_limit: Option<u32>,
2216    ) -> MetaResult<HashSet<FragmentId>> {
2217        let update_backfill_rate_limit =
2218            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2219                let mut found = false;
2220                if fragment_type_mask
2221                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
2222                {
2223                    visit_stream_node_mut(stream_node, |node| match node {
2224                        PbNodeBody::StreamCdcScan(node) => {
2225                            node.rate_limit = rate_limit;
2226                            found = true;
2227                        }
2228                        PbNodeBody::StreamScan(node) => {
2229                            node.rate_limit = rate_limit;
2230                            found = true;
2231                        }
2232                        PbNodeBody::SourceBackfill(node) => {
2233                            node.rate_limit = rate_limit;
2234                            found = true;
2235                        }
2236                        _ => {}
2237                    });
2238                }
2239                Ok(found)
2240            };
2241
2242        self.mutate_fragments_by_job_id(
2243            job_id,
2244            update_backfill_rate_limit,
2245            "stream scan node or source node not found",
2246        )
2247        .await
2248    }
2249
2250    // edit the `rate_limit` of the `Sink` nodes in the given `sink_id`'s fragments
2251    // return the ids of the fragments that were updated
2252    pub async fn update_sink_rate_limit_by_job_id(
2253        &self,
2254        sink_id: SinkId,
2255        rate_limit: Option<u32>,
2256    ) -> MetaResult<HashSet<FragmentId>> {
2257        let update_sink_rate_limit =
2258            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2259                let mut found = Ok(false);
2260                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2261                    visit_stream_node_mut(stream_node, |node| {
2262                        if let PbNodeBody::Sink(node) = node {
2263                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2264                                found = Err(MetaError::invalid_parameter(
2265                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2266                                ));
2267                                return;
2268                            }
2269                            node.rate_limit = rate_limit;
2270                            found = Ok(true);
2271                        }
2272                    });
2273                }
2274                found
2275            };
2276
2277        self.mutate_fragments_by_job_id(
2278            sink_id.as_job_id(),
2279            update_sink_rate_limit,
2280            "sink node not found",
2281        )
2282        .await
2283    }
2284
2285    pub async fn update_dml_rate_limit_by_job_id(
2286        &self,
2287        job_id: JobId,
2288        rate_limit: Option<u32>,
2289    ) -> MetaResult<HashSet<FragmentId>> {
2290        let update_dml_rate_limit =
2291            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2292                let mut found = false;
2293                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2294                    visit_stream_node_mut(stream_node, |node| {
2295                        if let PbNodeBody::Dml(node) = node {
2296                            node.rate_limit = rate_limit;
2297                            found = true;
2298                        }
2299                    });
2300                }
2301                Ok(found)
2302            };
2303
2304        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2305            .await
2306    }
2307
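    /// Applies property changes to a source: validates the new props, rewrites the
    /// stored `CREATE SOURCE` / `CREATE TABLE` definition, reconciles secret
    /// dependencies, patches the source nodes of all affected fragments, and
    /// notifies the frontend.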
2308    pub async fn update_source_props_by_source_id(
2309        &self,
2310        source_id: SourceId,
2311        alter_props: BTreeMap<String, String>,
2312        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2313        skip_alter_on_fly_check: bool,
2314    ) -> MetaResult<WithOptionsSecResolved> {
2315        let inner = self.inner.read().await;
2316        let txn = inner.db.begin().await?;
2317
2318        let (source, _obj) = Source::find_by_id(source_id)
2319            .find_also_related(Object)
2320            .one(&txn)
2321            .await?
2322            .ok_or_else(|| {
2323                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2324            })?;
2325        let connector = source.with_properties.0.get_connector().unwrap();
2326        let is_shared_source = source.is_shared();
2327
2328        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2329        if !is_shared_source {
2330            // MVs on a non-shared source hold a copy of the source in their own fragments
2331            dep_source_job_ids = ObjectDependency::find()
2332                .select_only()
2333                .column(object_dependency::Column::UsedBy)
2334                .filter(object_dependency::Column::Oid.eq(source_id))
2335                .into_tuple()
2336                .all(&txn)
2337                .await?;
2338        }
2339
2340        // Validate that connector type is not being changed
2341        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
2342            && new_connector != &connector
2343        {
2344            return Err(MetaError::invalid_parameter(format!(
2345                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
2346                connector, new_connector
2347            )));
2348        }
2349
2350        // Only check alter-on-fly restrictions for SQL ALTER SOURCE, not for admin risectl operations
2351        if !skip_alter_on_fly_check {
2352            let prop_keys: Vec<String> = alter_props
2353                .keys()
2354                .chain(alter_secret_refs.keys())
2355                .cloned()
2356                .collect();
2357            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2358                &connector, &prop_keys,
2359            )?;
2360        }
2361
2362        let mut options_with_secret = WithOptionsSecResolved::new(
2363            source.with_properties.0.clone(),
2364            source
2365                .secret_ref
2366                .map(|secret_ref| secret_ref.to_protobuf())
2367                .unwrap_or_default(),
2368        );
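        // Merge the altered props and secret refs into the existing options, diffing
        // the secret references so object dependencies can be reconciled below.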
2369        let (to_add_secret_dep, to_remove_secret_dep) =
2370            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2371
2372        tracing::info!(
2373            "applying new properties to source: source_id={}, options_with_secret={:?}",
2374            source_id,
2375            options_with_secret
2376        );
2377        // check that the altered props are still valid for the connector
2378        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2379        // todo: validate via source manager
2380
2381        let mut associate_table_id = None;
2382
2383        // `preferred_id` can be the source_id or the table_id:
2384        // when updating the associated source of a table-with-connector it is the table_id,
2385        // otherwise it is the source_id. Secret dependencies below are keyed on it.
2386        let mut preferred_id = source_id.as_object_id();
2387        let rewrite_sql = {
2388            let definition = source.definition.clone();
2389
2390            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2391                .map_err(|e| {
2392                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2393                        anyhow!(e).context("Failed to parse source definition SQL"),
2394                    )))
2395                })?
2396                .try_into()
2397                .unwrap();
2398
2399            /// Formats SQL options with secret values properly resolved
2400            ///
2401            /// This function processes configuration options that may contain sensitive data:
2402            /// - Plaintext options are directly converted to `SqlOption`
2403            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2404            ///   without exposing the actual secret value
2405            ///
2406            /// # Arguments
2407            /// * `txn` - Database transaction for retrieving secrets
2408            /// * `options_with_secret` - Container of options with both plaintext and secret values
2409            ///
2410            /// # Returns
2411            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
2412            async fn format_with_option_secret_resolved(
2413                txn: &DatabaseTransaction,
2414                options_with_secret: &WithOptionsSecResolved,
2415            ) -> MetaResult<Vec<SqlOption>> {
2416                let mut options = Vec::new();
2417                for (k, v) in options_with_secret.as_plaintext() {
2418                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2419                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2420                    options.push(sql_option);
2421                }
2422                for (k, v) in options_with_secret.as_secret() {
2423                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2424                        let sql_option =
2425                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2426                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2427                        options.push(sql_option);
2428                    } else {
2429                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2430                    }
2431                }
2432                Ok(options)
2433            }
2434
2435            match &mut stmt {
2436                Statement::CreateSource { stmt } => {
2437                    stmt.with_properties.0 =
2438                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2439                }
2440                Statement::CreateTable { with_options, .. } => {
2441                    *with_options =
2442                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2443                    associate_table_id = source.optional_associated_table_id;
2444                    preferred_id = associate_table_id.unwrap().as_object_id();
2445                }
2446                _ => unreachable!(),
2447            }
2448
2449            stmt.to_string()
2450        };
2451
2452        {
2453            // Update secret dependencies atomically within the transaction.
2454            // Add new dependencies for secrets that are newly referenced.
2455            if !to_add_secret_dep.is_empty() {
2456                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2457                    object_dependency::ActiveModel {
2458                        oid: Set(secret_id.into()),
2459                        used_by: Set(preferred_id),
2460                        ..Default::default()
2461                    }
2462                }))
2463                .exec(&txn)
2464                .await?;
2465            }
2466            // Remove dependencies for secrets that are no longer referenced.
2467            // This allows the secrets to be deleted after this source no longer uses them.
2468            if !to_remove_secret_dep.is_empty() {
2469                let _ = ObjectDependency::delete_many()
2470                    .filter(
2471                        object_dependency::Column::Oid
2472                            .is_in(to_remove_secret_dep)
2473                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2474                    )
2475                    .exec(&txn)
2476                    .await?;
2477            }
2478        }
2479
2480        let active_source_model = source::ActiveModel {
2481            source_id: Set(source_id),
2482            definition: Set(rewrite_sql.clone()),
2483            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2484            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2485                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2486            ..Default::default()
2487        };
2488        Source::update(active_source_model).exec(&txn).await?;
2489
2490        if let Some(associate_table_id) = associate_table_id {
2491            // update the associated table's definition accordingly
2492            let active_table_model = table::ActiveModel {
2493                table_id: Set(associate_table_id),
2494                definition: Set(rewrite_sql),
2495                ..Default::default()
2496            };
2497            Table::update(active_table_model).exec(&txn).await?;
2498        }
2499
2500        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2501            // if updating a table with connector, the job id is the table id
2502            associate_table_id.as_job_id()
2503        } else {
2504            source_id.as_share_source_job_id()
2505        }]
2506        .into_iter()
2507        .chain(dep_source_job_ids.into_iter())
2508        .collect_vec();
2509
2510        // update fragments
2511        update_connector_props_fragments(
2512            &txn,
2513            to_check_job_ids,
2514            FragmentTypeFlag::Source,
2515            |node, found| {
2516                if let PbNodeBody::Source(node) = node
2517                    && let Some(source_inner) = &mut node.source_inner
2518                {
2519                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2520                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2521                    *found = true;
2522                }
2523            },
2524            is_shared_source,
2525        )
2526        .await?;
2527
2528        let mut to_update_objs = Vec::with_capacity(2);
2529        let (source, obj) = Source::find_by_id(source_id)
2530            .find_also_related(Object)
2531            .one(&txn)
2532            .await?
2533            .ok_or_else(|| {
2534                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2535            })?;
2536        to_update_objs.push(PbObject {
2537            object_info: Some(PbObjectInfo::Source(
2538                ObjectModel(source, obj.unwrap(), None).into(),
2539            )),
2540        });
2541
2542        if let Some(associate_table_id) = associate_table_id {
2543            let (table, obj) = Table::find_by_id(associate_table_id)
2544                .find_also_related(Object)
2545                .one(&txn)
2546                .await?
2547                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2548            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2549                .one(&txn)
2550                .await?;
2551            to_update_objs.push(PbObject {
2552                object_info: Some(PbObjectInfo::Table(
2553                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
2554                )),
2555            });
2556        }
2557
2558        txn.commit().await?;
2559
2560        self.notify_frontend(
2561            NotificationOperation::Update,
2562            NotificationInfo::ObjectGroup(PbObjectGroup {
2563                objects: to_update_objs,
2564                dependencies: vec![],
2565            }),
2566        )
2567        .await;
2568
2569        Ok(options_with_secret)
2570    }
2571
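    /// Alters the properties of a sink: validates that every key may be changed
    /// on the fly, rewrites the `CREATE SINK` definition and catalog entry,
    /// patches the sink fragments, and notifies the frontend, all in one
    /// transaction.
    ///
    /// A minimal sketch of a call site (the controller handle, id, and property
    /// key below are illustrative assumptions, not taken from this file):
    ///
    /// ```ignore
    /// let props = BTreeMap::from([
    ///     // Hypothetical connector option that allows on-the-fly alteration.
    ///     ("properties.retry.max".to_owned(), "5".to_owned()),
    /// ]);
    /// let applied = catalog_controller
    ///     .update_sink_props_by_sink_id(sink_id, props)
    ///     .await?;
    /// // `applied` echoes the altered keys as a `HashMap` for runtime broadcast.
    /// ```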
2572    pub async fn update_sink_props_by_sink_id(
2573        &self,
2574        sink_id: SinkId,
2575        props: BTreeMap<String, String>,
2576    ) -> MetaResult<HashMap<String, String>> {
2577        let inner = self.inner.read().await;
2578        let txn = inner.db.begin().await?;
2579
2580        let (sink, _obj) = Sink::find_by_id(sink_id)
2581            .find_also_related(Object)
2582            .one(&txn)
2583            .await?
2584            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2585        validate_sink_props(&sink, &props)?;
2586        let definition = sink.definition.clone();
2587        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2588            .map_err(|e| SinkError::Config(anyhow!(e)))?
2589            .try_into()
2590            .unwrap();
2591        if let Statement::CreateSink { stmt } = &mut stmt {
2592            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2593        } else {
2594            panic!("definition is not a create sink statement")
2595        }
2596        let mut new_config = sink.properties.clone().into_inner();
2597        new_config.extend(props.clone());
2598
2599        let definition = stmt.to_string();
2600        let active_sink = sink::ActiveModel {
2601            sink_id: Set(sink_id),
2602            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2603            definition: Set(definition),
2604            ..Default::default()
2605        };
2606        Sink::update(active_sink).exec(&txn).await?;
2607
2608        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2609        let (sink, obj) = Sink::find_by_id(sink_id)
2610            .find_also_related(Object)
2611            .one(&txn)
2612            .await?
2613            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2614        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2615            .one(&txn)
2616            .await?;
2617        txn.commit().await?;
2618        let relation_infos = vec![PbObject {
2619            object_info: Some(PbObjectInfo::Sink(
2620                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2621            )),
2622        }];
2623
2624        let _version = self
2625            .notify_frontend(
2626                NotificationOperation::Update,
2627                NotificationInfo::ObjectGroup(PbObjectGroup {
2628                    objects: relation_infos,
2629                    dependencies: vec![],
2630                }),
2631            )
2632            .await;
2633
2634        Ok(props.into_iter().collect())
2635    }
2636
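    /// Alters the connector properties of an iceberg-engine table. Such a table
    /// is backed by a table, a sink, and a source that share one `CREATE TABLE`
    /// definition, so all three catalog entries plus the sink fragments are
    /// rewritten in a single transaction.
    ///
    /// Sketch of the extra options a caller passes in (the id bindings are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// use risingwave_pb::meta::alter_connector_props_request::{
    ///     AlterIcebergTableIds, PbExtraOptions,
    /// };
    ///
    /// let extra = PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds {
    ///     sink_id: iceberg_sink_id,     // sink backing the iceberg table
    ///     source_id: iceberg_source_id, // source backing the iceberg table
    /// });
    /// let (applied, sink_id) = catalog_controller
    ///     .update_iceberg_table_props_by_table_id(table_id, props, Some(extra))
    ///     .await?;
    /// ```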
2637    pub async fn update_iceberg_table_props_by_table_id(
2638        &self,
2639        table_id: TableId,
2640        props: BTreeMap<String, String>,
2641        alter_iceberg_table_props: Option<
2642            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2643        >,
2644    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2645        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2646            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2647        let inner = self.inner.read().await;
2648        let txn = inner.db.begin().await?;
2649
2650        let (sink, _obj) = Sink::find_by_id(sink_id)
2651            .find_also_related(Object)
2652            .one(&txn)
2653            .await?
2654            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2655        validate_sink_props(&sink, &props)?;
2656
2657        let definition = sink.definition.clone();
2658        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2659            .map_err(|e| SinkError::Config(anyhow!(e)))?
2660            .try_into()
2661            .unwrap();
2662        if let Statement::CreateTable {
2663            with_options,
2664            engine,
2665            ..
2666        } = &mut stmt
2667        {
2668            if !matches!(engine, Engine::Iceberg) {
2669                return Err(SinkError::Config(anyhow!(
2670                    "only an iceberg table can be altered as a sink"
2671                ))
2672                .into());
2673            }
2674            update_stmt_with_props(with_options, &props)?;
2675        } else {
2676            panic!("definition is not a create iceberg table statement")
2677        }
2678        let mut new_config = sink.properties.clone().into_inner();
2679        new_config.extend(props.clone());
2680
2681        let definition = stmt.to_string();
2682        let active_sink = sink::ActiveModel {
2683            sink_id: Set(sink_id),
2684            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2685            definition: Set(definition.clone()),
2686            ..Default::default()
2687        };
2688        let active_source = source::ActiveModel {
2689            source_id: Set(source_id),
2690            definition: Set(definition.clone()),
2691            ..Default::default()
2692        };
2693        let active_table = table::ActiveModel {
2694            table_id: Set(table_id),
2695            definition: Set(definition),
2696            ..Default::default()
2697        };
2698        Sink::update(active_sink).exec(&txn).await?;
2699        Source::update(active_source).exec(&txn).await?;
2700        Table::update(active_table).exec(&txn).await?;
2701
2702        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2703
2704        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2705            .find_also_related(Object)
2706            .one(&txn)
2707            .await?
2708            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2709        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2710            .one(&txn)
2711            .await?;
2712        let (source, source_obj) = Source::find_by_id(source_id)
2713            .find_also_related(Object)
2714            .one(&txn)
2715            .await?
2716            .ok_or_else(|| {
2717                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2718            })?;
2719        let (table, table_obj) = Table::find_by_id(table_id)
2720            .find_also_related(Object)
2721            .one(&txn)
2722            .await?
2723            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2724        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2725            .one(&txn)
2726            .await?;
2727        txn.commit().await?;
2728        let relation_infos = vec![
2729            PbObject {
2730                object_info: Some(PbObjectInfo::Sink(
2731                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2732                )),
2733            },
2734            PbObject {
2735                object_info: Some(PbObjectInfo::Source(
2736                    ObjectModel(source, source_obj.unwrap(), None).into(),
2737                )),
2738            },
2739            PbObject {
2740                object_info: Some(PbObjectInfo::Table(
2741                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2742                )),
2743            },
2744        ];
2745        let _version = self
2746            .notify_frontend(
2747                NotificationOperation::Update,
2748                NotificationInfo::ObjectGroup(PbObjectGroup {
2749                    objects: relation_infos,
2750                    dependencies: vec![],
2751                }),
2752            )
2753            .await;
2754
2755        Ok((props.into_iter().collect(), sink_id))
2756    }
2757
2758    /// Update connection properties and all dependent sources/sinks in a single transaction
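    ///
    /// Returns the connection's resolved options plus, for each dependent source
    /// and sink, its complete property map so the caller can broadcast the
    /// changes to the running jobs.
    ///
    /// Sketch of a call site (the handle, id, and broker bindings are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// let (conn_opts, source_props, sink_props) = catalog_controller
    ///     .update_connection_and_dependent_objects_props(
    ///         connection_id,
    ///         BTreeMap::from([("properties.bootstrap.server".to_owned(), brokers)]),
    ///         BTreeMap::new(), // no secret refs altered in this sketch
    ///     )
    ///     .await?;
    /// ```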
2759    pub async fn update_connection_and_dependent_objects_props(
2760        &self,
2761        connection_id: ConnectionId,
2762        alter_props: BTreeMap<String, String>,
2763        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2764    ) -> MetaResult<(
2765        WithOptionsSecResolved,                   // Connection's new properties
2766        Vec<(SourceId, HashMap<String, String>)>, // Source IDs and their complete properties
2767        Vec<(SinkId, HashMap<String, String>)>,   // Sink IDs and their complete properties
2768    )> {
2769        let inner = self.inner.read().await;
2770        let txn = inner.db.begin().await?;
2771
2772        // Find all dependent sources and sinks first
2773        let dependent_sources: Vec<SourceId> = Source::find()
2774            .select_only()
2775            .column(source::Column::SourceId)
2776            .filter(source::Column::ConnectionId.eq(connection_id))
2777            .into_tuple()
2778            .all(&txn)
2779            .await?;
2780
2781        let dependent_sinks: Vec<SinkId> = Sink::find()
2782            .select_only()
2783            .column(sink::Column::SinkId)
2784            .filter(sink::Column::ConnectionId.eq(connection_id))
2785            .into_tuple()
2786            .all(&txn)
2787            .await?;
2788
2789        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2790            .find_also_related(Object)
2791            .one(&txn)
2792            .await?
2793            .ok_or_else(|| {
2794                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2795            })?;
2796
2797        // Validate that props can be altered
2798        let prop_keys: Vec<String> = alter_props
2799            .keys()
2800            .chain(alter_secret_refs.keys())
2801            .cloned()
2802            .collect();
2803
2804        // Map the connection type enum to the string name expected by the validation function
2805        let connection_type_str = pb_connection_type_to_connection_type(
2806            &connection_catalog.params.to_protobuf().connection_type(),
2807        )
2808        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2809
2810        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2811            connection_type_str, &prop_keys,
2812        )?;
2813
2814        let connection_pb = connection_catalog.params.to_protobuf();
2815        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2816            connection_pb.properties.into_iter().collect(),
2817            connection_pb.secret_refs.into_iter().collect(),
2818        );
2819
2820        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2821            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2822
2823        tracing::debug!(
2824            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2825            connection_id,
2826            dependent_sources,
2827            dependent_sinks
2828        );
2829
2830        // Validate connection
2831        {
2832            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2833                connection_type: connection_pb.connection_type,
2834                properties: connection_options_with_secret
2835                    .as_plaintext()
2836                    .clone()
2837                    .into_iter()
2838                    .collect(),
2839                secret_refs: connection_options_with_secret
2840                    .as_secret()
2841                    .clone()
2842                    .into_iter()
2843                    .collect(),
2844            };
2845            let connection = PbConnection {
2846                id: connection_id as _,
2847                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2848                    conn_params_pb,
2849                )),
2850                ..Default::default()
2851            };
2852            validate_connection(&connection).await?;
2853        }
2854
2855        // Update connection secret dependencies
2856        if !to_add_secret_dep.is_empty() {
2857            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2858                object_dependency::ActiveModel {
2859                    oid: Set(secret_id.into()),
2860                    used_by: Set(connection_id.as_object_id()),
2861                    ..Default::default()
2862                }
2863            }))
2864            .exec(&txn)
2865            .await?;
2866        }
2867        if !to_remove_secret_dep.is_empty() {
2868            let _ = ObjectDependency::delete_many()
2869                .filter(
2870                    object_dependency::Column::Oid
2871                        .is_in(to_remove_secret_dep)
2872                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2873                )
2874                .exec(&txn)
2875                .await?;
2876        }
2877
2878        // Update the connection with new properties
2879        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2880            connection_type: connection_pb.connection_type,
2881            properties: connection_options_with_secret
2882                .as_plaintext()
2883                .clone()
2884                .into_iter()
2885                .collect(),
2886            secret_refs: connection_options_with_secret
2887                .as_secret()
2888                .clone()
2889                .into_iter()
2890                .collect(),
2891        };
2892        let active_connection_model = connection::ActiveModel {
2893            connection_id: Set(connection_id),
2894            params: Set(ConnectionParams::from(&updated_connection_params)),
2895            ..Default::default()
2896        };
2897        Connection::update(active_connection_model)
2898            .exec(&txn)
2899            .await?;
2900
2901        // Batch update dependent sources and collect their complete properties
2902        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2903
2904        if !dependent_sources.is_empty() {
2905            // Batch fetch all dependent sources
2906            let sources_with_objs = Source::find()
2907                .find_also_related(Object)
2908                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2909                .all(&txn)
2910                .await?;
2911
2912            // Prepare batch updates
2913            let mut source_updates = Vec::new();
2914            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2915
2916            for (source, _obj) in sources_with_objs {
2917                let source_id = source.source_id;
2918
2919                let mut source_options_with_secret = WithOptionsSecResolved::new(
2920                    source.with_properties.0.clone(),
2921                    source
2922                        .secret_ref
2923                        .clone()
2924                        .map(|secret_ref| secret_ref.to_protobuf())
2925                        .unwrap_or_default(),
2926                );
2927                let (source_to_add_secret_dep, source_to_remove_secret_dep) =
2928                    source_options_with_secret
2929                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2930
2931                // Validate the updated source properties
2932                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2933
2934                // Keep source-level secret dependencies in sync with the source properties that
2935                // are rewritten from the altered connection.
2936                let source_used_by_id = source
2937                    .optional_associated_table_id
2938                    .map(|table_id| table_id.as_object_id())
2939                    .unwrap_or_else(|| source_id.as_object_id());
2940                if !source_to_add_secret_dep.is_empty() {
2941                    ObjectDependency::insert_many(source_to_add_secret_dep.into_iter().map(
2942                        |secret_id| object_dependency::ActiveModel {
2943                            oid: Set(secret_id.into()),
2944                            used_by: Set(source_used_by_id),
2945                            ..Default::default()
2946                        },
2947                    ))
2948                    .exec(&txn)
2949                    .await?;
2950                }
2951                if !source_to_remove_secret_dep.is_empty() {
2952                    let _ = ObjectDependency::delete_many()
2953                        .filter(
2954                            object_dependency::Column::Oid
2955                                .is_in(source_to_remove_secret_dep)
2956                                .and(object_dependency::Column::UsedBy.eq(source_used_by_id)),
2957                        )
2958                        .exec(&txn)
2959                        .await?;
2960                }
2961
2962                // Prepare source update
2963                let active_source = source::ActiveModel {
2964                    source_id: Set(source_id),
2965                    with_properties: Set(Property(
2966                        source_options_with_secret.as_plaintext().clone(),
2967                    )),
2968                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2969                        || {
2970                            risingwave_meta_model::SecretRef::from(
2971                                source_options_with_secret.as_secret().clone(),
2972                            )
2973                        },
2974                    )),
2975                    ..Default::default()
2976                };
2977                source_updates.push(active_source);
2978
2979                // Prepare fragment update:
2980                // - If the source is a table-associated source, update fragments for the table job.
2981                // - Otherwise update the shared source job.
2982                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
2983                let is_shared_source = source.is_shared();
2984                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2985                if !is_shared_source {
2986                    dep_source_job_ids = ObjectDependency::find()
2987                        .select_only()
2988                        .column(object_dependency::Column::UsedBy)
2989                        .filter(object_dependency::Column::Oid.eq(source_id))
2990                        .into_tuple()
2991                        .all(&txn)
2992                        .await?;
2993                }
2994
2995                let base_job_id =
2996                    if let Some(associate_table_id) = source.optional_associated_table_id {
2997                        associate_table_id.as_job_id()
2998                    } else {
2999                        source_id.as_share_source_job_id()
3000                    };
3001                let job_ids = vec![base_job_id]
3002                    .into_iter()
3003                    .chain(dep_source_job_ids.into_iter())
3004                    .collect_vec();
3005
3006                fragment_updates.push(DependentSourceFragmentUpdate {
3007                    job_ids,
3008                    with_properties: source_options_with_secret.as_plaintext().clone(),
3009                    secret_refs: source_options_with_secret.as_secret().clone(),
3010                    is_shared_source,
3011                });
3012
3013                // Collect the complete properties for runtime broadcast
3014                let complete_source_props = LocalSecretManager::global()
3015                    .fill_secrets(
3016                        source_options_with_secret.as_plaintext().clone(),
3017                        source_options_with_secret.as_secret().clone(),
3018                    )
3019                    .map_err(MetaError::from)?
3020                    .into_iter()
3021                    .collect::<HashMap<String, String>>();
3022                updated_sources_with_props.push((source_id, complete_source_props));
3023            }
3024
3025            for source_update in source_updates {
3026                Source::update(source_update).exec(&txn).await?;
3027            }
3028
3029            // Batch execute fragment updates
3030            for DependentSourceFragmentUpdate {
3031                job_ids,
3032                with_properties,
3033                secret_refs,
3034                is_shared_source,
3035            } in fragment_updates
3036            {
3037                update_connector_props_fragments(
3038                    &txn,
3039                    job_ids,
3040                    FragmentTypeFlag::Source,
3041                    |node, found| {
3042                        if let PbNodeBody::Source(node) = node
3043                            && let Some(source_inner) = &mut node.source_inner
3044                        {
3045                            source_inner.with_properties = with_properties.clone();
3046                            source_inner.secret_refs = secret_refs.clone();
3047                            *found = true;
3048                        }
3049                    },
3050                    is_shared_source,
3051                )
3052                .await?;
3053            }
3054        }
3055
3056        // Batch update dependent sinks and collect their complete properties
3057        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
3058
3059        if !dependent_sinks.is_empty() {
3060            // Batch fetch all dependent sinks
3061            let sinks_with_objs = Sink::find()
3062                .find_also_related(Object)
3063                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
3064                .all(&txn)
3065                .await?;
3066
3067            // Prepare batch updates
3068            let mut sink_updates = Vec::new();
3069            let mut sink_fragment_updates = Vec::new();
3070
3071            for (sink, _obj) in sinks_with_objs {
3072                let sink_id = sink.sink_id;
3073
3074                // Validate that sink props can be altered
3075                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3076                    Some(connector) => {
3077                        let connector_type = connector.to_lowercase();
3078                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
3079                            .map_err(|e| SinkError::Config(anyhow!(e)))?;
3080
3081                        match_sink_name_str!(
3082                            connector_type.as_str(),
3083                            SinkType,
3084                            {
3085                                let mut new_sink_props = sink.properties.0.clone();
3086                                new_sink_props.extend(alter_props.clone());
3087                                SinkType::validate_alter_config(&new_sink_props)
3088                            },
3089                            |sink: &str| Err(SinkError::Config(anyhow!(
3090                                "unsupported sink type {}",
3091                                sink
3092                            )))
3093                        )?
3094                    }
3095                    None => {
3096                        return Err(SinkError::Config(anyhow!(
3097                            "connector not specified when altering sink"
3098                        ))
3099                        .into());
3100                    }
3101                };
3102
3103                let mut new_sink_props = sink.properties.0.clone();
3104                new_sink_props.extend(alter_props.clone());
3105
3106                // Prepare sink update
3107                let active_sink = sink::ActiveModel {
3108                    sink_id: Set(sink_id),
3109                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
3110                    ..Default::default()
3111                };
3112                sink_updates.push(active_sink);
3113
3114                // Prepare fragment updates for this sink
3115                sink_fragment_updates.push((sink_id, new_sink_props.clone()));
3116
3117                // Collect the complete properties for runtime broadcast
3118                let complete_sink_props: HashMap<String, String> =
3119                    new_sink_props.into_iter().collect();
3120                updated_sinks_with_props.push((sink_id, complete_sink_props));
3121            }
3122
3123            // Batch execute sink updates
3124            for sink_update in sink_updates {
3125                Sink::update(sink_update).exec(&txn).await?;
3126            }
3127
3128            // Batch execute sink fragment updates using the reusable function
3129            for (sink_id, new_sink_props) in sink_fragment_updates {
3130                update_connector_props_fragments(
3131                    &txn,
3132                    vec![sink_id.as_job_id()],
3133                    FragmentTypeFlag::Sink,
3134                    |node, found| {
3135                        if let PbNodeBody::Sink(node) = node
3136                            && let Some(sink_desc) = &mut node.sink_desc
3137                            && sink_desc.id == sink_id.as_raw_id()
3138                        {
3139                            sink_desc.properties = new_sink_props.clone();
3140                            *found = true;
3141                        }
3142                    },
3143                    true,
3144                )
3145                .await?;
3146            }
3147        }
3148
3149        // Collect all updated objects for frontend notification
3150        let mut updated_objects = Vec::new();
3151
3152        // Add connection
3153        let (connection, obj) = Connection::find_by_id(connection_id)
3154            .find_also_related(Object)
3155            .one(&txn)
3156            .await?
3157            .ok_or_else(|| {
3158                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
3159            })?;
3160        updated_objects.push(PbObject {
3161            object_info: Some(PbObjectInfo::Connection(
3162                ObjectModel(connection, obj.unwrap(), None).into(),
3163            )),
3164        });
3165
3166        // Add sources
3167        for source_id in &dependent_sources {
3168            let (source, obj) = Source::find_by_id(*source_id)
3169                .find_also_related(Object)
3170                .one(&txn)
3171                .await?
3172                .ok_or_else(|| {
3173                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
3174                })?;
3175            updated_objects.push(PbObject {
3176                object_info: Some(PbObjectInfo::Source(
3177                    ObjectModel(source, obj.unwrap(), None).into(),
3178                )),
3179            });
3180        }
3181
3182        // Add sinks
3183        for sink_id in &dependent_sinks {
3184            let (sink, obj) = Sink::find_by_id(*sink_id)
3185                .find_also_related(Object)
3186                .one(&txn)
3187                .await?
3188                .ok_or_else(|| {
3189                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
3190                })?;
3191            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
3192                .one(&txn)
3193                .await?;
3194            updated_objects.push(PbObject {
3195                object_info: Some(PbObjectInfo::Sink(
3196                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
3197                )),
3198            });
3199        }
3200
3201        // Commit the transaction
3202        txn.commit().await?;
3203
3204        // Notify frontend about all updated objects
3205        if !updated_objects.is_empty() {
3206            self.notify_frontend(
3207                NotificationOperation::Update,
3208                NotificationInfo::ObjectGroup(PbObjectGroup {
3209                    objects: updated_objects,
3210                    dependencies: vec![],
3211                }),
3212            )
3213            .await;
3214        }
3215
3216        Ok((
3217            connection_options_with_secret,
3218            updated_sources_with_props,
3219            updated_sinks_with_props,
3220        ))
3221    }
3222
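    /// Rewrites the rate limit of every throttleable node (DML, sink, stream
    /// scan, CDC scan, and source backfill) inside the given fragment; `None`
    /// removes the limit. Errors if the fragment does not exist.
    ///
    /// Illustrative usage (the handle and id are assumptions):
    ///
    /// ```ignore
    /// // Throttle the fragment to 1000 rows/s ...
    /// catalog_controller
    ///     .update_fragment_rate_limit_by_fragment_id(fragment_id, Some(1000))
    ///     .await?;
    /// // ... and later lift the limit again.
    /// catalog_controller
    ///     .update_fragment_rate_limit_by_fragment_id(fragment_id, None)
    ///     .await?;
    /// ```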
3223    pub async fn update_fragment_rate_limit_by_fragment_id(
3224        &self,
3225        fragment_id: FragmentId,
3226        rate_limit: Option<u32>,
3227    ) -> MetaResult<()> {
3228        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
3229                                 stream_node: &mut PbStreamNode| {
3230            let mut found = false;
3231            if fragment_type_mask.contains_any(
3232                FragmentTypeFlag::dml_rate_limit_fragments()
3233                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
3234                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
3235                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
3236            ) {
3237                visit_stream_node_mut(stream_node, |node| {
3238                    if let PbNodeBody::Dml(node) = node {
3239                        node.rate_limit = rate_limit;
3240                        found = true;
3241                    }
3242                    if let PbNodeBody::Sink(node) = node {
3243                        node.rate_limit = rate_limit;
3244                        found = true;
3245                    }
3246                    if let PbNodeBody::StreamCdcScan(node) = node {
3247                        node.rate_limit = rate_limit;
3248                        found = true;
3249                    }
3250                    if let PbNodeBody::StreamScan(node) = node {
3251                        node.rate_limit = rate_limit;
3252                        found = true;
3253                    }
3254                    if let PbNodeBody::SourceBackfill(node) = node {
3255                        node.rate_limit = rate_limit;
3256                        found = true;
3257                    }
3258                });
3259            }
3260            found
3261        };
3262        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
3263            .await
3264    }
3265
3266    /// Note: `FsFetch` nodes created in old versions are not included.
3267    /// Since this is only used for debugging, the omission is acceptable.
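    ///
    /// Sketch of inspecting the result (the call site is assumed):
    ///
    /// ```ignore
    /// for info in catalog_controller.list_rate_limits().await? {
    ///     println!(
    ///         "fragment {} of job {}: {} limited to {}",
    ///         info.fragment_id, info.job_id, info.node_name, info.rate_limit,
    ///     );
    /// }
    /// ```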
3268    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
3269        let inner = self.inner.read().await;
3270        let txn = inner.db.begin().await?;
3271
3272        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
3273            .select_only()
3274            .columns([
3275                fragment::Column::FragmentId,
3276                fragment::Column::JobId,
3277                fragment::Column::FragmentTypeMask,
3278                fragment::Column::StreamNode,
3279            ])
3280            .filter(FragmentTypeMask::intersects_any(
3281                FragmentTypeFlag::rate_limit_fragments(),
3282            ))
3283            .into_tuple()
3284            .all(&txn)
3285            .await?;
3286
3287        let mut rate_limits = Vec::new();
3288        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3289            let stream_node = stream_node.to_protobuf();
3290            visit_stream_node_body(&stream_node, |node| {
3291                let mut rate_limit = None;
3292                let mut node_name = None;
3293
3294                match node {
3295                    // source rate limit
3296                    PbNodeBody::Source(node) => {
3297                        if let Some(node_inner) = &node.source_inner {
3298                            rate_limit = node_inner.rate_limit;
3299                            node_name = Some("SOURCE");
3300                        }
3301                    }
3302                    PbNodeBody::StreamFsFetch(node) => {
3303                        if let Some(node_inner) = &node.node_inner {
3304                            rate_limit = node_inner.rate_limit;
3305                            node_name = Some("FS_FETCH");
3306                        }
3307                    }
3308                    // backfill rate limit
3309                    PbNodeBody::SourceBackfill(node) => {
3310                        rate_limit = node.rate_limit;
3311                        node_name = Some("SOURCE_BACKFILL");
3312                    }
3313                    PbNodeBody::StreamScan(node) => {
3314                        rate_limit = node.rate_limit;
3315                        node_name = Some("STREAM_SCAN");
3316                    }
3317                    PbNodeBody::StreamCdcScan(node) => {
3318                        rate_limit = node.rate_limit;
3319                        node_name = Some("STREAM_CDC_SCAN");
3320                    }
3321                    PbNodeBody::Sink(node) => {
3322                        rate_limit = node.rate_limit;
3323                        node_name = Some("SINK");
3324                    }
3325                    _ => {}
3326                }
3327
3328                if let Some(rate_limit) = rate_limit {
3329                    rate_limits.push(RateLimitInfo {
3330                        fragment_id,
3331                        job_id,
3332                        fragment_type_mask: fragment_type_mask as u32,
3333                        rate_limit,
3334                        node_name: node_name.unwrap().to_owned(),
3335                    });
3336                }
3337            });
3338        }
3339
3340        Ok(rate_limits)
3341    }
3342}
3343
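/// Checks that every key in `props` is allowed to change on the fly for the
/// sink's connector, then validates the merged (existing + new) property map
/// via the connector's `validate_alter_config`.
///
/// Sketch (the model binding and option key are illustrative assumptions):
///
/// ```ignore
/// let props = BTreeMap::from([("max_batch_rows".to_owned(), "1024".to_owned())]);
/// validate_sink_props(&sink_model, &props)?;
/// ```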
3344fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3345    // Validate that props can be altered
3346    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3347        Some(connector) => {
3348            let connector_type = connector.to_lowercase();
3349            let field_names: Vec<String> = props.keys().cloned().collect();
3350            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3351                .map_err(|e| SinkError::Config(anyhow!(e)))?;
3352
3353            match_sink_name_str!(
3354                connector_type.as_str(),
3355                SinkType,
3356                {
3357                    let mut new_props = sink.properties.0.clone();
3358                    new_props.extend(props.clone());
3359                    SinkType::validate_alter_config(&new_props)
3360                },
3361                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3362            )?
3363        }
3364        None => {
3365            return Err(
3366                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
3367            );
3368        }
3369    };
3370    Ok(())
3371}
3372
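/// Merges `props` into the `WITH (...)` options of a parsed statement,
/// overwriting options that share a name while preserving the original option
/// order (the `IndexMap` keeps insertion order).
///
/// Sketch of the effect (option values are illustrative):
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 't1')  +  {topic: 't2', retry: '3'}
/// //   => WITH (connector = 'kafka', topic = 't2', retry = '3')
/// update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
/// ```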
3373fn update_stmt_with_props(
3374    with_properties: &mut Vec<SqlOption>,
3375    props: &BTreeMap<String, String>,
3376) -> MetaResult<()> {
3377    let mut new_sql_options = with_properties
3378        .iter()
3379        .map(|sql_option| (&sql_option.name, sql_option))
3380        .collect::<IndexMap<_, _>>();
3381    let add_sql_options = props
3382        .iter()
3383        .map(|(k, v)| SqlOption::try_from((k, v)))
3384        .collect::<Result<Vec<SqlOption>, ParserError>>()
3385        .map_err(|e| SinkError::Config(anyhow!(e)))?;
3386    new_sql_options.extend(
3387        add_sql_options
3388            .iter()
3389            .map(|sql_option| (&sql_option.name, sql_option)),
3390    );
3391    *with_properties = new_sql_options.into_values().cloned().collect();
3392    Ok(())
3393}
3394
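/// Extends the `SinkDesc` properties embedded in every sink fragment of the
/// given sink job with `props`, persisting the rewritten stream nodes inside
/// the caller's transaction. Panics if no fragment carries the sink, since a
/// sink job must contain its own sink fragment.
///
/// ```ignore
/// // Inside an open transaction `txn` (assumed to be in scope):
/// update_sink_fragment_props(&txn, sink_id, new_props).await?;
/// ```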
3395async fn update_sink_fragment_props(
3396    txn: &DatabaseTransaction,
3397    sink_id: SinkId,
3398    props: BTreeMap<String, String>,
3399) -> MetaResult<()> {
3400    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3401        .select_only()
3402        .columns([
3403            fragment::Column::FragmentId,
3404            fragment::Column::FragmentTypeMask,
3405            fragment::Column::StreamNode,
3406        ])
3407        .filter(fragment::Column::JobId.eq(sink_id))
3408        .into_tuple()
3409        .all(txn)
3410        .await?;
3411    let fragments = fragments
3412        .into_iter()
3413        .filter(|(_, fragment_type_mask, _)| {
3414            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3415        })
3416        .filter_map(|(id, _, stream_node)| {
3417            let mut stream_node = stream_node.to_protobuf();
3418            let mut found = false;
3419            visit_stream_node_mut(&mut stream_node, |node| {
3420                if let PbNodeBody::Sink(node) = node
3421                    && let Some(sink_desc) = &mut node.sink_desc
3422                    && sink_desc.id == sink_id
3423                {
3424                    sink_desc.properties.extend(props.clone());
3425                    found = true;
3426                }
3427            });
3428            if found { Some((id, stream_node)) } else { None }
3429        })
3430        .collect_vec();
3431    assert!(
3432        !fragments.is_empty(),
3433        "sink id should be used by at least one fragment"
3434    );
3435    for (id, stream_node) in fragments {
3436        Fragment::update(fragment::ActiveModel {
3437            fragment_id: Set(id),
3438            stream_node: Set(StreamNode::from(&stream_node)),
3439            ..Default::default()
3440        })
3441        .exec(txn)
3442        .await?;
3443    }
3444    Ok(())
3445}
3446
3447pub struct SinkIntoTableContext {
3448    /// For alter table (e.g., add column), this is the list of existing sink ids;
3449    /// otherwise it is empty.
3450    pub updated_sink_catalogs: Vec<SinkId>,
3451}
3452
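/// Context for finishing an auto-refresh-schema sink replacement: carries the
/// ids of the temporary and original sinks, the refreshed columns, and an
/// optional new log store table.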
3453pub struct FinishAutoRefreshSchemaSinkContext {
3454    pub tmp_sink_id: SinkId,
3455    pub original_sink_id: SinkId,
3456    pub columns: Vec<PbColumnCatalog>,
3457    pub new_log_store_table: Option<Box<PbTable>>,
3458}
3459
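/// Applies `alter_stream_node_fn` to the stream nodes of all fragments of
/// `job_ids` whose type mask intersects `expect_flag`, persisting only the
/// fragments the closure actually changed (the closure must set `found` to
/// `true` when it rewrites a node).
///
/// Sketch of a closure that rewrites source properties, mirroring the call
/// sites above (`new_props` is an illustrative binding):
///
/// ```ignore
/// update_connector_props_fragments(
///     &txn,
///     job_ids,
///     FragmentTypeFlag::Source,
///     |node, found| {
///         if let PbNodeBody::Source(node) = node
///             && let Some(inner) = &mut node.source_inner
///         {
///             inner.with_properties = new_props.clone();
///             *found = true;
///         }
///     },
///     is_shared_source,
/// )
/// .await?;
/// ```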
3460async fn update_connector_props_fragments<F>(
3461    txn: &DatabaseTransaction,
3462    job_ids: Vec<JobId>,
3463    expect_flag: FragmentTypeFlag,
3464    mut alter_stream_node_fn: F,
3465    is_shared_source: bool,
3466) -> MetaResult<()>
3467where
3468    F: FnMut(&mut PbNodeBody, &mut bool),
3469{
3470    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3471        .select_only()
3472        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3473        .filter(
3474            fragment::Column::JobId
3475                .is_in(job_ids.clone())
3476                .and(FragmentTypeMask::intersects(expect_flag)),
3477        )
3478        .into_tuple()
3479        .all(txn)
3480        .await?;
3481    let fragments = fragments
3482        .into_iter()
3483        .filter_map(|(id, stream_node)| {
3484            let mut stream_node = stream_node.to_protobuf();
3485            let mut found = false;
3486            visit_stream_node_mut(&mut stream_node, |node| {
3487                alter_stream_node_fn(node, &mut found);
3488            });
3489            if found { Some((id, stream_node)) } else { None }
3490        })
3491        .collect_vec();
3492    if is_shared_source || job_ids.len() > 1 {
3493        // The first element of `job_ids` is the shared source job id or the associated table id.
3494        // A non-shared source alone may yield no updated fragments, but `job_ids.len() > 1`
3495        // means the source is used by other streaming jobs, so at least one fragment must be updated.
3496        assert!(
3497            !fragments.is_empty(),
3498            "job ids {:?} (type: {:?}) should be used by at least one fragment",
3499            job_ids,
3500            expect_flag
3501        );
3502    }
3503
3504    for (id, stream_node) in fragments {
3505        Fragment::update(fragment::ActiveModel {
3506            fragment_id: Set(id),
3507            stream_node: Set(StreamNode::from(&stream_node)),
3508            ..Default::default()
3509        })
3510        .exec(txn)
3511        .await?;
3512    }
3513
3514    Ok(())
3515}