risingwave_meta/controller/
streaming_job.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22    ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
24};
25use risingwave_common::config::DefaultParallelism;
26use risingwave_common::hash::VnodeCountCompat;
27use risingwave_common::id::JobId;
28use risingwave_common::secret::LocalSecretManager;
29use risingwave_common::system_param::AdaptiveParallelismStrategy;
30use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
31use risingwave_common::util::iter_util::ZipEqDebug;
32use risingwave_common::util::stream_graph_visitor::{
33    visit_stream_node_body, visit_stream_node_mut,
34};
35use risingwave_common::{bail, current_cluster_version};
36use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::error::ConnectorError;
39use risingwave_connector::sink::file_sink::fs::FsSink;
40use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
41use risingwave_connector::source::{
42    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
43};
44use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
45use risingwave_meta_model::object::ObjectType;
46use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
47use risingwave_meta_model::refresh_job::RefreshState;
48use risingwave_meta_model::streaming_job::BackfillOrders;
49use risingwave_meta_model::table::TableType;
50use risingwave_meta_model::user_privilege::Action;
51use risingwave_meta_model::*;
52use risingwave_pb::catalog::table::PbEngine;
53use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
54use risingwave_pb::ddl_service::streaming_job_resource_type;
55use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
56use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
57use risingwave_pb::meta::object::PbObjectInfo;
58use risingwave_pb::meta::subscribe_response::{
59    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
60};
61use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
62use risingwave_pb::plan_common::PbColumnCatalog;
63use risingwave_pb::plan_common::source_refresh_mode::{
64    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
65};
66use risingwave_pb::secret::PbSecretRef;
67use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
68use risingwave_pb::stream_plan::stream_node::PbNodeBody;
69use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
70use risingwave_pb::user::PbUserInfo;
71use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
72use risingwave_sqlparser::parser::{Parser, ParserError};
73use sea_orm::ActiveValue::Set;
74use sea_orm::sea_query::{Expr, Query, SimpleExpr};
75use sea_orm::{
76    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
77    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
78};
79use thiserror_ext::AsReport;
80
81use super::rename::IndexItemRewriter;
82use crate::barrier::Command;
83use crate::controller::ObjectModel;
84use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
85use crate::controller::fragment::FragmentTypeMaskExt;
86use crate::controller::utils::{
87    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
88    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
89    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
90    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
91    list_object_dependencies_by_object_id, list_user_info_by_ids,
92    try_get_iceberg_table_by_downstream_sink,
93};
94use crate::error::MetaErrorInner;
95use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
96use crate::model::{
97    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
98    StreamJobFragmentsToCreate,
99};
100use crate::stream::SplitAssignment;
101use crate::{MetaError, MetaResult};
102
/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    /// Streaming jobs whose fragments this update targets.
    /// NOTE(review): presumably the jobs reading from the dependent source — confirm at the
    /// construction site.
    job_ids: Vec<JobId>,
    /// String key/value properties to write into the affected fragments.
    with_properties: BTreeMap<String, String>,
    /// Secret references that accompany `with_properties`; presumably keyed by property
    /// name — TODO confirm against the caller.
    secret_refs: BTreeMap<String, PbSecretRef>,
    /// Whether the dependent source is a shared source. NOTE(review): presumably this selects
    /// which kind of source fragment is rewritten — confirm at the use site.
    is_shared_source: bool,
}
114
/// Settings of an existing streaming job that a replacement job inherits.
///
/// Built from the original job's `streaming_job` row (see the `From<streaming_job::Model>`
/// impl for this type).
#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    /// Max parallelism recorded for the original job.
    max_parallelism: i32,
    /// Time zone of the original job; used unless the replacement provides its own.
    timezone: Option<String>,
    /// Config override of the original job in string form; replacements always reuse it.
    config_override: Option<String>,
    /// Adaptive parallelism strategy of the original job, stored in string form.
    adaptive_parallelism_strategy: Option<String>,
    /// Parallelism of the original job; used unless the replacement specifies one.
    parallelism: StreamingParallelism,
    /// Resource group the original job was pinned to, if any.
    specific_resource_group: Option<String>,
}
124
125impl ReplaceOriginalJobInfo {
126    fn resolved_parallelism(
127        &self,
128        specified_parallelism: Option<&NonZeroUsize>,
129    ) -> StreamingParallelism {
130        specified_parallelism
131            .map(|n| StreamingParallelism::Fixed(n.get() as _))
132            .unwrap_or_else(|| self.parallelism.clone())
133    }
134
135    fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
136        StreamContext {
137            timezone: ctx_override
138                .and_then(|ctx| ctx.timezone.clone())
139                .or_else(|| self.timezone.clone()),
140            // We don't expect replacing a job with a different config override.
141            // Thus always use the original config override.
142            config_override: self.config_override.clone().unwrap_or_default().into(),
143        }
144    }
145
146    fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
147        match &self.specific_resource_group {
148            Some(group) => {
149                streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
150            }
151            None => streaming_job_resource_type::ResourceType::Regular(true),
152        }
153    }
154}
155
156impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
157    fn from(model: streaming_job::Model) -> Self {
158        Self {
159            max_parallelism: model.max_parallelism,
160            timezone: model.timezone,
161            config_override: model.config_override,
162            adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
163            parallelism: model.parallelism,
164            specific_resource_group: model.specific_resource_group,
165        }
166    }
167}
168
169impl CatalogController {
170    #[expect(clippy::too_many_arguments)]
171    pub async fn create_streaming_job_obj(
172        txn: &DatabaseTransaction,
173        obj_type: ObjectType,
174        owner_id: UserId,
175        database_id: Option<DatabaseId>,
176        schema_id: Option<SchemaId>,
177        create_type: PbCreateType,
178        ctx: StreamContext,
179        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
180        streaming_parallelism: StreamingParallelism,
181        max_parallelism: usize,
182        resource_type: streaming_job_resource_type::ResourceType,
183        backfill_parallelism: Option<StreamingParallelism>,
184        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
185        refresh_interval_sec: Option<u64>,
186    ) -> MetaResult<streaming_job::Model> {
187        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
188        let job_id = obj.oid.as_job_id();
189        let specific_resource_group = resource_type.resource_group();
190        let is_serverless_backfill = matches!(
191            &resource_type,
192            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
193        );
194        let model = streaming_job::Model {
195            job_id,
196            job_status: JobStatus::Initial,
197            create_type: create_type.into(),
198            timezone: ctx.timezone,
199            config_override: Some(ctx.config_override.to_string()),
200            adaptive_parallelism_strategy: adaptive_parallelism_strategy
201                .as_ref()
202                .map(ToString::to_string),
203            parallelism: streaming_parallelism,
204            backfill_parallelism,
205            backfill_adaptive_parallelism_strategy: backfill_adaptive_parallelism_strategy
206                .as_ref()
207                .map(ToString::to_string),
208            backfill_orders: None,
209            max_parallelism: max_parallelism as _,
210            specific_resource_group,
211            is_serverless_backfill,
212            refresh_interval_sec: refresh_interval_sec.map(|s| s as i64),
213        };
214        let job = model.clone().into_active_model();
215        StreamingJobModel::insert(job).exec(txn).await?;
216
217        Ok(model)
218    }
219
220    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
221    /// materialized view.
222    ///
223    /// Some of the fields in the given streaming job are placeholders, which will
224    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
225    #[expect(clippy::too_many_arguments)]
226    #[await_tree::instrument]
227    pub async fn create_job_catalog(
228        &self,
229        streaming_job: &mut StreamingJob,
230        ctx: &StreamContext,
231        parallelism: &Option<Parallelism>,
232        max_parallelism: usize,
233        mut dependencies: HashSet<ObjectId>,
234        resource_type: streaming_job_resource_type::ResourceType,
235        backfill_parallelism: &Option<Parallelism>,
236        adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
237        backfill_adaptive_parallelism_strategy: Option<AdaptiveParallelismStrategy>,
238        refresh_interval_sec: Option<u64>,
239    ) -> MetaResult<streaming_job::Model> {
240        let inner = self.inner.write().await;
241        let txn = inner.db.begin().await?;
242        let create_type = streaming_job.create_type();
243
244        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
245            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
246            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
247            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
248        };
249        let backfill_parallelism = backfill_parallelism
250            .as_ref()
251            .map(|p| StreamingParallelism::Fixed(p.parallelism as _))
252            .or_else(|| {
253                backfill_adaptive_parallelism_strategy
254                    .as_ref()
255                    .map(|_| StreamingParallelism::Adaptive)
256            });
257
258        ensure_user_id(streaming_job.owner() as _, &txn).await?;
259        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
260        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
261        check_relation_name_duplicate(
262            &streaming_job.name(),
263            streaming_job.database_id(),
264            streaming_job.schema_id(),
265            &txn,
266        )
267        .await?;
268
269        // check if any dependency is in altering status.
270        if !dependencies.is_empty() {
271            let altering_cnt = ObjectDependency::find()
272                .join(
273                    JoinType::InnerJoin,
274                    object_dependency::Relation::Object1.def(),
275                )
276                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
277                .filter(
278                    object_dependency::Column::Oid
279                        .is_in(dependencies.clone())
280                        .and(object::Column::ObjType.eq(ObjectType::Table))
281                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
282                        .and(
283                            // It means the referring table is just dummy for altering.
284                            object::Column::Oid.not_in_subquery(
285                                Query::select()
286                                    .column(table::Column::TableId)
287                                    .from(Table)
288                                    .to_owned(),
289                            ),
290                        ),
291                )
292                .count(&txn)
293                .await?;
294            if altering_cnt != 0 {
295                return Err(MetaError::permission_denied(
296                    "some dependent relations are being altered",
297                ));
298            }
299
300            // Check if any dependency is a batch refresh job.
301            // Streaming on batch refresh materialized views is not supported.
302            let batch_refresh_cnt = StreamingJobModel::find()
303                .filter(
304                    streaming_job::Column::JobId
305                        .is_in(dependencies.iter().map(|id| JobId::new(id.as_raw_id())))
306                        .and(streaming_job::Column::RefreshIntervalSec.is_not_null()),
307                )
308                .count(&txn)
309                .await?;
310            if batch_refresh_cnt != 0 {
311                return Err(MetaError::permission_denied(
312                    "creating streaming jobs on batch refresh materialized views is not supported",
313                ));
314            }
315        }
316
317        let streaming_job_model = match streaming_job {
318            StreamingJob::MaterializedView(table) => {
319                let streaming_job_model = Self::create_streaming_job_obj(
320                    &txn,
321                    ObjectType::Table,
322                    table.owner as _,
323                    Some(table.database_id),
324                    Some(table.schema_id),
325                    create_type,
326                    ctx.clone(),
327                    adaptive_parallelism_strategy,
328                    streaming_parallelism,
329                    max_parallelism,
330                    resource_type,
331                    backfill_parallelism.clone(),
332                    backfill_adaptive_parallelism_strategy,
333                    refresh_interval_sec,
334                )
335                .await?;
336                table.id = streaming_job_model.job_id.as_mv_table_id();
337                let table_model: table::ActiveModel = table.clone().into();
338                Table::insert(table_model).exec(&txn).await?;
339                streaming_job_model
340            }
341            StreamingJob::Sink(sink) => {
342                if let Some(target_table_id) = sink.target_table
343                    && check_sink_into_table_cycle(
344                        target_table_id.into(),
345                        dependencies.iter().cloned().collect(),
346                        &txn,
347                    )
348                    .await?
349                {
350                    bail!("Creating such a sink will result in circular dependency.");
351                }
352
353                let streaming_job_model = Self::create_streaming_job_obj(
354                    &txn,
355                    ObjectType::Sink,
356                    sink.owner as _,
357                    Some(sink.database_id),
358                    Some(sink.schema_id),
359                    create_type,
360                    ctx.clone(),
361                    adaptive_parallelism_strategy,
362                    streaming_parallelism,
363                    max_parallelism,
364                    resource_type,
365                    backfill_parallelism.clone(),
366                    backfill_adaptive_parallelism_strategy,
367                    None, // refresh_interval_sec: only for MV
368                )
369                .await?;
370                sink.id = streaming_job_model.job_id.as_sink_id();
371                let sink_model: sink::ActiveModel = sink.clone().into();
372                Sink::insert(sink_model).exec(&txn).await?;
373                streaming_job_model
374            }
375            StreamingJob::Table(src, table, _) => {
376                let streaming_job_model = Self::create_streaming_job_obj(
377                    &txn,
378                    ObjectType::Table,
379                    table.owner as _,
380                    Some(table.database_id),
381                    Some(table.schema_id),
382                    create_type,
383                    ctx.clone(),
384                    adaptive_parallelism_strategy,
385                    streaming_parallelism,
386                    max_parallelism,
387                    resource_type,
388                    backfill_parallelism.clone(),
389                    backfill_adaptive_parallelism_strategy,
390                    None, // refresh_interval_sec: only for MV
391                )
392                .await?;
393                let job_id = streaming_job_model.job_id;
394                table.id = job_id.as_mv_table_id();
395                if let Some(src) = src {
396                    let src_obj = Self::create_object(
397                        &txn,
398                        ObjectType::Source,
399                        src.owner as _,
400                        Some(src.database_id),
401                        Some(src.schema_id),
402                    )
403                    .await?;
404                    src.id = src_obj.oid.as_source_id();
405                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
406                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
407                    let source: source::ActiveModel = src.clone().into();
408                    Source::insert(source).exec(&txn).await?;
409                }
410                let table_model: table::ActiveModel = table.clone().into();
411                Table::insert(table_model).exec(&txn).await?;
412
413                if table.refreshable {
414                    let trigger_interval_secs = src
415                        .as_ref()
416                        .and_then(|source_catalog| source_catalog.refresh_mode)
417                        .and_then(
418                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
419                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
420                                    refresh_interval_sec,
421                                })) => refresh_interval_sec,
422                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
423                                None => None,
424                            },
425                        );
426
427                    RefreshJob::insert(refresh_job::ActiveModel {
428                        table_id: Set(table.id),
429                        last_trigger_time: Set(None),
430                        trigger_interval_secs: Set(trigger_interval_secs),
431                        current_status: Set(RefreshState::Idle),
432                        last_success_time: Set(None),
433                    })
434                    .exec(&txn)
435                    .await?;
436                }
437                streaming_job_model
438            }
439            StreamingJob::Index(index, table) => {
440                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
441                let streaming_job_model = Self::create_streaming_job_obj(
442                    &txn,
443                    ObjectType::Index,
444                    index.owner as _,
445                    Some(index.database_id),
446                    Some(index.schema_id),
447                    create_type,
448                    ctx.clone(),
449                    adaptive_parallelism_strategy,
450                    streaming_parallelism,
451                    max_parallelism,
452                    resource_type,
453                    backfill_parallelism.clone(),
454                    backfill_adaptive_parallelism_strategy,
455                    None, // refresh_interval_sec: only for MV
456                )
457                .await?;
458                // to be compatible with old implementation.
459                let job_id = streaming_job_model.job_id;
460                index.id = job_id.as_index_id();
461                index.index_table_id = job_id.as_mv_table_id();
462                table.id = job_id.as_mv_table_id();
463
464                ObjectDependency::insert(object_dependency::ActiveModel {
465                    oid: Set(index.primary_table_id.into()),
466                    used_by: Set(table.id.into()),
467                    ..Default::default()
468                })
469                .exec(&txn)
470                .await?;
471
472                let table_model: table::ActiveModel = table.clone().into();
473                Table::insert(table_model).exec(&txn).await?;
474                let index_model: index::ActiveModel = index.clone().into();
475                Index::insert(index_model).exec(&txn).await?;
476                streaming_job_model
477            }
478            StreamingJob::Source(src) => {
479                let streaming_job_model = Self::create_streaming_job_obj(
480                    &txn,
481                    ObjectType::Source,
482                    src.owner as _,
483                    Some(src.database_id),
484                    Some(src.schema_id),
485                    create_type,
486                    ctx.clone(),
487                    adaptive_parallelism_strategy,
488                    streaming_parallelism,
489                    max_parallelism,
490                    resource_type,
491                    backfill_parallelism.clone(),
492                    backfill_adaptive_parallelism_strategy,
493                    None, // refresh_interval_sec: only for MV
494                )
495                .await?;
496                src.id = streaming_job_model.job_id.as_shared_source_id();
497                let source_model: source::ActiveModel = src.clone().into();
498                Source::insert(source_model).exec(&txn).await?;
499                streaming_job_model
500            }
501        };
502
503        // collect dependent secrets.
504        dependencies.extend(
505            streaming_job
506                .dependent_secret_ids()?
507                .into_iter()
508                .map(|id| id.as_object_id()),
509        );
510        // collect dependent connection
511        dependencies.extend(
512            streaming_job
513                .dependent_connection_ids()?
514                .into_iter()
515                .map(|id| id.as_object_id()),
516        );
517
518        // record object dependency.
519        if !dependencies.is_empty() {
520            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
521                object_dependency::ActiveModel {
522                    oid: Set(oid),
523                    used_by: Set(streaming_job.id().as_object_id()),
524                    ..Default::default()
525                }
526            }))
527            .exec(&txn)
528            .await?;
529        }
530
531        txn.commit().await?;
532
533        Ok(streaming_job_model)
534    }
535
536    /// Create catalogs for internal tables, then notify frontend about them if the job is a
537    /// materialized view.
538    ///
539    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
540    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
541    ///
542    /// Returns a mapping from the temporary table id to the actual global table id.
543    pub async fn create_internal_table_catalog(
544        &self,
545        job: &StreamingJob,
546        mut incomplete_internal_tables: Vec<PbTable>,
547    ) -> MetaResult<HashMap<TableId, TableId>> {
548        let job_id = job.id();
549        let inner = self.inner.write().await;
550        let txn = inner.db.begin().await?;
551
552        // Ensure the job exists.
553        ensure_job_not_canceled(job_id, &txn).await?;
554
555        let mut table_id_map = HashMap::new();
556        for table in &mut incomplete_internal_tables {
557            let table_id = Self::create_object(
558                &txn,
559                ObjectType::Table,
560                table.owner as _,
561                Some(table.database_id),
562                Some(table.schema_id),
563            )
564            .await?
565            .oid
566            .as_table_id();
567            table_id_map.insert(table.id, table_id);
568            table.id = table_id;
569            table.job_id = Some(job_id);
570
571            let table_model = table::ActiveModel {
572                table_id: Set(table_id),
573                belongs_to_job_id: Set(Some(job_id)),
574                fragment_id: NotSet,
575                ..table.clone().into()
576            };
577            Table::insert(table_model).exec(&txn).await?;
578        }
579        txn.commit().await?;
580
581        Ok(table_id_map)
582    }
583
584    pub async fn prepare_stream_job_fragments(
585        &self,
586        stream_job_fragments: &StreamJobFragmentsToCreate,
587        streaming_job: &StreamingJob,
588        for_replace: bool,
589        backfill_orders: Option<BackfillOrders>,
590    ) -> MetaResult<()> {
591        self.prepare_streaming_job(
592            stream_job_fragments.stream_job_id(),
593            || stream_job_fragments.fragments.values(),
594            &stream_job_fragments.downstreams,
595            for_replace,
596            Some(streaming_job),
597            backfill_orders,
598        )
599        .await
600    }
601
602    // TODO: In this function, we also update the `Table` model in the meta store.
603    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
604    // making them the source of truth and performing a full replacement for those in the meta store?
605    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
606    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
607    )]
608    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
609        &self,
610        job_id: JobId,
611        get_fragments: impl Fn() -> I + 'a,
612        downstreams: &FragmentDownstreamRelation,
613        for_replace: bool,
614        creating_streaming_job: Option<&'a StreamingJob>,
615        backfill_orders: Option<BackfillOrders>,
616    ) -> MetaResult<()> {
617        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
618
619        let inner = self.inner.write().await;
620
621        let need_notify = creating_streaming_job
622            .map(|job| job.should_notify_creating())
623            .unwrap_or(false);
624        let definition = creating_streaming_job.map(|job| job.definition());
625
626        let mut objects_to_notify = vec![];
627        let txn = inner.db.begin().await?;
628
629        // Ensure the job exists.
630        ensure_job_not_canceled(job_id, &txn).await?;
631
632        if let Some(backfill_orders) = backfill_orders {
633            let job = streaming_job::ActiveModel {
634                job_id: Set(job_id),
635                backfill_orders: Set(Some(backfill_orders)),
636                ..Default::default()
637            };
638            StreamingJobModel::update(job).exec(&txn).await?;
639        }
640
641        let state_table_ids = fragments
642            .iter()
643            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
644            .collect_vec();
645
646        // Collect fragment IDs before consuming `fragments` for serving mapping notification.
647        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
648            .iter()
649            .map(|f| f.fragment_id as crate::model::FragmentId)
650            .collect();
651
652        if !fragments.is_empty() {
653            let fragment_models = fragments
654                .into_iter()
655                .map(|fragment| fragment.into_active_model())
656                .collect_vec();
657            Fragment::insert_many(fragment_models).exec(&txn).await?;
658        }
659
660        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
661        // After table fragments are created, update them for all tables.
662        if !for_replace {
663            let all_tables = StreamJobFragments::collect_tables(get_fragments());
664            for state_table_id in state_table_ids {
665                // Table's vnode count is not always the fragment's vnode count, so we have to
666                // look up the table from `TableFragments`.
667                // See `ActorGraphBuilder::new`.
668                let table = all_tables
669                    .get(&state_table_id)
670                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
671                assert_eq!(table.id, state_table_id);
672                let vnode_count = table.vnode_count();
673
674                Table::update(table::ActiveModel {
675                    table_id: Set(state_table_id as _),
676                    fragment_id: Set(Some(table.fragment_id)),
677                    vnode_count: Set(vnode_count as _),
678                    ..Default::default()
679                })
680                .exec(&txn)
681                .await?;
682
683                if need_notify {
684                    let mut table = table.clone();
685                    // In production, definition was replaced but still needed for notification.
686                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
687                        table.definition = definition.clone().unwrap();
688                    }
689                    objects_to_notify.push(PbObject {
690                        object_info: Some(PbObjectInfo::Table(table)),
691                    });
692                }
693            }
694        }
695
696        // Add streaming job objects to notification
697        if need_notify {
698            match creating_streaming_job.unwrap() {
699                StreamingJob::Table(Some(source), ..) => {
700                    objects_to_notify.push(PbObject {
701                        object_info: Some(PbObjectInfo::Source(source.clone())),
702                    });
703                }
704                StreamingJob::Sink(sink) => {
705                    objects_to_notify.push(PbObject {
706                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
707                    });
708                }
709                StreamingJob::Index(index, _) => {
710                    objects_to_notify.push(PbObject {
711                        object_info: Some(PbObjectInfo::Index(index.clone())),
712                    });
713                }
714                _ => {}
715            }
716        }
717
718        insert_fragment_relations(&txn, downstreams).await?;
719
720        if !for_replace {
721            // Update dml fragment id.
722            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
723                Table::update(table::ActiveModel {
724                    table_id: Set(table.id),
725                    dml_fragment_id: Set(table.dml_fragment_id),
726                    ..Default::default()
727                })
728                .exec(&txn)
729                .await?;
730            }
731        }
732
733        txn.commit().await?;
734
735        // Notify serving module about newly inserted fragments so it can establish
736        // serving vnode mappings. This is driven by the fragment model insertion,
737        // decoupled from the barrier-driven streaming mapping notifications.
738        self.env
739            .notification_manager()
740            .notify_serving_fragment_mapping_update(inserted_fragment_ids);
741
742        // FIXME: there's a gap between the catalog creation and notification, which may lead to
743        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
744        // need to either forward the notification or filter catalogs without any fragments in the metastore.
745        if !objects_to_notify.is_empty() {
746            self.notify_frontend(
747                Operation::Add,
748                Info::ObjectGroup(PbObjectGroup {
749                    objects: objects_to_notify,
750                    dependencies: vec![],
751                }),
752            )
753            .await;
754        }
755
756        Ok(())
757    }
758
759    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
760    /// information is required to build barrier mutation.
761    pub async fn build_cancel_command(
762        &self,
763        table_fragments: &StreamJobFragments,
764    ) -> MetaResult<Command> {
765        let inner = self.inner.read().await;
766        let txn = inner.db.begin().await?;
767
768        let dropped_sink_fragment_with_target =
769            if let Some(sink_fragment) = table_fragments.sink_fragment() {
770                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
771                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
772                sink_target_fragment
773                    .get(&sink_fragment_id)
774                    .map(|target_fragments| {
775                        let target_fragment_id = *target_fragments
776                            .first()
777                            .expect("sink should have at least one downstream fragment");
778                        (sink_fragment_id, target_fragment_id)
779                    })
780            } else {
781                None
782            };
783
784        Ok(Command::DropStreamingJobs {
785            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
786            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
787            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
788                .into_iter()
789                .map(|(sink, target)| (target as _, vec![sink as _]))
790                .collect(),
791        })
792    }
793
    /// `try_abort_creating_streaming_job` is used to abort a job that is still in its initial
    /// status or was created in `FOREGROUND` mode.
    ///
    /// Returns:
    /// - `(true, None)` if the job object is not found (it might have been cancelled already or
    ///   cleaned up by recovery).
    /// - `(false, Some(database_id))` if `is_cancelled` is false and the job is a background job
    ///   still in `Creating` status that does not belong to an iceberg table. Such a job is not
    ///   aborted here and is left to recovery.
    /// - `(true, Some(database_id))` once the job's catalog objects have been deleted.
    ///
    /// If the job is a sink feeding an iceberg table, the sink and its internal tables are deleted
    /// directly, and the abort then continues with the iceberg table's job id. When `is_cancelled`
    /// is true, the catalogs of the dropped tables are also recorded in `dropped_tables`.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        // A non-cancel abort must never target a job that has already been marked `Created`.
        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            // From here on, `job_id` refers to the iceberg table job, not the sink.
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        // `streaming_job` was loaded with the original (pre-rewrite) job id.
        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        // Keep the catalogs of the tables about to be deleted (job table + internal tables)
        // in `dropped_tables`, keyed by table id.
        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Query fragment IDs before cascade-deleting them, for serving mapping cleanup.
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        // Delete the job object, its internal tables and, for a table with a connector,
        // its associated source object.
        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        // Fail every waiter registered for `job_id` in this database. NOTE(review): waiters are
        // signalled before `txn.commit()`; if the commit fails they still observe this error.
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        // Notify serving module about deleted fragments from the aborted job.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            // We also have notified the frontend about these objects,
            // so we need to notify the frontend to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }
1001
1002    #[await_tree::instrument]
1003    pub async fn post_collect_job_fragments(
1004        &self,
1005        job_id: JobId,
1006        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
1007        new_sink_downstream: Option<FragmentDownstreamRelation>,
1008        split_assignment: Option<&SplitAssignment>,
1009    ) -> MetaResult<()> {
1010        let inner = self.inner.write().await;
1011        let txn = inner.db.begin().await?;
1012
1013        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
1014
1015        if let Some(new_downstream) = new_sink_downstream {
1016            insert_fragment_relations(&txn, &new_downstream).await?;
1017        }
1018
1019        // Mark job as CREATING.
1020        StreamingJobModel::update(streaming_job::ActiveModel {
1021            job_id: Set(job_id),
1022            job_status: Set(JobStatus::Creating),
1023            ..Default::default()
1024        })
1025        .exec(&txn)
1026        .await?;
1027
1028        if let Some(split_assignment) = split_assignment {
1029            let fragment_splits = split_assignment
1030                .iter()
1031                .map(|(fragment_id, splits)| {
1032                    (
1033                        *fragment_id as _,
1034                        splits.values().flatten().cloned().collect_vec(),
1035                    )
1036                })
1037                .collect();
1038
1039            self.update_fragment_splits(&txn, &fragment_splits).await?;
1040        }
1041
1042        txn.commit().await?;
1043
1044        Ok(())
1045    }
1046
1047    pub async fn create_job_catalog_for_replace(
1048        &self,
1049        streaming_job: &StreamingJob,
1050        ctx: Option<&StreamContext>,
1051        specified_parallelism: Option<&NonZeroUsize>,
1052        expected_original_max_parallelism: Option<usize>,
1053    ) -> MetaResult<streaming_job::Model> {
1054        let id = streaming_job.id();
1055        let inner = self.inner.write().await;
1056        let txn = inner.db.begin().await?;
1057
1058        // 1. check version.
1059        streaming_job.verify_version_for_replace(&txn).await?;
1060        // 2. check concurrent replace.
1061        let referring_cnt = ObjectDependency::find()
1062            .join(
1063                JoinType::InnerJoin,
1064                object_dependency::Relation::Object1.def(),
1065            )
1066            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
1067            .filter(
1068                object_dependency::Column::Oid
1069                    .eq(id)
1070                    .and(object::Column::ObjType.eq(ObjectType::Table))
1071                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
1072            )
1073            .count(&txn)
1074            .await?;
1075        if referring_cnt != 0 {
1076            return Err(MetaError::permission_denied(
1077                "job is being altered or referenced by some creating jobs",
1078            ));
1079        }
1080
1081        // Check if any dependent job is a batch refresh job.
1082        // Replace table is not supported when a batch refresh MV depends on it.
1083        let batch_refresh_dep_cnt = ObjectDependency::find()
1084            .join(
1085                JoinType::InnerJoin,
1086                object_dependency::Relation::Object1.def(),
1087            )
1088            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
1089            .filter(
1090                object_dependency::Column::Oid
1091                    .eq(id)
1092                    .and(streaming_job::Column::RefreshIntervalSec.is_not_null()),
1093            )
1094            .count(&txn)
1095            .await?;
1096        if batch_refresh_dep_cnt != 0 {
1097            return Err(MetaError::permission_denied(
1098                "replacing a table with dependent batch refresh materialized views is not supported",
1099            ));
1100        }
1101
1102        // 3. check parallelism.
1103        let original_job = StreamingJobModel::find_by_id(id)
1104            .one(&txn)
1105            .await?
1106            .map(ReplaceOriginalJobInfo::from)
1107            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
1108
1109        if let Some(max_parallelism) = expected_original_max_parallelism
1110            && original_job.max_parallelism != max_parallelism as i32
1111        {
1112            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
1113            // This should not happen in normal cases.
1114            bail!(
1115                "cannot use a different max parallelism \
1116                 when replacing streaming job, \
1117                 original: {}, new: {}",
1118                original_job.max_parallelism,
1119                max_parallelism
1120            );
1121        }
1122
1123        let parallelism = original_job.resolved_parallelism(specified_parallelism);
1124        let ctx = original_job.stream_context(ctx);
1125        let adaptive_parallelism_strategy = original_job
1126            .adaptive_parallelism_strategy
1127            .as_deref()
1128            .map(|s| parse_strategy(s).expect("strategy should be validated before persisting"));
1129        let resource_type = original_job.resource_type();
1130
1131        // 4. create streaming object for new replace table.
1132        let tmp_model = Self::create_streaming_job_obj(
1133            &txn,
1134            streaming_job.object_type(),
1135            streaming_job.owner() as _,
1136            Some(streaming_job.database_id() as _),
1137            Some(streaming_job.schema_id() as _),
1138            streaming_job.create_type(),
1139            ctx,
1140            adaptive_parallelism_strategy,
1141            parallelism,
1142            original_job.max_parallelism as _,
1143            resource_type,
1144            // `backfill_parallelism` is intentionally NOT inherited from the original job.
1145            // Replace has no "backfill finish -> restore parallelism" phase, so inheriting it
1146            // would render actors at the backfill parallelism and never recover to steady-state.
1147            None,
1148            None,
1149            None, // refresh_interval_sec: not applicable for replace jobs
1150        )
1151        .await?;
1152
1153        // 5. record dependency for new replace table.
1154        ObjectDependency::insert(object_dependency::ActiveModel {
1155            oid: Set(id.as_object_id()),
1156            used_by: Set(tmp_model.job_id.as_object_id()),
1157            ..Default::default()
1158        })
1159        .exec(&txn)
1160        .await?;
1161
1162        txn.commit().await?;
1163
1164        Ok(tmp_model)
1165    }
1166
1167    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
1168    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1169        let mut inner = self.inner.write().await;
1170        let txn = inner.db.begin().await?;
1171
1172        // Check if the job belongs to iceberg table.
1173        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1174            tracing::info!(
1175                "streaming job {} is for iceberg table, wait for manual finish operation",
1176                job_id
1177            );
1178            return Ok(());
1179        }
1180
1181        let (notification_op, objects, updated_user_info, dependencies) =
1182            self.finish_streaming_job_inner(&txn, job_id).await?;
1183
1184        txn.commit().await?;
1185
1186        let mut version = self
1187            .notify_frontend(
1188                notification_op,
1189                NotificationInfo::ObjectGroup(PbObjectGroup {
1190                    objects,
1191                    dependencies,
1192                }),
1193            )
1194            .await;
1195
1196        // notify users about the default privileges
1197        if !updated_user_info.is_empty() {
1198            version = self.notify_users_update(updated_user_info).await;
1199        }
1200
1201        inner
1202            .creating_table_finish_notifier
1203            .values_mut()
1204            .for_each(|creating_tables| {
1205                if let Some(txs) = creating_tables.remove(&job_id) {
1206                    for tx in txs {
1207                        let _ = tx.send(Ok(version));
1208                    }
1209                }
1210            });
1211
1212        Ok(())
1213    }
1214
    /// `finish_streaming_job_inner` marks streaming job `job_id` as `Created` inside `txn` and
    /// collects what the caller needs to notify the frontend. It does not commit `txn`.
    ///
    /// It also sets the job object's `created_at` to the current timestamp and its
    /// `created_at_cluster_version` to the current cluster version. For an index, it copies the
    /// primary table's `SELECT` privileges onto the index table and the index's state tables.
    /// For other jobs (except sinks whose name starts with `ICEBERG_SINK_PREFIX`), it calls
    /// `grant_default_privileges_automatically`.
    ///
    /// Returns `(notification_op, objects, updated_user_info, dependencies)`:
    /// - `notification_op`: `Update` for background jobs, materialized views and sinks,
    ///   otherwise `Add`.
    /// - `objects`: the job's internal tables, followed by its own catalog entries (a table's
    ///   associated source, if any, then the table; an index's index table, then the index; or
    ///   the sink / source).
    /// - `updated_user_info`: users whose privileges were changed.
    /// - `dependencies`: the object dependencies of `job_id`.
    ///
    /// Returns `catalog_id_not_found` if the job object, its streaming job row, or its catalog
    /// row is missing. Panics on an object type that is not a table, sink, index or source.
    pub async fn finish_streaming_job_inner(
        &self,
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(
        Operation,
        Vec<risingwave_pb::meta::Object>,
        Vec<PbUserInfo>,
        Vec<PbObjectDependency>,
    )> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        // The updated model is attached to the job's own catalog entries below.
        let streaming_job = Some(job.update(txn).await?);

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        // Background jobs get `Update`; the per-type branches below may also switch to `Update`.
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                // A table with an associated source notifies the source before the table.
                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                // Sinks named with the iceberg sink prefix do not get default privileges.
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (align with MV behavior).
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Index => {
                // Indexes inherit the primary table's SELECT privileges (below) instead of
                // default privileges.
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    // The index's internal state tables plus the index table itself.
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    // One SELECT privilege per (primary-table privilege, state table) pair,
                    // copying grantee, grantor, dependency and grant option.
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        // Not reached for indexes or iceberg-prefixed sinks, so their `updated_user_info` is kept.
        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        let dependencies =
            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;

        Ok((notification_op, objects, updated_user_info, dependencies))
    }
1430
1431    pub async fn finish_replace_streaming_job(
1432        &self,
1433        tmp_id: JobId,
1434        streaming_job: StreamingJob,
1435        replace_upstream: FragmentReplaceUpstream,
1436        sink_into_table_context: SinkIntoTableContext,
1437        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1438        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1439    ) -> MetaResult<NotificationVersion> {
1440        let inner = self.inner.write().await;
1441        let txn = inner.db.begin().await?;
1442
1443        let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
1444            Self::finish_replace_streaming_job_inner(
1445                tmp_id,
1446                replace_upstream,
1447                sink_into_table_context,
1448                &txn,
1449                streaming_job,
1450                drop_table_connector_ctx,
1451                auto_refresh_schema_sinks,
1452            )
1453            .await?;
1454
1455        txn.commit().await?;
1456
1457        // Notify serving module: delete old fragment mappings, upsert new ones.
1458        let notification_manager = self.env.notification_manager();
1459        notification_manager.notify_serving_fragment_mapping_delete(
1460            old_fragment_ids.iter().map(|id| *id as _).collect(),
1461        );
1462        notification_manager.notify_serving_fragment_mapping_update(
1463            new_fragment_ids.iter().map(|id| *id as _).collect(),
1464        );
1465
1466        let mut version = self
1467            .notify_frontend(
1468                NotificationOperation::Update,
1469                NotificationInfo::ObjectGroup(PbObjectGroup {
1470                    objects,
1471                    dependencies: vec![],
1472                }),
1473            )
1474            .await;
1475
1476        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1477            self.notify_users_update(user_infos).await;
1478            version = self
1479                .notify_frontend(
1480                    NotificationOperation::Delete,
1481                    build_object_group_for_delete(to_drop_objects),
1482                )
1483                .await;
1484        }
1485
1486        Ok(version)
1487    }
1488
1489    fn update_iceberg_source_columns(
1490        original_source_columns: &[ColumnCatalog],
1491        original_row_id_index: Option<usize>,
1492        new_table_columns: &[ColumnCatalog],
1493    ) -> (Vec<ColumnCatalog>, Option<usize>) {
1494        let row_id_column_name = original_row_id_index
1495            .and_then(|idx| original_source_columns.get(idx))
1496            .map(|col| col.name().to_owned());
1497        let mut next_column_id = max_column_id(original_source_columns).next();
1498        let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
1499            .iter()
1500            .cloned()
1501            .map(|col| (col.name().to_owned(), col))
1502            .collect();
1503
1504        let mut new_columns = Vec::new();
1505        for table_col in new_table_columns
1506            .iter()
1507            .filter(|col| !col.is_rw_sys_column())
1508        {
1509            let mut source_col_name = table_col.name().to_owned();
1510            if source_col_name == ROW_ID_COLUMN_NAME {
1511                source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
1512            }
1513
1514            if let Some(existing) = existing_columns.get(&source_col_name) {
1515                new_columns.push(existing.clone());
1516            } else {
1517                let mut new_col = table_col.clone();
1518                new_col.column_desc.name = source_col_name;
1519                new_col.column_desc.column_id = next_column_id;
1520                next_column_id = next_column_id.next();
1521                new_columns.push(new_col);
1522            }
1523        }
1524
1525        let mut seen_names: HashSet<String> = new_columns
1526            .iter()
1527            .map(|col| col.name().to_owned())
1528            .collect();
1529        for col in original_source_columns
1530            .iter()
1531            .filter(|col| col.is_iceberg_hidden_column())
1532        {
1533            if seen_names.insert(col.name().to_owned()) {
1534                new_columns.push(col.clone());
1535            }
1536        }
1537
1538        let new_row_id_index = row_id_column_name
1539            .as_ref()
1540            .and_then(|name| new_columns.iter().position(|col| col.name() == name));
1541
1542        (new_columns, new_row_id_index)
1543    }
1544
1545    pub async fn finish_replace_streaming_job_inner(
1546        tmp_id: JobId,
1547        replace_upstream: FragmentReplaceUpstream,
1548        SinkIntoTableContext {
1549            updated_sink_catalogs,
1550        }: SinkIntoTableContext,
1551        txn: &DatabaseTransaction,
1552        streaming_job: StreamingJob,
1553        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1554        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1555    ) -> MetaResult<(
1556        Vec<PbObject>,
1557        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1558        Vec<FragmentId>,
1559        Vec<FragmentId>,
1560    )> {
1561        let original_job_id = streaming_job.id();
1562        let job_type = streaming_job.job_type();
1563
1564        // Query old fragment IDs (will be deleted) and new fragment IDs (will be reassigned).
1565        let old_fragment_ids: Vec<FragmentId> = Fragment::find()
1566            .select_only()
1567            .column(fragment::Column::FragmentId)
1568            .filter(fragment::Column::JobId.eq(original_job_id))
1569            .into_tuple()
1570            .all(txn)
1571            .await?;
1572        let new_fragment_ids: Vec<FragmentId> = Fragment::find()
1573            .select_only()
1574            .column(fragment::Column::FragmentId)
1575            .filter(fragment::Column::JobId.eq(tmp_id))
1576            .into_tuple()
1577            .all(txn)
1578            .await?;
1579
1580        let mut index_item_rewriter = None;
1581        let mut updated_iceberg_source_id: Option<SourceId> = None;
1582
1583        // Update catalog
1584        match streaming_job {
1585            StreamingJob::Table(_, table, _table_job_type) => {
1586                let original_column_catalogs =
1587                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1588                let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
1589                let is_iceberg = table
1590                    .engine
1591                    .and_then(|engine| PbEngine::try_from(engine).ok())
1592                    == Some(PbEngine::Iceberg);
1593                if is_iceberg && schema_changed {
1594                    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
1595                    let source = Source::find()
1596                        .inner_join(Object)
1597                        .filter(
1598                            object::Column::DatabaseId
1599                                .eq(table.database_id)
1600                                .and(object::Column::SchemaId.eq(table.schema_id))
1601                                .and(source::Column::Name.eq(&iceberg_source_name)),
1602                        )
1603                        .one(txn)
1604                        .await?
1605                        .ok_or_else(|| {
1606                            MetaError::catalog_id_not_found("source", iceberg_source_name)
1607                        })?;
1608
1609                    let source_id = source.source_id;
1610                    let source_version = source.version;
1611                    let source_columns: Vec<ColumnCatalog> = source
1612                        .columns
1613                        .to_protobuf()
1614                        .into_iter()
1615                        .map(ColumnCatalog::from)
1616                        .collect();
1617                    let source_row_id_index = source
1618                        .row_id_index
1619                        .and_then(|idx| usize::try_from(idx).ok());
1620                    let table_columns: Vec<ColumnCatalog> = table
1621                        .columns
1622                        .iter()
1623                        .cloned()
1624                        .map(ColumnCatalog::from)
1625                        .collect();
1626                    let (updated_columns, updated_row_id_index) =
1627                        Self::update_iceberg_source_columns(
1628                            &source_columns,
1629                            source_row_id_index,
1630                            &table_columns,
1631                        );
1632                    let updated_columns: Vec<PbColumnCatalog> = updated_columns
1633                        .into_iter()
1634                        .map(|col| col.to_protobuf())
1635                        .collect();
1636
1637                    let mut source_active = source.into_active_model();
1638                    source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
1639                    source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
1640                    source_active.version = Set(source_version + 1);
1641                    source_active.update(txn).await?;
1642                    updated_iceberg_source_id = Some(source_id);
1643                }
1644
1645                index_item_rewriter = Some({
1646                    let original_columns = original_column_catalogs
1647                        .to_protobuf()
1648                        .into_iter()
1649                        .map(|c| c.column_desc.unwrap())
1650                        .collect_vec();
1651                    let new_columns = table
1652                        .columns
1653                        .iter()
1654                        .map(|c| c.column_desc.clone().unwrap())
1655                        .collect_vec();
1656
1657                    IndexItemRewriter {
1658                        original_columns,
1659                        new_columns,
1660                    }
1661                });
1662
1663                // For sinks created in earlier versions, we need to set the original_target_columns.
1664                for sink_id in updated_sink_catalogs {
1665                    Sink::update(sink::ActiveModel {
1666                        sink_id: Set(sink_id as _),
1667                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1668                        ..Default::default()
1669                    })
1670                    .exec(txn)
1671                    .await?;
1672                }
1673                // Update the table catalog with the new one. (column catalog is also updated here)
1674                let mut table = table::ActiveModel::from(table);
1675                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1676                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1677                {
1678                    // drop table connector, the rest logic is in `drop_table_associated_source`
1679                    table.optional_associated_source_id = Set(None);
1680                }
1681
1682                Table::update(table).exec(txn).await?;
1683            }
1684            StreamingJob::Source(source) => {
1685                // Update the source catalog with the new one.
1686                let source = source::ActiveModel::from(source);
1687                Source::update(source).exec(txn).await?;
1688            }
1689            StreamingJob::MaterializedView(table) => {
1690                // Update the table catalog with the new one.
1691                let table = table::ActiveModel::from(table);
1692                Table::update(table).exec(txn).await?;
1693            }
1694            _ => unreachable!(
1695                "invalid streaming job type: {:?}",
1696                streaming_job.job_type_str()
1697            ),
1698        }
1699
1700        async fn finish_fragments(
1701            txn: &DatabaseTransaction,
1702            tmp_id: JobId,
1703            original_job_id: JobId,
1704            replace_upstream: FragmentReplaceUpstream,
1705        ) -> MetaResult<()> {
1706            // 0. update internal tables
1707            // Fields including `fragment_id` were placeholder values before.
1708            // After table fragments are created, update them for all internal tables.
1709            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1710                .select_only()
1711                .columns([
1712                    fragment::Column::FragmentId,
1713                    fragment::Column::StateTableIds,
1714                ])
1715                .filter(fragment::Column::JobId.eq(tmp_id))
1716                .into_tuple()
1717                .all(txn)
1718                .await?;
1719            for (fragment_id, state_table_ids) in fragment_info {
1720                for state_table_id in state_table_ids.into_inner() {
1721                    let state_table_id = TableId::new(state_table_id as _);
1722                    Table::update(table::ActiveModel {
1723                        table_id: Set(state_table_id),
1724                        fragment_id: Set(Some(fragment_id)),
1725                        // No need to update `vnode_count` because it must remain the same.
1726                        ..Default::default()
1727                    })
1728                    .exec(txn)
1729                    .await?;
1730                }
1731            }
1732
1733            // 1. replace old fragments/actors with new ones.
1734            Fragment::delete_many()
1735                .filter(fragment::Column::JobId.eq(original_job_id))
1736                .exec(txn)
1737                .await?;
1738            Fragment::update_many()
1739                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1740                .filter(fragment::Column::JobId.eq(tmp_id))
1741                .exec(txn)
1742                .await?;
1743
1744            // 2. update merges.
1745            // update downstream fragment's Merge node, and upstream_fragment_id
1746            for (fragment_id, fragment_replace_map) in replace_upstream {
1747                let (fragment_id, mut stream_node) =
1748                    Fragment::find_by_id(fragment_id as FragmentId)
1749                        .select_only()
1750                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1751                        .into_tuple::<(FragmentId, StreamNode)>()
1752                        .one(txn)
1753                        .await?
1754                        .map(|(id, node)| (id, node.to_protobuf()))
1755                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1756
1757                visit_stream_node_mut(&mut stream_node, |body| {
1758                    if let PbNodeBody::Merge(m) = body
1759                        && let Some(new_fragment_id) =
1760                            fragment_replace_map.get(&m.upstream_fragment_id)
1761                    {
1762                        m.upstream_fragment_id = *new_fragment_id;
1763                    }
1764                });
1765                Fragment::update(fragment::ActiveModel {
1766                    fragment_id: Set(fragment_id),
1767                    stream_node: Set(StreamNode::from(&stream_node)),
1768                    ..Default::default()
1769                })
1770                .exec(txn)
1771                .await?;
1772            }
1773
1774            // 3. remove dummy object.
1775            Object::delete_by_id(tmp_id).exec(txn).await?;
1776
1777            Ok(())
1778        }
1779
1780        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1781
1782        // 4. update catalogs and notify.
1783        let mut objects = vec![];
1784        match job_type {
1785            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1786                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1787                    .find_also_related(Object)
1788                    .one(txn)
1789                    .await?
1790                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1791                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1792                    .one(txn)
1793                    .await?;
1794                objects.push(PbObject {
1795                    object_info: Some(PbObjectInfo::Table(
1796                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1797                    )),
1798                })
1799            }
1800            StreamingJobType::Source => {
1801                let (source, source_obj) =
1802                    Source::find_by_id(original_job_id.as_shared_source_id())
1803                        .find_also_related(Object)
1804                        .one(txn)
1805                        .await?
1806                        .ok_or_else(|| {
1807                            MetaError::catalog_id_not_found("object", original_job_id)
1808                        })?;
1809                objects.push(PbObject {
1810                    object_info: Some(PbObjectInfo::Source(
1811                        ObjectModel(source, source_obj.unwrap(), None).into(),
1812                    )),
1813                })
1814            }
1815            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1816        }
1817
1818        if let Some(source_id) = updated_iceberg_source_id {
1819            let (source, source_obj) = Source::find_by_id(source_id)
1820                .find_also_related(Object)
1821                .one(txn)
1822                .await?
1823                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1824            objects.push(PbObject {
1825                object_info: Some(PbObjectInfo::Source(
1826                    ObjectModel(source, source_obj.unwrap(), None).into(),
1827                )),
1828            });
1829        }
1830
1831        if let Some(expr_rewriter) = index_item_rewriter {
1832            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1833                .select_only()
1834                .columns([index::Column::IndexId, index::Column::IndexItems])
1835                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1836                .into_tuple()
1837                .all(txn)
1838                .await?;
1839            for (index_id, nodes) in index_items {
1840                let mut pb_nodes = nodes.to_protobuf();
1841                pb_nodes
1842                    .iter_mut()
1843                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1844                let index = index::ActiveModel {
1845                    index_id: Set(index_id),
1846                    index_items: Set(pb_nodes.into()),
1847                    ..Default::default()
1848                }
1849                .update(txn)
1850                .await?;
1851                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1852                    .find_also_related(streaming_job::Entity)
1853                    .one(txn)
1854                    .await?
1855                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1856                objects.push(PbObject {
1857                    object_info: Some(PbObjectInfo::Index(
1858                        ObjectModel(index, index_obj, streaming_job).into(),
1859                    )),
1860                });
1861            }
1862        }
1863
1864        if let Some(sinks) = auto_refresh_schema_sinks {
1865            for finish_sink_context in sinks {
1866                finish_fragments(
1867                    txn,
1868                    finish_sink_context.tmp_sink_id.as_job_id(),
1869                    finish_sink_context.original_sink_id.as_job_id(),
1870                    Default::default(),
1871                )
1872                .await?;
1873                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1874                    .find_also_related(Object)
1875                    .one(txn)
1876                    .await?
1877                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1878                let sink_streaming_job =
1879                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1880                        .one(txn)
1881                        .await?;
1882                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1883                Sink::update(sink::ActiveModel {
1884                    sink_id: Set(finish_sink_context.original_sink_id),
1885                    columns: Set(columns.clone()),
1886                    ..Default::default()
1887                })
1888                .exec(txn)
1889                .await?;
1890                sink.columns = columns;
1891                objects.push(PbObject {
1892                    object_info: Some(PbObjectInfo::Sink(
1893                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1894                    )),
1895                });
1896                if let Some(new_log_store_table) = finish_sink_context.new_log_store_table {
1897                    let log_store_table_id = new_log_store_table.id;
1898                    let new_log_store_table_columns: ColumnCatalogArray =
1899                        new_log_store_table.columns.clone().into();
1900                    let new_log_store_table_value_indices =
1901                        new_log_store_table.value_indices.clone();
1902                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1903                        .find_also_related(Object)
1904                        .one(txn)
1905                        .await?
1906                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1907                    Table::update(table::ActiveModel {
1908                        table_id: Set(log_store_table_id),
1909                        columns: Set(new_log_store_table_columns.clone()),
1910                        value_indices: Set(new_log_store_table_value_indices.clone().into()),
1911                        ..Default::default()
1912                    })
1913                    .exec(txn)
1914                    .await?;
1915                    table.columns = new_log_store_table_columns;
1916                    table.value_indices = new_log_store_table_value_indices.into();
1917                    objects.push(PbObject {
1918                        object_info: Some(PbObjectInfo::Table(
1919                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1920                                .into(),
1921                        )),
1922                    });
1923                }
1924            }
1925        }
1926
1927        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1928        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1929            notification_objs =
1930                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1931        }
1932
1933        Ok((
1934            objects,
1935            notification_objs,
1936            old_fragment_ids,
1937            new_fragment_ids,
1938        ))
1939    }
1940
1941    /// Abort the replacing streaming job by deleting the temporary job object.
1942    pub async fn try_abort_replacing_streaming_job(
1943        &self,
1944        tmp_job_id: JobId,
1945        tmp_sink_ids: Option<Vec<ObjectId>>,
1946    ) -> MetaResult<()> {
1947        let inner = self.inner.write().await;
1948        let txn = inner.db.begin().await?;
1949
1950        // Query fragment IDs of the temp job and temp sinks before cascade-deleting them.
1951        let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
1952        if let Some(ref sink_ids) = tmp_sink_ids {
1953            all_job_ids.extend(sink_ids.iter().copied());
1954        }
1955        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
1956            .select_only()
1957            .column(fragment::Column::FragmentId)
1958            .filter(fragment::Column::JobId.is_in(all_job_ids))
1959            .into_tuple()
1960            .all(&txn)
1961            .await?;
1962
1963        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1964        if let Some(tmp_sink_ids) = tmp_sink_ids {
1965            for tmp_sink_id in tmp_sink_ids {
1966                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1967            }
1968        }
1969        txn.commit().await?;
1970
1971        // Notify serving module about deleted fragments from the aborted replace job.
1972        self.env
1973            .notification_manager()
1974            .notify_serving_fragment_mapping_delete(
1975                abort_fragment_ids.iter().map(|id| *id as _).collect(),
1976            );
1977
1978        Ok(())
1979    }
1980
1981    // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
1982    // return the actor_ids to be applied
1983    pub async fn update_source_rate_limit_by_source_id(
1984        &self,
1985        source_id: SourceId,
1986        rate_limit: Option<u32>,
1987    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1988        let inner = self.inner.read().await;
1989        let txn = inner.db.begin().await?;
1990
1991        {
1992            let active_source = source::ActiveModel {
1993                source_id: Set(source_id),
1994                rate_limit: Set(rate_limit.map(|v| v as i32)),
1995                ..Default::default()
1996            };
1997            Source::update(active_source).exec(&txn).await?;
1998        }
1999
2000        let (source, obj) = Source::find_by_id(source_id)
2001            .find_also_related(Object)
2002            .one(&txn)
2003            .await?
2004            .ok_or_else(|| {
2005                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2006            })?;
2007
2008        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
2009        let streaming_job_ids: Vec<JobId> =
2010            if let Some(table_id) = source.optional_associated_table_id {
2011                vec![table_id.as_job_id()]
2012            } else if let Some(source_info) = &source.source_info
2013                && source_info.to_protobuf().is_shared()
2014            {
2015                vec![source_id.as_share_source_job_id()]
2016            } else {
2017                ObjectDependency::find()
2018                    .select_only()
2019                    .column(object_dependency::Column::UsedBy)
2020                    .filter(object_dependency::Column::Oid.eq(source_id))
2021                    .into_tuple()
2022                    .all(&txn)
2023                    .await?
2024            };
2025
2026        if streaming_job_ids.is_empty() {
2027            return Err(MetaError::invalid_parameter(format!(
2028                "source id {source_id} not used by any streaming job"
2029            )));
2030        }
2031
2032        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2033            .select_only()
2034            .columns([
2035                fragment::Column::FragmentId,
2036                fragment::Column::JobId,
2037                fragment::Column::FragmentTypeMask,
2038                fragment::Column::StreamNode,
2039            ])
2040            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
2041            .into_tuple()
2042            .all(&txn)
2043            .await?;
2044        let mut fragments = fragments
2045            .into_iter()
2046            .map(|(id, job_id, mask, stream_node)| {
2047                (
2048                    id,
2049                    job_id,
2050                    FragmentTypeMask::from(mask as u32),
2051                    stream_node.to_protobuf(),
2052                )
2053            })
2054            .collect_vec();
2055
2056        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
2057            let mut found = false;
2058            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
2059                visit_stream_node_mut(stream_node, |node| {
2060                    if let PbNodeBody::Source(node) = node
2061                        && let Some(node_inner) = &mut node.source_inner
2062                        && node_inner.source_id == source_id
2063                    {
2064                        node_inner.rate_limit = rate_limit;
2065                        found = true;
2066                    }
2067                });
2068            }
2069            if is_fs_source {
2070                // in older versions, there's no fragment type flag for `FsFetch` node,
2071                // so we just scan all fragments for StreamFsFetch node if using fs connector
2072                visit_stream_node_mut(stream_node, |node| {
2073                    if let PbNodeBody::StreamFsFetch(node) = node {
2074                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
2075                        if let Some(node_inner) = &mut node.node_inner
2076                            && node_inner.source_id == source_id
2077                        {
2078                            node_inner.rate_limit = rate_limit;
2079                            found = true;
2080                        }
2081                    }
2082                });
2083            }
2084            found
2085        });
2086
2087        assert!(
2088            !fragments.is_empty(),
2089            "source id should be used by at least one fragment"
2090        );
2091
2092        let (fragment_ids, job_ids) = fragments
2093            .iter()
2094            .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
2095            .unzip();
2096
2097        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
2098            Fragment::update(fragment::ActiveModel {
2099                fragment_id: Set(fragment_id),
2100                fragment_type_mask: Set(fragment_type_mask.into()),
2101                stream_node: Set(StreamNode::from(&stream_node)),
2102                ..Default::default()
2103            })
2104            .exec(&txn)
2105            .await?;
2106        }
2107
2108        txn.commit().await?;
2109
2110        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
2111        let _version = self
2112            .notify_frontend(
2113                NotificationOperation::Update,
2114                NotificationInfo::ObjectGroup(PbObjectGroup {
2115                    objects: vec![PbObject {
2116                        object_info: Some(relation_info),
2117                    }],
2118                    dependencies: vec![],
2119                }),
2120            )
2121            .await;
2122
2123        Ok((job_ids, fragment_ids))
2124    }
2125
2126    // edit the content of fragments in given `table_id`
2127    // return the actor_ids to be applied
2128    pub async fn mutate_fragments_by_job_id(
2129        &self,
2130        job_id: JobId,
2131        // returns true if the mutation is applied
2132        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
2133        // error message when no relevant fragments is found
2134        err_msg: &'static str,
2135    ) -> MetaResult<HashSet<FragmentId>> {
2136        let inner = self.inner.read().await;
2137        let txn = inner.db.begin().await?;
2138
2139        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2140            .select_only()
2141            .columns([
2142                fragment::Column::FragmentId,
2143                fragment::Column::FragmentTypeMask,
2144                fragment::Column::StreamNode,
2145            ])
2146            .filter(fragment::Column::JobId.eq(job_id))
2147            .into_tuple()
2148            .all(&txn)
2149            .await?;
2150        let mut fragments = fragments
2151            .into_iter()
2152            .map(|(id, mask, stream_node)| {
2153                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
2154            })
2155            .collect_vec();
2156
2157        let fragments = fragments
2158            .iter_mut()
2159            .map(|(_, fragment_type_mask, stream_node)| {
2160                fragments_mutation_fn(*fragment_type_mask, stream_node)
2161            })
2162            .collect::<MetaResult<Vec<bool>>>()?
2163            .into_iter()
2164            .zip_eq_debug(std::mem::take(&mut fragments))
2165            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
2166            .collect::<Vec<_>>();
2167
2168        if fragments.is_empty() {
2169            return Err(MetaError::invalid_parameter(format!(
2170                "job id {job_id}: {}",
2171                err_msg
2172            )));
2173        }
2174
2175        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
2176        for (id, _, stream_node) in fragments {
2177            Fragment::update(fragment::ActiveModel {
2178                fragment_id: Set(id),
2179                stream_node: Set(StreamNode::from(&stream_node)),
2180                ..Default::default()
2181            })
2182            .exec(&txn)
2183            .await?;
2184        }
2185
2186        txn.commit().await?;
2187
2188        Ok(fragment_ids)
2189    }
2190
2191    async fn mutate_fragment_by_fragment_id(
2192        &self,
2193        fragment_id: FragmentId,
2194        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
2195        err_msg: &'static str,
2196    ) -> MetaResult<()> {
2197        let inner = self.inner.read().await;
2198        let txn = inner.db.begin().await?;
2199
2200        let (fragment_type_mask, stream_node): (i32, StreamNode) =
2201            Fragment::find_by_id(fragment_id)
2202                .select_only()
2203                .columns([
2204                    fragment::Column::FragmentTypeMask,
2205                    fragment::Column::StreamNode,
2206                ])
2207                .into_tuple()
2208                .one(&txn)
2209                .await?
2210                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
2211        let mut pb_stream_node = stream_node.to_protobuf();
2212        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
2213
2214        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
2215            return Err(MetaError::invalid_parameter(format!(
2216                "fragment id {fragment_id}: {}",
2217                err_msg
2218            )));
2219        }
2220
2221        Fragment::update(fragment::ActiveModel {
2222            fragment_id: Set(fragment_id),
2223            stream_node: Set(stream_node),
2224            ..Default::default()
2225        })
2226        .exec(&txn)
2227        .await?;
2228
2229        txn.commit().await?;
2230
2231        Ok(())
2232    }
2233
2234    pub async fn update_backfill_orders_by_job_id(
2235        &self,
2236        job_id: JobId,
2237        backfill_orders: Option<BackfillOrders>,
2238    ) -> MetaResult<()> {
2239        let inner = self.inner.write().await;
2240        let txn = inner.db.begin().await?;
2241
2242        ensure_job_not_canceled(job_id, &txn).await?;
2243
2244        streaming_job::ActiveModel {
2245            job_id: Set(job_id),
2246            backfill_orders: Set(backfill_orders),
2247            ..Default::default()
2248        }
2249        .update(&txn)
2250        .await?;
2251
2252        txn.commit().await?;
2253
2254        Ok(())
2255    }
2256
2257    // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
2258    // return the actor_ids to be applied
2259    pub async fn update_backfill_rate_limit_by_job_id(
2260        &self,
2261        job_id: JobId,
2262        rate_limit: Option<u32>,
2263    ) -> MetaResult<HashSet<FragmentId>> {
2264        let update_backfill_rate_limit =
2265            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2266                let mut found = false;
2267                if fragment_type_mask
2268                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
2269                {
2270                    visit_stream_node_mut(stream_node, |node| match node {
2271                        PbNodeBody::StreamCdcScan(node) => {
2272                            node.rate_limit = rate_limit;
2273                            found = true;
2274                        }
2275                        PbNodeBody::StreamScan(node) => {
2276                            node.rate_limit = rate_limit;
2277                            found = true;
2278                        }
2279                        PbNodeBody::SourceBackfill(node) => {
2280                            node.rate_limit = rate_limit;
2281                            found = true;
2282                        }
2283                        _ => {}
2284                    });
2285                }
2286                Ok(found)
2287            };
2288
2289        self.mutate_fragments_by_job_id(
2290            job_id,
2291            update_backfill_rate_limit,
2292            "stream scan node or source node not found",
2293        )
2294        .await
2295    }
2296
2297    // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments
2298    // return the actor_ids to be applied
2299    pub async fn update_sink_rate_limit_by_job_id(
2300        &self,
2301        sink_id: SinkId,
2302        rate_limit: Option<u32>,
2303    ) -> MetaResult<HashSet<FragmentId>> {
2304        let update_sink_rate_limit =
2305            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2306                let mut found = Ok(false);
2307                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2308                    visit_stream_node_mut(stream_node, |node| {
2309                        if let PbNodeBody::Sink(node) = node {
2310                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2311                                found = Err(MetaError::invalid_parameter(
2312                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2313                                ));
2314                                return;
2315                            }
2316                            node.rate_limit = rate_limit;
2317                            found = Ok(true);
2318                        }
2319                    });
2320                }
2321                found
2322            };
2323
2324        self.mutate_fragments_by_job_id(
2325            sink_id.as_job_id(),
2326            update_sink_rate_limit,
2327            "sink node not found",
2328        )
2329        .await
2330    }
2331
2332    pub async fn update_dml_rate_limit_by_job_id(
2333        &self,
2334        job_id: JobId,
2335        rate_limit: Option<u32>,
2336    ) -> MetaResult<HashSet<FragmentId>> {
2337        let update_dml_rate_limit =
2338            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2339                let mut found = false;
2340                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2341                    visit_stream_node_mut(stream_node, |node| {
2342                        if let PbNodeBody::Dml(node) = node {
2343                            node.rate_limit = rate_limit;
2344                            found = true;
2345                        }
2346                    });
2347                }
2348                Ok(found)
2349            };
2350
2351        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2352            .await
2353    }
2354
    /// Alters the connector properties and secret refs of source `source_id`.
    ///
    /// All catalog changes happen in one transaction:
    /// - an attempt to change the connector type (`UPSTREAM_SOURCE_KEY`) is rejected;
    /// - unless `skip_alter_on_fly_check` is set (admin/risectl path), every altered key must be
    ///   allowed to change on the fly for this connector;
    /// - `alter_props` / `alter_secret_refs` are merged into the stored options, and the result
    ///   must still extract into valid `ConnectorProperties`;
    /// - the `WITH` clause of the stored definition is rewritten. For a table with connector,
    ///   the associated table's definition is rewritten as well;
    /// - secret dependencies are added/removed, and the source (and associated table) rows are
    ///   updated;
    /// - source nodes are updated in the fragments of the owning job and, for a non-shared
    ///   source, in every job that depends on it.
    ///
    /// After commit the frontend is notified with the reloaded source (and table). Returns the
    /// merged options.
    ///
    /// # Panics
    /// Via `unwrap`/`unreachable!` if:
    /// - the stored properties have no connector;
    /// - the definition does not parse to exactly one `CREATE SOURCE`/`CREATE TABLE` statement;
    /// - a `CREATE TABLE` definition has no associated table id;
    /// - the related object row is missing.
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
        skip_alter_on_fly_check: bool,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // mv using non-shared source holds a copy of source in their fragments
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Validate that connector type is not being changed
        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
            && new_connector != &connector
        {
            return Err(MetaError::invalid_parameter(format!(
                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
                connector, new_connector
            )));
        }

        // Only check alter-on-fly restrictions for SQL ALTER SOURCE, not for admin risectl operations
        if !skip_alter_on_fly_check {
            // Both plaintext keys and secret-ref keys count as altered fields.
            let prop_keys: Vec<String> = alter_props
                .keys()
                .chain(alter_secret_refs.keys())
                .cloned()
                .collect();
            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
                &connector, &prop_keys,
            )?;
        }

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        // Merge the altered props/secrets in place; also yields the secret ids whose
        // dependency rows must be added or removed below.
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check if the alter-ed props are valid for each Connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        // Set only when the definition is a `CREATE TABLE` (table with connector).
        let mut associate_table_id = None;

        // can be source_id or table_id
        // if updating an associated source, the preferred_id is the table_id
        // otherwise, it is the source_id
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            // The stored definition is expected to contain exactly one statement; the
            // array conversion panics otherwise.
            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            // Replace the whole WITH clause with the merged options.
            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // Update secret dependencies atomically within the transaction.
            // Add new dependencies for secrets that are newly referenced.
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            // Remove dependencies for secrets that are no longer referenced.
            // This allows the secrets to be deleted after this source no longer uses them.
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        // `secret_ref` is stored as NULL when no secrets remain.
        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        Source::update(active_source_model).exec(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table statement accordly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            Table::update(active_table_model).exec(&txn).await?;
        }

        // Jobs whose fragments embed this source: the owning job first, then (non-shared
        // sources only) every dependent job collected above.
        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating table with connector, the fragment_id is table id
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        // Reload the updated rows inside the transaction to build the frontend notification.
        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                .one(&txn)
                .await?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
                dependencies: vec![],
            }),
        )
        .await;

        Ok(options_with_secret)
    }
2618
2619    pub async fn update_sink_props_by_sink_id(
2620        &self,
2621        sink_id: SinkId,
2622        props: BTreeMap<String, String>,
2623    ) -> MetaResult<HashMap<String, String>> {
2624        let inner = self.inner.read().await;
2625        let txn = inner.db.begin().await?;
2626
2627        let (sink, _obj) = Sink::find_by_id(sink_id)
2628            .find_also_related(Object)
2629            .one(&txn)
2630            .await?
2631            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2632        validate_sink_props(&sink, &props)?;
2633        let definition = sink.definition.clone();
2634        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2635            .map_err(|e| SinkError::Config(anyhow!(e)))?
2636            .try_into()
2637            .unwrap();
2638        if let Statement::CreateSink { stmt } = &mut stmt {
2639            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2640        } else {
2641            panic!("definition is not a create sink statement")
2642        }
2643        let mut new_config = sink.properties.clone().into_inner();
2644        new_config.extend(props.clone());
2645
2646        let definition = stmt.to_string();
2647        let active_sink = sink::ActiveModel {
2648            sink_id: Set(sink_id),
2649            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2650            definition: Set(definition),
2651            ..Default::default()
2652        };
2653        Sink::update(active_sink).exec(&txn).await?;
2654
2655        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2656        let (sink, obj) = Sink::find_by_id(sink_id)
2657            .find_also_related(Object)
2658            .one(&txn)
2659            .await?
2660            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2661        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2662            .one(&txn)
2663            .await?;
2664        txn.commit().await?;
2665        let relation_infos = vec![PbObject {
2666            object_info: Some(PbObjectInfo::Sink(
2667                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2668            )),
2669        }];
2670
2671        let _version = self
2672            .notify_frontend(
2673                NotificationOperation::Update,
2674                NotificationInfo::ObjectGroup(PbObjectGroup {
2675                    objects: relation_infos,
2676                    dependencies: vec![],
2677                }),
2678            )
2679            .await;
2680
2681        Ok(props.into_iter().collect())
2682    }
2683
    /// Alters the sink-side properties of an iceberg-engine table.
    ///
    /// `alter_iceberg_table_props` must carry the ids of the table's sink and source
    /// (`AlterIcebergTableIds`); it is an `invalid_parameter` error when absent.
    ///
    /// `props` are validated against the sink and merged into the sink's stored properties.
    /// They are also written into the `WITH` clause of the table's `CREATE TABLE` definition,
    /// and that single rewritten definition is stored on the sink, the source and the table.
    ///
    /// The merged config is propagated to the sink fragments. After commit the frontend is
    /// notified with the reloaded sink, source and table.
    ///
    /// Returns only the altered `props` plus the sink id.
    ///
    /// # Errors
    /// `SinkError::Config` if the definition fails to parse or the table engine is not iceberg.
    ///
    /// # Panics
    /// - If the stored definition is not exactly one `CREATE TABLE` statement.
    /// - If a related object row is missing.
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
            ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        // The sink's stored definition is the iceberg table's `CREATE TABLE` statement.
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        // The same rewritten definition is stored on the sink, the source and the table;
        // only the sink's properties column is changed.
        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;
        Source::update(active_source).exec(&txn).await?;
        Table::update(active_table).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        // Reload all three objects inside the transaction for the frontend notification.
        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }
2804
2805    /// Update connection properties and all dependent sources/sinks in a single transaction
2806    pub async fn update_connection_and_dependent_objects_props(
2807        &self,
2808        connection_id: ConnectionId,
2809        alter_props: BTreeMap<String, String>,
2810        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2811    ) -> MetaResult<(
2812        WithOptionsSecResolved,                   // Connection's new properties
2813        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2814        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2815    )> {
2816        let inner = self.inner.read().await;
2817        let txn = inner.db.begin().await?;
2818
2819        // Find all dependent sources and sinks first
2820        let dependent_sources: Vec<SourceId> = Source::find()
2821            .select_only()
2822            .column(source::Column::SourceId)
2823            .filter(source::Column::ConnectionId.eq(connection_id))
2824            .into_tuple()
2825            .all(&txn)
2826            .await?;
2827
2828        let dependent_sinks: Vec<SinkId> = Sink::find()
2829            .select_only()
2830            .column(sink::Column::SinkId)
2831            .filter(sink::Column::ConnectionId.eq(connection_id))
2832            .into_tuple()
2833            .all(&txn)
2834            .await?;
2835
2836        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2837            .find_also_related(Object)
2838            .one(&txn)
2839            .await?
2840            .ok_or_else(|| {
2841                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2842            })?;
2843
2844        // Validate that props can be altered
2845        let prop_keys: Vec<String> = alter_props
2846            .keys()
2847            .chain(alter_secret_refs.keys())
2848            .cloned()
2849            .collect();
2850
2851        // Map the connection type enum to the string name expected by the validation function
2852        let connection_type_str = pb_connection_type_to_connection_type(
2853            &connection_catalog.params.to_protobuf().connection_type(),
2854        )
2855        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2856
2857        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2858            connection_type_str, &prop_keys,
2859        )?;
2860
2861        let connection_pb = connection_catalog.params.to_protobuf();
2862        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2863            connection_pb.properties.into_iter().collect(),
2864            connection_pb.secret_refs.into_iter().collect(),
2865        );
2866
2867        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2868            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2869
2870        tracing::debug!(
2871            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2872            connection_id,
2873            dependent_sources,
2874            dependent_sinks
2875        );
2876
2877        // Validate connection
2878        {
2879            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2880                connection_type: connection_pb.connection_type,
2881                properties: connection_options_with_secret
2882                    .as_plaintext()
2883                    .clone()
2884                    .into_iter()
2885                    .collect(),
2886                secret_refs: connection_options_with_secret
2887                    .as_secret()
2888                    .clone()
2889                    .into_iter()
2890                    .collect(),
2891            };
2892            let connection = PbConnection {
2893                id: connection_id as _,
2894                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2895                    conn_params_pb,
2896                )),
2897                ..Default::default()
2898            };
2899            validate_connection(&connection).await?;
2900        }
2901
2902        // Update connection secret dependencies
2903        if !to_add_secret_dep.is_empty() {
2904            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2905                object_dependency::ActiveModel {
2906                    oid: Set(secret_id.into()),
2907                    used_by: Set(connection_id.as_object_id()),
2908                    ..Default::default()
2909                }
2910            }))
2911            .exec(&txn)
2912            .await?;
2913        }
2914        if !to_remove_secret_dep.is_empty() {
2915            let _ = ObjectDependency::delete_many()
2916                .filter(
2917                    object_dependency::Column::Oid
2918                        .is_in(to_remove_secret_dep)
2919                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2920                )
2921                .exec(&txn)
2922                .await?;
2923        }
2924
2925        // Update the connection with new properties
2926        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2927            connection_type: connection_pb.connection_type,
2928            properties: connection_options_with_secret
2929                .as_plaintext()
2930                .clone()
2931                .into_iter()
2932                .collect(),
2933            secret_refs: connection_options_with_secret
2934                .as_secret()
2935                .clone()
2936                .into_iter()
2937                .collect(),
2938        };
2939        let active_connection_model = connection::ActiveModel {
2940            connection_id: Set(connection_id),
2941            params: Set(ConnectionParams::from(&updated_connection_params)),
2942            ..Default::default()
2943        };
2944        Connection::update(active_connection_model)
2945            .exec(&txn)
2946            .await?;
2947
2948        // Batch update dependent sources and collect their complete properties
2949        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2950
2951        if !dependent_sources.is_empty() {
2952            // Batch fetch all dependent sources
2953            let sources_with_objs = Source::find()
2954                .find_also_related(Object)
2955                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2956                .all(&txn)
2957                .await?;
2958
2959            // Prepare batch updates
2960            let mut source_updates = Vec::new();
2961            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2962
2963            for (source, _obj) in sources_with_objs {
2964                let source_id = source.source_id;
2965
2966                let mut source_options_with_secret = WithOptionsSecResolved::new(
2967                    source.with_properties.0.clone(),
2968                    source
2969                        .secret_ref
2970                        .clone()
2971                        .map(|secret_ref| secret_ref.to_protobuf())
2972                        .unwrap_or_default(),
2973                );
2974                let (source_to_add_secret_dep, source_to_remove_secret_dep) =
2975                    source_options_with_secret
2976                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2977
2978                // Validate the updated source properties
2979                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2980
2981                // Keep source-level secret dependencies in sync with the source properties that
2982                // are rewritten from the altered connection.
2983                let source_used_by_id = source
2984                    .optional_associated_table_id
2985                    .map(|table_id| table_id.as_object_id())
2986                    .unwrap_or_else(|| source_id.as_object_id());
2987                if !source_to_add_secret_dep.is_empty() {
2988                    ObjectDependency::insert_many(source_to_add_secret_dep.into_iter().map(
2989                        |secret_id| object_dependency::ActiveModel {
2990                            oid: Set(secret_id.into()),
2991                            used_by: Set(source_used_by_id),
2992                            ..Default::default()
2993                        },
2994                    ))
2995                    .exec(&txn)
2996                    .await?;
2997                }
2998                if !source_to_remove_secret_dep.is_empty() {
2999                    let _ = ObjectDependency::delete_many()
3000                        .filter(
3001                            object_dependency::Column::Oid
3002                                .is_in(source_to_remove_secret_dep)
3003                                .and(object_dependency::Column::UsedBy.eq(source_used_by_id)),
3004                        )
3005                        .exec(&txn)
3006                        .await?;
3007                }
3008
3009                // Prepare source update
3010                let active_source = source::ActiveModel {
3011                    source_id: Set(source_id),
3012                    with_properties: Set(Property(
3013                        source_options_with_secret.as_plaintext().clone(),
3014                    )),
3015                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
3016                        || {
3017                            risingwave_meta_model::SecretRef::from(
3018                                source_options_with_secret.as_secret().clone(),
3019                            )
3020                        },
3021                    )),
3022                    ..Default::default()
3023                };
3024                source_updates.push(active_source);
3025
3026                // Prepare fragment update:
3027                // - If the source is a table-associated source, update fragments for the table job.
3028                // - Otherwise update the shared source job.
3029                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
3030                let is_shared_source = source.is_shared();
3031                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
3032                if !is_shared_source {
3033                    dep_source_job_ids = ObjectDependency::find()
3034                        .select_only()
3035                        .column(object_dependency::Column::UsedBy)
3036                        .filter(object_dependency::Column::Oid.eq(source_id))
3037                        .into_tuple()
3038                        .all(&txn)
3039                        .await?;
3040                }
3041
3042                let base_job_id =
3043                    if let Some(associate_table_id) = source.optional_associated_table_id {
3044                        associate_table_id.as_job_id()
3045                    } else {
3046                        source_id.as_share_source_job_id()
3047                    };
3048                let job_ids = vec![base_job_id]
3049                    .into_iter()
3050                    .chain(dep_source_job_ids.into_iter())
3051                    .collect_vec();
3052
3053                fragment_updates.push(DependentSourceFragmentUpdate {
3054                    job_ids,
3055                    with_properties: source_options_with_secret.as_plaintext().clone(),
3056                    secret_refs: source_options_with_secret.as_secret().clone(),
3057                    is_shared_source,
3058                });
3059
3060                // Collect the complete properties for runtime broadcast
3061                let complete_source_props = LocalSecretManager::global()
3062                    .fill_secrets(
3063                        source_options_with_secret.as_plaintext().clone(),
3064                        source_options_with_secret.as_secret().clone(),
3065                    )
3066                    .map_err(MetaError::from)?
3067                    .into_iter()
3068                    .collect::<HashMap<String, String>>();
3069                updated_sources_with_props.push((source_id, complete_source_props));
3070            }
3071
3072            for source_update in source_updates {
3073                Source::update(source_update).exec(&txn).await?;
3074            }
3075
3076            // Batch execute fragment updates
3077            for DependentSourceFragmentUpdate {
3078                job_ids,
3079                with_properties,
3080                secret_refs,
3081                is_shared_source,
3082            } in fragment_updates
3083            {
3084                update_connector_props_fragments(
3085                    &txn,
3086                    job_ids,
3087                    FragmentTypeFlag::Source,
3088                    |node, found| {
3089                        if let PbNodeBody::Source(node) = node
3090                            && let Some(source_inner) = &mut node.source_inner
3091                        {
3092                            source_inner.with_properties = with_properties.clone();
3093                            source_inner.secret_refs = secret_refs.clone();
3094                            *found = true;
3095                        }
3096                    },
3097                    is_shared_source,
3098                )
3099                .await?;
3100            }
3101        }
3102
3103        // Batch update dependent sinks and collect their complete properties
3104        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
3105
3106        if !dependent_sinks.is_empty() {
3107            // Batch fetch all dependent sinks
3108            let sinks_with_objs = Sink::find()
3109                .find_also_related(Object)
3110                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
3111                .all(&txn)
3112                .await?;
3113
3114            // Prepare batch updates
3115            let mut sink_updates = Vec::new();
3116            let mut sink_fragment_updates = Vec::new();
3117
3118            for (sink, _obj) in sinks_with_objs {
3119                let sink_id = sink.sink_id;
3120
3121                // Validate that sink props can be altered
3122                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3123                    Some(connector) => {
3124                        let connector_type = connector.to_lowercase();
3125                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
3126                            .map_err(|e| SinkError::Config(anyhow!(e)))?;
3127
3128                        match_sink_name_str!(
3129                            connector_type.as_str(),
3130                            SinkType,
3131                            {
3132                                let mut new_sink_props = sink.properties.0.clone();
3133                                new_sink_props.extend(alter_props.clone());
3134                                SinkType::validate_alter_config(&new_sink_props)
3135                            },
3136                            |sink: &str| Err(SinkError::Config(anyhow!(
3137                                "unsupported sink type {}",
3138                                sink
3139                            )))
3140                        )?
3141                    }
3142                    None => {
3143                        return Err(SinkError::Config(anyhow!(
3144                            "connector not specified when alter sink"
3145                        ))
3146                        .into());
3147                    }
3148                };
3149
3150                let mut new_sink_props = sink.properties.0.clone();
3151                new_sink_props.extend(alter_props.clone());
3152
3153                // Prepare sink update
3154                let active_sink = sink::ActiveModel {
3155                    sink_id: Set(sink_id),
3156                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
3157                    ..Default::default()
3158                };
3159                sink_updates.push(active_sink);
3160
3161                // Prepare fragment updates for this sink
3162                sink_fragment_updates.push((sink_id, new_sink_props.clone()));
3163
3164                // Collect the complete properties for runtime broadcast
3165                let complete_sink_props: HashMap<String, String> =
3166                    new_sink_props.into_iter().collect();
3167                updated_sinks_with_props.push((sink_id, complete_sink_props));
3168            }
3169
3170            // Batch execute sink updates
3171            for sink_update in sink_updates {
3172                Sink::update(sink_update).exec(&txn).await?;
3173            }
3174
3175            // Batch execute sink fragment updates using the reusable function
3176            for (sink_id, new_sink_props) in sink_fragment_updates {
3177                update_connector_props_fragments(
3178                    &txn,
3179                    vec![sink_id.as_job_id()],
3180                    FragmentTypeFlag::Sink,
3181                    |node, found| {
3182                        if let PbNodeBody::Sink(node) = node
3183                            && let Some(sink_desc) = &mut node.sink_desc
3184                            && sink_desc.id == sink_id.as_raw_id()
3185                        {
3186                            sink_desc.properties = new_sink_props.clone();
3187                            *found = true;
3188                        }
3189                    },
3190                    true,
3191                )
3192                .await?;
3193            }
3194        }
3195
3196        // Collect all updated objects for frontend notification
3197        let mut updated_objects = Vec::new();
3198
3199        // Add connection
3200        let (connection, obj) = Connection::find_by_id(connection_id)
3201            .find_also_related(Object)
3202            .one(&txn)
3203            .await?
3204            .ok_or_else(|| {
3205                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
3206            })?;
3207        updated_objects.push(PbObject {
3208            object_info: Some(PbObjectInfo::Connection(
3209                ObjectModel(connection, obj.unwrap(), None).into(),
3210            )),
3211        });
3212
3213        // Add sources
3214        for source_id in &dependent_sources {
3215            let (source, obj) = Source::find_by_id(*source_id)
3216                .find_also_related(Object)
3217                .one(&txn)
3218                .await?
3219                .ok_or_else(|| {
3220                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
3221                })?;
3222            updated_objects.push(PbObject {
3223                object_info: Some(PbObjectInfo::Source(
3224                    ObjectModel(source, obj.unwrap(), None).into(),
3225                )),
3226            });
3227        }
3228
3229        // Add sinks
3230        for sink_id in &dependent_sinks {
3231            let (sink, obj) = Sink::find_by_id(*sink_id)
3232                .find_also_related(Object)
3233                .one(&txn)
3234                .await?
3235                .ok_or_else(|| {
3236                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
3237                })?;
3238            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
3239                .one(&txn)
3240                .await?;
3241            updated_objects.push(PbObject {
3242                object_info: Some(PbObjectInfo::Sink(
3243                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
3244                )),
3245            });
3246        }
3247
3248        // Commit the transaction
3249        txn.commit().await?;
3250
3251        // Notify frontend about all updated objects
3252        if !updated_objects.is_empty() {
3253            self.notify_frontend(
3254                NotificationOperation::Update,
3255                NotificationInfo::ObjectGroup(PbObjectGroup {
3256                    objects: updated_objects,
3257                    dependencies: vec![],
3258                }),
3259            )
3260            .await;
3261        }
3262
3263        Ok((
3264            connection_options_with_secret,
3265            updated_sources_with_props,
3266            updated_sinks_with_props,
3267        ))
3268    }
3269
3270    pub async fn update_fragment_rate_limit_by_fragment_id(
3271        &self,
3272        fragment_id: FragmentId,
3273        rate_limit: Option<u32>,
3274    ) -> MetaResult<()> {
3275        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
3276                                 stream_node: &mut PbStreamNode| {
3277            let mut found = false;
3278            if fragment_type_mask.contains_any(
3279                FragmentTypeFlag::dml_rate_limit_fragments()
3280                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
3281                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
3282                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
3283            ) {
3284                visit_stream_node_mut(stream_node, |node| {
3285                    if let PbNodeBody::Dml(node) = node {
3286                        node.rate_limit = rate_limit;
3287                        found = true;
3288                    }
3289                    if let PbNodeBody::Sink(node) = node {
3290                        node.rate_limit = rate_limit;
3291                        found = true;
3292                    }
3293                    if let PbNodeBody::StreamCdcScan(node) = node {
3294                        node.rate_limit = rate_limit;
3295                        found = true;
3296                    }
3297                    if let PbNodeBody::StreamScan(node) = node {
3298                        node.rate_limit = rate_limit;
3299                        found = true;
3300                    }
3301                    if let PbNodeBody::SourceBackfill(node) = node {
3302                        node.rate_limit = rate_limit;
3303                        found = true;
3304                    }
3305                });
3306            }
3307            found
3308        };
3309        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
3310            .await
3311    }
3312
3313    /// Note: `FsFetch` created in old versions are not included.
3314    /// Since this is only used for debugging, it should be fine.
3315    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
3316        let inner = self.inner.read().await;
3317        let txn = inner.db.begin().await?;
3318
3319        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
3320            .select_only()
3321            .columns([
3322                fragment::Column::FragmentId,
3323                fragment::Column::JobId,
3324                fragment::Column::FragmentTypeMask,
3325                fragment::Column::StreamNode,
3326            ])
3327            .filter(FragmentTypeMask::intersects_any(
3328                FragmentTypeFlag::rate_limit_fragments(),
3329            ))
3330            .into_tuple()
3331            .all(&txn)
3332            .await?;
3333
3334        let mut rate_limits = Vec::new();
3335        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3336            let stream_node = stream_node.to_protobuf();
3337            visit_stream_node_body(&stream_node, |node| {
3338                let mut rate_limit = None;
3339                let mut node_name = None;
3340
3341                match node {
3342                    // source rate limit
3343                    PbNodeBody::Source(node) => {
3344                        if let Some(node_inner) = &node.source_inner {
3345                            rate_limit = node_inner.rate_limit;
3346                            node_name = Some("SOURCE");
3347                        }
3348                    }
3349                    PbNodeBody::StreamFsFetch(node) => {
3350                        if let Some(node_inner) = &node.node_inner {
3351                            rate_limit = node_inner.rate_limit;
3352                            node_name = Some("FS_FETCH");
3353                        }
3354                    }
3355                    // backfill rate limit
3356                    PbNodeBody::SourceBackfill(node) => {
3357                        rate_limit = node.rate_limit;
3358                        node_name = Some("SOURCE_BACKFILL");
3359                    }
3360                    PbNodeBody::StreamScan(node) => {
3361                        rate_limit = node.rate_limit;
3362                        node_name = Some("STREAM_SCAN");
3363                    }
3364                    PbNodeBody::StreamCdcScan(node) => {
3365                        rate_limit = node.rate_limit;
3366                        node_name = Some("STREAM_CDC_SCAN");
3367                    }
3368                    PbNodeBody::Sink(node) => {
3369                        rate_limit = node.rate_limit;
3370                        node_name = Some("SINK");
3371                    }
3372                    _ => {}
3373                }
3374
3375                if let Some(rate_limit) = rate_limit {
3376                    rate_limits.push(RateLimitInfo {
3377                        fragment_id,
3378                        job_id,
3379                        fragment_type_mask: fragment_type_mask as u32,
3380                        rate_limit,
3381                        node_name: node_name.unwrap().to_owned(),
3382                    });
3383                }
3384            });
3385        }
3386
3387        Ok(rate_limits)
3388    }
3389}
3390
3391fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3392    // Validate that props can be altered
3393    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3394        Some(connector) => {
3395            let connector_type = connector.to_lowercase();
3396            let field_names: Vec<String> = props.keys().cloned().collect();
3397            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3398                .map_err(|e| SinkError::Config(anyhow!(e)))?;
3399
3400            match_sink_name_str!(
3401                connector_type.as_str(),
3402                SinkType,
3403                {
3404                    let mut new_props = sink.properties.0.clone();
3405                    new_props.extend(props.clone());
3406                    SinkType::validate_alter_config(&new_props)
3407                },
3408                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3409            )?
3410        }
3411        None => {
3412            return Err(
3413                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3414            );
3415        }
3416    };
3417    Ok(())
3418}
3419
3420fn update_stmt_with_props(
3421    with_properties: &mut Vec<SqlOption>,
3422    props: &BTreeMap<String, String>,
3423) -> MetaResult<()> {
3424    let mut new_sql_options = with_properties
3425        .iter()
3426        .map(|sql_option| (&sql_option.name, sql_option))
3427        .collect::<IndexMap<_, _>>();
3428    let add_sql_options = props
3429        .iter()
3430        .map(|(k, v)| SqlOption::try_from((k, v)))
3431        .collect::<Result<Vec<SqlOption>, ParserError>>()
3432        .map_err(|e| SinkError::Config(anyhow!(e)))?;
3433    new_sql_options.extend(
3434        add_sql_options
3435            .iter()
3436            .map(|sql_option| (&sql_option.name, sql_option)),
3437    );
3438    *with_properties = new_sql_options.into_values().cloned().collect();
3439    Ok(())
3440}
3441
3442async fn update_sink_fragment_props(
3443    txn: &DatabaseTransaction,
3444    sink_id: SinkId,
3445    props: BTreeMap<String, String>,
3446) -> MetaResult<()> {
3447    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3448        .select_only()
3449        .columns([
3450            fragment::Column::FragmentId,
3451            fragment::Column::FragmentTypeMask,
3452            fragment::Column::StreamNode,
3453        ])
3454        .filter(fragment::Column::JobId.eq(sink_id))
3455        .into_tuple()
3456        .all(txn)
3457        .await?;
3458    let fragments = fragments
3459        .into_iter()
3460        .filter(|(_, fragment_type_mask, _)| {
3461            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3462        })
3463        .filter_map(|(id, _, stream_node)| {
3464            let mut stream_node = stream_node.to_protobuf();
3465            let mut found = false;
3466            visit_stream_node_mut(&mut stream_node, |node| {
3467                if let PbNodeBody::Sink(node) = node
3468                    && let Some(sink_desc) = &mut node.sink_desc
3469                    && sink_desc.id == sink_id
3470                {
3471                    sink_desc.properties.extend(props.clone());
3472                    found = true;
3473                }
3474            });
3475            if found { Some((id, stream_node)) } else { None }
3476        })
3477        .collect_vec();
3478    assert!(
3479        !fragments.is_empty(),
3480        "sink id should be used by at least one fragment"
3481    );
3482    for (id, stream_node) in fragments {
3483        Fragment::update(fragment::ActiveModel {
3484            fragment_id: Set(id),
3485            stream_node: Set(StreamNode::from(&stream_node)),
3486            ..Default::default()
3487        })
3488        .exec(txn)
3489        .await?;
3490    }
3491    Ok(())
3492}
3493
/// Context carried through a job operation that involves sinks writing into a table.
/// NOTE(review): purpose inferred from the type name and its field doc; confirm against callers.
pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of ids of the existing sinks;
    /// otherwise it is empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}
3499
/// Inputs needed to finish an auto-refresh-schema operation on a sink.
/// NOTE(review): field meanings below are inferred from their names; confirm against the code
/// that constructs and consumes this context.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of the temporary sink (presumably the one built with the refreshed schema).
    pub tmp_sink_id: SinkId,
    /// Id of the sink being refreshed (presumably the user-visible sink that is kept).
    pub original_sink_id: SinkId,
    /// Column catalogs for the sink after the refresh — TODO confirm.
    pub columns: Vec<PbColumnCatalog>,
    /// Replacement log-store table, if any — TODO confirm when this is `None`.
    pub new_log_store_table: Option<Box<PbTable>>,
}
3506
3507async fn update_connector_props_fragments<F>(
3508    txn: &DatabaseTransaction,
3509    job_ids: Vec<JobId>,
3510    expect_flag: FragmentTypeFlag,
3511    mut alter_stream_node_fn: F,
3512    is_shared_source: bool,
3513) -> MetaResult<()>
3514where
3515    F: FnMut(&mut PbNodeBody, &mut bool),
3516{
3517    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3518        .select_only()
3519        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3520        .filter(
3521            fragment::Column::JobId
3522                .is_in(job_ids.clone())
3523                .and(FragmentTypeMask::intersects(expect_flag)),
3524        )
3525        .into_tuple()
3526        .all(txn)
3527        .await?;
3528    let fragments = fragments
3529        .into_iter()
3530        .filter_map(|(id, stream_node)| {
3531            let mut stream_node = stream_node.to_protobuf();
3532            let mut found = false;
3533            visit_stream_node_mut(&mut stream_node, |node| {
3534                alter_stream_node_fn(node, &mut found);
3535            });
3536            if found { Some((id, stream_node)) } else { None }
3537        })
3538        .collect_vec();
3539    if is_shared_source || job_ids.len() > 1 {
3540        // the first element is the source_id or associated table_id
3541        // if the source is non-shared, there is no updated fragments
3542        // job_ids.len() > 1 means the source is used by other streaming jobs, so there should be at least one fragment updated
3543        assert!(
3544            !fragments.is_empty(),
3545            "job ids {:?} (type: {:?}) should be used by at least one fragment",
3546            job_ids,
3547            expect_flag
3548        );
3549    }
3550
3551    for (id, stream_node) in fragments {
3552        Fragment::update(fragment::ActiveModel {
3553            fragment_id: Set(id),
3554            stream_node: Set(StreamNode::from(&stream_node)),
3555            ..Default::default()
3556        })
3557        .exec(txn)
3558        .await?;
3559    }
3560
3561    Ok(())
3562}