risingwave_meta/controller/
streaming_job.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{
22    ColumnCatalog, FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX,
23    RISINGWAVE_ICEBERG_ROW_ID, ROW_ID_COLUMN_NAME, max_column_id,
24};
25use risingwave_common::config::DefaultParallelism;
26use risingwave_common::hash::VnodeCountCompat;
27use risingwave_common::id::JobId;
28use risingwave_common::secret::LocalSecretManager;
29use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_common::util::stream_graph_visitor::{
32    visit_stream_node_body, visit_stream_node_mut,
33};
34use risingwave_common::{bail, current_cluster_version};
35use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
36use risingwave_connector::connector_common::validate_connection;
37use risingwave_connector::error::ConnectorError;
38use risingwave_connector::sink::file_sink::fs::FsSink;
39use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
40use risingwave_connector::source::{
41    ConnectorProperties, UPSTREAM_SOURCE_KEY, pb_connection_type_to_connection_type,
42};
43use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
46use risingwave_meta_model::refresh_job::RefreshState;
47use risingwave_meta_model::streaming_job::BackfillOrders;
48use risingwave_meta_model::table::TableType;
49use risingwave_meta_model::user_privilege::Action;
50use risingwave_meta_model::*;
51use risingwave_pb::catalog::table::PbEngine;
52use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
53use risingwave_pb::ddl_service::streaming_job_resource_type;
54use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
55use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
56use risingwave_pb::meta::object::PbObjectInfo;
57use risingwave_pb::meta::subscribe_response::{
58    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
59};
60use risingwave_pb::meta::{ObjectDependency as PbObjectDependency, PbObject, PbObjectGroup};
61use risingwave_pb::plan_common::PbColumnCatalog;
62use risingwave_pb::plan_common::source_refresh_mode::{
63    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
64};
65use risingwave_pb::secret::PbSecretRef;
66use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
67use risingwave_pb::stream_plan::stream_node::PbNodeBody;
68use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
69use risingwave_pb::user::PbUserInfo;
70use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
71use risingwave_sqlparser::parser::{Parser, ParserError};
72use sea_orm::ActiveValue::Set;
73use sea_orm::sea_query::{Expr, Query, SimpleExpr};
74use sea_orm::{
75    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
76    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
77};
78use thiserror_ext::AsReport;
79
80use super::rename::IndexItemRewriter;
81use crate::barrier::Command;
82use crate::controller::ObjectModel;
83use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
84use crate::controller::fragment::FragmentTypeMaskExt;
85use crate::controller::utils::{
86    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
87    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
88    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
89    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
90    list_object_dependencies_by_object_id, list_user_info_by_ids,
91    try_get_iceberg_table_by_downstream_sink,
92};
93use crate::error::MetaErrorInner;
94use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
95use crate::model::{
96    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
97    StreamJobFragmentsToCreate,
98};
99use crate::stream::SplitAssignment;
100use crate::{MetaError, MetaResult};
101
102/// A planned fragment update for dependent sources when a connection's properties change.
103///
104/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
105/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    // Ids of streaming jobs whose source fragments reference the changed connection.
    job_ids: Vec<JobId>,
    // New plaintext WITH options to apply to the dependent sources
    // (presumably derived from the altered connection — confirm at the use site).
    with_properties: BTreeMap<String, String>,
    // Secret references accompanying `with_properties`.
    secret_refs: BTreeMap<String, PbSecretRef>,
    // True if the dependent source is a shared source.
    is_shared_source: bool,
}
113
/// Scheduling/config fields captured from the original streaming job when it is being
/// replaced, so the replacement can inherit them unless explicitly overridden.
#[derive(Debug)]
struct ReplaceOriginalJobInfo {
    // Maximum parallelism of the original job.
    max_parallelism: i32,
    // Session timezone the original job was created with, if any.
    timezone: Option<String>,
    // Serialized config override of the original job, if any.
    config_override: Option<String>,
    // Persisted adaptive-parallelism strategy string, if any.
    adaptive_parallelism_strategy: Option<String>,
    // Parallelism setting of the original job.
    parallelism: StreamingParallelism,
    // Specific resource group the original job runs in, if any.
    specific_resource_group: Option<String>,
}
123
124impl ReplaceOriginalJobInfo {
125    fn resolved_parallelism(
126        &self,
127        specified_parallelism: Option<&NonZeroUsize>,
128    ) -> StreamingParallelism {
129        specified_parallelism
130            .map(|n| StreamingParallelism::Fixed(n.get() as _))
131            .unwrap_or_else(|| self.parallelism.clone())
132    }
133
134    fn stream_context(&self, ctx_override: Option<&StreamContext>) -> StreamContext {
135        StreamContext {
136            timezone: ctx_override
137                .and_then(|ctx| ctx.timezone.clone())
138                .or_else(|| self.timezone.clone()),
139            // We don't expect replacing a job with a different config override.
140            // Thus always use the original config override.
141            config_override: self.config_override.clone().unwrap_or_default().into(),
142            adaptive_parallelism_strategy: self.adaptive_parallelism_strategy.as_deref().map(|s| {
143                parse_strategy(s).expect("strategy should be validated before persisting")
144            }),
145        }
146    }
147
148    fn resource_type(&self) -> streaming_job_resource_type::ResourceType {
149        match &self.specific_resource_group {
150            Some(group) => {
151                streaming_job_resource_type::ResourceType::SpecificResourceGroup(group.clone())
152            }
153            None => streaming_job_resource_type::ResourceType::Regular(true),
154        }
155    }
156}
157
158impl From<streaming_job::Model> for ReplaceOriginalJobInfo {
159    fn from(model: streaming_job::Model) -> Self {
160        Self {
161            max_parallelism: model.max_parallelism,
162            timezone: model.timezone,
163            config_override: model.config_override,
164            adaptive_parallelism_strategy: model.adaptive_parallelism_strategy,
165            parallelism: model.parallelism,
166            specific_resource_group: model.specific_resource_group,
167        }
168    }
169}
170
171impl CatalogController {
172    #[allow(clippy::too_many_arguments)]
173    pub async fn create_streaming_job_obj(
174        txn: &DatabaseTransaction,
175        obj_type: ObjectType,
176        owner_id: UserId,
177        database_id: Option<DatabaseId>,
178        schema_id: Option<SchemaId>,
179        create_type: PbCreateType,
180        ctx: StreamContext,
181        streaming_parallelism: StreamingParallelism,
182        max_parallelism: usize,
183        resource_type: streaming_job_resource_type::ResourceType,
184        backfill_parallelism: Option<StreamingParallelism>,
185    ) -> MetaResult<streaming_job::Model> {
186        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
187        let job_id = obj.oid.as_job_id();
188        let specific_resource_group = resource_type.resource_group();
189        let is_serverless_backfill = matches!(
190            &resource_type,
191            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
192        );
193        let model = streaming_job::Model {
194            job_id,
195            job_status: JobStatus::Initial,
196            create_type: create_type.into(),
197            timezone: ctx.timezone,
198            config_override: Some(ctx.config_override.to_string()),
199            adaptive_parallelism_strategy: ctx
200                .adaptive_parallelism_strategy
201                .as_ref()
202                .map(ToString::to_string),
203            parallelism: streaming_parallelism,
204            backfill_parallelism,
205            backfill_orders: None,
206            max_parallelism: max_parallelism as _,
207            specific_resource_group,
208            is_serverless_backfill,
209        };
210        let job = model.clone().into_active_model();
211        StreamingJobModel::insert(job).exec(txn).await?;
212
213        Ok(model)
214    }
215
    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
    ///
    /// All catalog rows (the `streaming_job` row, the job-type-specific rows, and the
    /// object dependency edges) are inserted within a single transaction.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<streaming_job::Model> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        // Explicitly specified parallelism wins; otherwise fall back to the cluster
        // default (`Full` maps to adaptive scheduling).
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        // Validate owner / database / schema existence and name uniqueness up front.
        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just dummy for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        // Insert the `streaming_job` row plus job-type-specific catalog rows, and
        // assign the freshly allocated ids back into the in-memory catalogs.
        let streaming_job_model = match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                // The mview's table id is derived from the job id.
                table.id = streaming_job_model.job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Sink(sink) => {
                // A sink into a table must not close a cycle through existing
                // sink-into-table chains.
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = streaming_job_model.job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Table(src, table, _) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                let job_id = streaming_job_model.job_id;
                table.id = job_id.as_mv_table_id();
                // For a table with a connector, create the associated source object
                // and cross-link it with the table.
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                // Refreshable tables additionally get a `refresh_job` row tracking
                // the refresh schedule and state.
                if table.refreshable {
                    // Only the full-reload refresh mode carries a trigger interval;
                    // streaming mode (or no mode) leaves it unset.
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
                streaming_job_model
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                // to be compatible with old implementation.
                let job_id = streaming_job_model.job_id;
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                // Record that the index table depends on the primary table.
                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
                streaming_job_model
            }
            StreamingJob::Source(src) => {
                let streaming_job_model = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = streaming_job_model.job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
                streaming_job_model
            }
        };

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connection
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(streaming_job_model)
    }
491
492    /// Create catalogs for internal tables, then notify frontend about them if the job is a
493    /// materialized view.
494    ///
495    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
496    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
497    ///
498    /// Returns a mapping from the temporary table id to the actual global table id.
499    pub async fn create_internal_table_catalog(
500        &self,
501        job: &StreamingJob,
502        mut incomplete_internal_tables: Vec<PbTable>,
503    ) -> MetaResult<HashMap<TableId, TableId>> {
504        let job_id = job.id();
505        let inner = self.inner.write().await;
506        let txn = inner.db.begin().await?;
507
508        // Ensure the job exists.
509        ensure_job_not_canceled(job_id, &txn).await?;
510
511        let mut table_id_map = HashMap::new();
512        for table in &mut incomplete_internal_tables {
513            let table_id = Self::create_object(
514                &txn,
515                ObjectType::Table,
516                table.owner as _,
517                Some(table.database_id),
518                Some(table.schema_id),
519            )
520            .await?
521            .oid
522            .as_table_id();
523            table_id_map.insert(table.id, table_id);
524            table.id = table_id;
525            table.job_id = Some(job_id);
526
527            let table_model = table::ActiveModel {
528                table_id: Set(table_id),
529                belongs_to_job_id: Set(Some(job_id)),
530                fragment_id: NotSet,
531                ..table.clone().into()
532            };
533            Table::insert(table_model).exec(&txn).await?;
534        }
535        txn.commit().await?;
536
537        Ok(table_id_map)
538    }
539
540    pub async fn prepare_stream_job_fragments(
541        &self,
542        stream_job_fragments: &StreamJobFragmentsToCreate,
543        streaming_job: &StreamingJob,
544        for_replace: bool,
545        backfill_orders: Option<BackfillOrders>,
546    ) -> MetaResult<()> {
547        self.prepare_streaming_job(
548            stream_job_fragments.stream_job_id(),
549            || stream_job_fragments.fragments.values(),
550            &stream_job_fragments.downstreams,
551            for_replace,
552            Some(streaming_job),
553            backfill_orders,
554        )
555        .await
556    }
557
    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
    ///
    /// - `get_fragments` is a factory closure because the fragments are iterated more than once.
    /// - When `for_replace` is false, table `fragment_id`/`vnode_count` placeholders are fixed up
    ///   and (if needed) creation notifications are sent to the frontend.
    /// - `creating_streaming_job` is `Some` when creating a brand-new job (not replacing).
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        // Whether the frontend should be told about the catalogs while still creating.
        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        // Persist the backfill orders on the job row, if any were provided.
        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        // All state table ids owned by the fragments being inserted.
        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        // Collect fragment IDs before consuming `fragments` for serving mapping notification.
        let inserted_fragment_ids: Vec<crate::model::FragmentId> = fragments
            .iter()
            .map(|f| f.fragment_id as crate::model::FragmentId)
            .collect();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
        // After table fragments are created, update them for all tables.
        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // Table's vnode count is not always the fragment's vnode count, so we have to
                // look up the table from `TableFragments`.
                // See `ActorGraphBuilder::new`.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // In production, definition was replaced but still needed for notification.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        // Add streaming job objects to notification
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // Notify serving module about newly inserted fragments so it can establish
        // serving vnode mappings. This is driven by the fragment model insertion,
        // decoupled from the barrier-driven streaming mapping notifications.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_update(inserted_fragment_ids);

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                    dependencies: vec![],
                }),
            )
            .await;
        }

        Ok(())
    }
714
715    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
716    /// information is required to build barrier mutation.
717    pub async fn build_cancel_command(
718        &self,
719        table_fragments: &StreamJobFragments,
720    ) -> MetaResult<Command> {
721        let inner = self.inner.read().await;
722        let txn = inner.db.begin().await?;
723
724        let dropped_sink_fragment_with_target =
725            if let Some(sink_fragment) = table_fragments.sink_fragment() {
726                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
727                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
728                sink_target_fragment
729                    .get(&sink_fragment_id)
730                    .map(|target_fragments| {
731                        let target_fragment_id = *target_fragments
732                            .first()
733                            .expect("sink should have at least one downstream fragment");
734                        (sink_fragment_id, target_fragment_id)
735                    })
736            } else {
737                None
738            };
739
740        Ok(Command::DropStreamingJobs {
741            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
742            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
743            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
744            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
745                .into_iter()
746                .map(|(sink, target)| (target as _, vec![sink as _]))
747                .collect(),
748        })
749    }
750
    /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode.
    /// It returns (true, _) if the job is not found or aborted.
    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // If the catalog object is already gone, the job was cleaned up elsewhere; nothing to do.
        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            // From here on, the abort targets the owning iceberg table job instead of the sink.
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        // The frontend must be told to delete objects that were already announced to it:
        // background jobs, materialized views, and sinks.
        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            // Remember the cancelled job's tables (internal tables plus the job table, if any)
            // in `dropped_tables`.
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            // The job's internal (state) tables need a delete notification as well.
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Query fragment IDs before cascade-deleting them, for serving mapping cleanup.
        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;

        // Delete the job object and its internal tables; fragments are cascade-deleted.
        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        // Also drop the source associated with the table, if any.
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        // Fail every waiter that is blocked on this job finishing its creation.
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        // Notify serving module about deleted fragments from the aborted job.
        self.env
            .notification_manager()
            .notify_serving_fragment_mapping_delete(
                abort_fragment_ids.iter().map(|id| *id as _).collect(),
            );

        if !objs.is_empty() {
            // We also have notified the frontend about these objects,
            // so we need to notify the frontend to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }
958
959    #[await_tree::instrument]
960    pub async fn post_collect_job_fragments(
961        &self,
962        job_id: JobId,
963        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
964        new_sink_downstream: Option<FragmentDownstreamRelation>,
965        split_assignment: Option<&SplitAssignment>,
966    ) -> MetaResult<()> {
967        let inner = self.inner.write().await;
968        let txn = inner.db.begin().await?;
969
970        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
971
972        if let Some(new_downstream) = new_sink_downstream {
973            insert_fragment_relations(&txn, &new_downstream).await?;
974        }
975
976        // Mark job as CREATING.
977        StreamingJobModel::update(streaming_job::ActiveModel {
978            job_id: Set(job_id),
979            job_status: Set(JobStatus::Creating),
980            ..Default::default()
981        })
982        .exec(&txn)
983        .await?;
984
985        if let Some(split_assignment) = split_assignment {
986            let fragment_splits = split_assignment
987                .iter()
988                .map(|(fragment_id, splits)| {
989                    (
990                        *fragment_id as _,
991                        splits.values().flatten().cloned().collect_vec(),
992                    )
993                })
994                .collect();
995
996            self.update_fragment_splits(&txn, &fragment_splits).await?;
997        }
998
999        txn.commit().await?;
1000
1001        Ok(())
1002    }
1003
1004    pub async fn create_job_catalog_for_replace(
1005        &self,
1006        streaming_job: &StreamingJob,
1007        ctx: Option<&StreamContext>,
1008        specified_parallelism: Option<&NonZeroUsize>,
1009        expected_original_max_parallelism: Option<usize>,
1010    ) -> MetaResult<streaming_job::Model> {
1011        let id = streaming_job.id();
1012        let inner = self.inner.write().await;
1013        let txn = inner.db.begin().await?;
1014
1015        // 1. check version.
1016        streaming_job.verify_version_for_replace(&txn).await?;
1017        // 2. check concurrent replace.
1018        let referring_cnt = ObjectDependency::find()
1019            .join(
1020                JoinType::InnerJoin,
1021                object_dependency::Relation::Object1.def(),
1022            )
1023            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
1024            .filter(
1025                object_dependency::Column::Oid
1026                    .eq(id)
1027                    .and(object::Column::ObjType.eq(ObjectType::Table))
1028                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
1029            )
1030            .count(&txn)
1031            .await?;
1032        if referring_cnt != 0 {
1033            return Err(MetaError::permission_denied(
1034                "job is being altered or referenced by some creating jobs",
1035            ));
1036        }
1037
1038        // 3. check parallelism.
1039        let original_job = StreamingJobModel::find_by_id(id)
1040            .one(&txn)
1041            .await?
1042            .map(ReplaceOriginalJobInfo::from)
1043            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
1044
1045        if let Some(max_parallelism) = expected_original_max_parallelism
1046            && original_job.max_parallelism != max_parallelism as i32
1047        {
1048            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
1049            // This should not happen in normal cases.
1050            bail!(
1051                "cannot use a different max parallelism \
1052                 when replacing streaming job, \
1053                 original: {}, new: {}",
1054                original_job.max_parallelism,
1055                max_parallelism
1056            );
1057        }
1058
1059        let parallelism = original_job.resolved_parallelism(specified_parallelism);
1060        let ctx = original_job.stream_context(ctx);
1061        let resource_type = original_job.resource_type();
1062
1063        // 4. create streaming object for new replace table.
1064        let tmp_model = Self::create_streaming_job_obj(
1065            &txn,
1066            streaming_job.object_type(),
1067            streaming_job.owner() as _,
1068            Some(streaming_job.database_id() as _),
1069            Some(streaming_job.schema_id() as _),
1070            streaming_job.create_type(),
1071            ctx,
1072            parallelism,
1073            original_job.max_parallelism as _,
1074            resource_type,
1075            // `backfill_parallelism` is intentionally NOT inherited from the original job.
1076            // Replace has no "backfill finish -> restore parallelism" phase, so inheriting it
1077            // would render actors at the backfill parallelism and never recover to steady-state.
1078            None,
1079        )
1080        .await?;
1081
1082        // 5. record dependency for new replace table.
1083        ObjectDependency::insert(object_dependency::ActiveModel {
1084            oid: Set(id.as_object_id()),
1085            used_by: Set(tmp_model.job_id.as_object_id()),
1086            ..Default::default()
1087        })
1088        .exec(&txn)
1089        .await?;
1090
1091        txn.commit().await?;
1092
1093        Ok(tmp_model)
1094    }
1095
1096    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
1097    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1098        let mut inner = self.inner.write().await;
1099        let txn = inner.db.begin().await?;
1100
1101        // Check if the job belongs to iceberg table.
1102        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1103            tracing::info!(
1104                "streaming job {} is for iceberg table, wait for manual finish operation",
1105                job_id
1106            );
1107            return Ok(());
1108        }
1109
1110        let (notification_op, objects, updated_user_info, dependencies) =
1111            self.finish_streaming_job_inner(&txn, job_id).await?;
1112
1113        txn.commit().await?;
1114
1115        let mut version = self
1116            .notify_frontend(
1117                notification_op,
1118                NotificationInfo::ObjectGroup(PbObjectGroup {
1119                    objects,
1120                    dependencies,
1121                }),
1122            )
1123            .await;
1124
1125        // notify users about the default privileges
1126        if !updated_user_info.is_empty() {
1127            version = self.notify_users_update(updated_user_info).await;
1128        }
1129
1130        inner
1131            .creating_table_finish_notifier
1132            .values_mut()
1133            .for_each(|creating_tables| {
1134                if let Some(txs) = creating_tables.remove(&job_id) {
1135                    for tx in txs {
1136                        let _ = tx.send(Ok(version));
1137                    }
1138                }
1139            });
1140
1141        Ok(())
1142    }
1143
    /// Marks all objects related to the streaming job as `Created` within the given
    /// transaction and collects everything needed to notify the frontend: the
    /// notification operation, the catalog objects, the users whose privileges were
    /// updated, and the job's object dependencies.
    pub async fn finish_streaming_job_inner(
        &self,
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(
        Operation,
        Vec<risingwave_pb::meta::Object>,
        Vec<PbUserInfo>,
        Vec<PbObjectDependency>,
    )> {
        // Resolve the catalog object type of the job (table/sink/index/source).
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        // Background jobs were already announced with `Add` during creation, so finishing
        // them is an `Update`; other jobs are announced here with `Add` for the first time.
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                // A table with an associated source also announces that source catalog.
                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                // Sinks backing an iceberg table are skipped for default privilege grants.
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (align with MV behavior).
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    // The index's backing table is announced alongside the index itself.
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    // Mirror every SELECT privilege on the primary table onto each state table.
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        let dependencies =
            list_object_dependencies_by_object_id(txn, job_id.as_object_id()).await?;

        Ok((notification_op, objects, updated_user_info, dependencies))
    }
1359
1360    pub async fn finish_replace_streaming_job(
1361        &self,
1362        tmp_id: JobId,
1363        streaming_job: StreamingJob,
1364        replace_upstream: FragmentReplaceUpstream,
1365        sink_into_table_context: SinkIntoTableContext,
1366        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1367        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1368    ) -> MetaResult<NotificationVersion> {
1369        let inner = self.inner.write().await;
1370        let txn = inner.db.begin().await?;
1371
1372        let (objects, delete_notification_objs, old_fragment_ids, new_fragment_ids) =
1373            Self::finish_replace_streaming_job_inner(
1374                tmp_id,
1375                replace_upstream,
1376                sink_into_table_context,
1377                &txn,
1378                streaming_job,
1379                drop_table_connector_ctx,
1380                auto_refresh_schema_sinks,
1381            )
1382            .await?;
1383
1384        txn.commit().await?;
1385
1386        // Notify serving module: delete old fragment mappings, upsert new ones.
1387        let notification_manager = self.env.notification_manager();
1388        notification_manager.notify_serving_fragment_mapping_delete(
1389            old_fragment_ids.iter().map(|id| *id as _).collect(),
1390        );
1391        notification_manager.notify_serving_fragment_mapping_update(
1392            new_fragment_ids.iter().map(|id| *id as _).collect(),
1393        );
1394
1395        let mut version = self
1396            .notify_frontend(
1397                NotificationOperation::Update,
1398                NotificationInfo::ObjectGroup(PbObjectGroup {
1399                    objects,
1400                    dependencies: vec![],
1401                }),
1402            )
1403            .await;
1404
1405        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1406            self.notify_users_update(user_infos).await;
1407            version = self
1408                .notify_frontend(
1409                    NotificationOperation::Delete,
1410                    build_object_group_for_delete(to_drop_objects),
1411                )
1412                .await;
1413        }
1414
1415        Ok(version)
1416    }
1417
1418    fn update_iceberg_source_columns(
1419        original_source_columns: &[ColumnCatalog],
1420        original_row_id_index: Option<usize>,
1421        new_table_columns: &[ColumnCatalog],
1422    ) -> (Vec<ColumnCatalog>, Option<usize>) {
1423        let row_id_column_name = original_row_id_index
1424            .and_then(|idx| original_source_columns.get(idx))
1425            .map(|col| col.name().to_owned());
1426        let mut next_column_id = max_column_id(original_source_columns).next();
1427        let existing_columns: HashMap<String, ColumnCatalog> = original_source_columns
1428            .iter()
1429            .cloned()
1430            .map(|col| (col.name().to_owned(), col))
1431            .collect();
1432
1433        let mut new_columns = Vec::new();
1434        for table_col in new_table_columns
1435            .iter()
1436            .filter(|col| !col.is_rw_sys_column())
1437        {
1438            let mut source_col_name = table_col.name().to_owned();
1439            if source_col_name == ROW_ID_COLUMN_NAME {
1440                source_col_name = RISINGWAVE_ICEBERG_ROW_ID.to_owned();
1441            }
1442
1443            if let Some(existing) = existing_columns.get(&source_col_name) {
1444                new_columns.push(existing.clone());
1445            } else {
1446                let mut new_col = table_col.clone();
1447                new_col.column_desc.name = source_col_name;
1448                new_col.column_desc.column_id = next_column_id;
1449                next_column_id = next_column_id.next();
1450                new_columns.push(new_col);
1451            }
1452        }
1453
1454        let mut seen_names: HashSet<String> = new_columns
1455            .iter()
1456            .map(|col| col.name().to_owned())
1457            .collect();
1458        for col in original_source_columns
1459            .iter()
1460            .filter(|col| col.is_iceberg_hidden_column())
1461        {
1462            if seen_names.insert(col.name().to_owned()) {
1463                new_columns.push(col.clone());
1464            }
1465        }
1466
1467        let new_row_id_index = row_id_column_name
1468            .as_ref()
1469            .and_then(|name| new_columns.iter().position(|col| col.name() == name));
1470
1471        (new_columns, new_row_id_index)
1472    }
1473
    /// Finalize a "replace streaming job" operation inside an open transaction:
    /// swap the temporary job's fragments in for the original job's, update the
    /// affected catalogs (table/source/index/sink), and collect the catalog
    /// objects that the caller must broadcast to frontends.
    ///
    /// Returns `(updated objects, optional (user infos, dropped objects) from a
    /// dropped table connector, old fragment ids, new fragment ids)`.
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
        Vec<FragmentId>,
        Vec<FragmentId>,
    )> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        // Query old fragment IDs (will be deleted) and new fragment IDs (will be reassigned).
        let old_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(original_job_id))
            .into_tuple()
            .all(txn)
            .await?;
        let new_fragment_ids: Vec<FragmentId> = Fragment::find()
            .select_only()
            .column(fragment::Column::FragmentId)
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;

        let mut index_item_rewriter = None;
        let mut updated_iceberg_source_id: Option<SourceId> = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_, table, _table_job_type) => {
                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
                let schema_changed = original_column_catalogs.to_protobuf() != table.columns;
                let is_iceberg = table
                    .engine
                    .and_then(|engine| PbEngine::try_from(engine).ok())
                    == Some(PbEngine::Iceberg);
                // An iceberg table carries a companion `__iceberg_source_<name>`
                // source whose columns must be kept in sync with the table's.
                if is_iceberg && schema_changed {
                    let iceberg_source_name = format!("{}{}", ICEBERG_SOURCE_PREFIX, table.name);
                    let source = Source::find()
                        .inner_join(Object)
                        .filter(
                            object::Column::DatabaseId
                                .eq(table.database_id)
                                .and(object::Column::SchemaId.eq(table.schema_id))
                                .and(source::Column::Name.eq(&iceberg_source_name)),
                        )
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("source", iceberg_source_name)
                        })?;

                    let source_id = source.source_id;
                    let source_version = source.version;
                    let source_columns: Vec<ColumnCatalog> = source
                        .columns
                        .to_protobuf()
                        .into_iter()
                        .map(ColumnCatalog::from)
                        .collect();
                    let source_row_id_index = source
                        .row_id_index
                        .and_then(|idx| usize::try_from(idx).ok());
                    let table_columns: Vec<ColumnCatalog> = table
                        .columns
                        .iter()
                        .cloned()
                        .map(ColumnCatalog::from)
                        .collect();
                    // Merge the new table schema into the source's column list,
                    // preserving existing column ids and hidden columns.
                    let (updated_columns, updated_row_id_index) =
                        Self::update_iceberg_source_columns(
                            &source_columns,
                            source_row_id_index,
                            &table_columns,
                        );
                    let updated_columns: Vec<PbColumnCatalog> = updated_columns
                        .into_iter()
                        .map(|col| col.to_protobuf())
                        .collect();

                    let mut source_active = source.into_active_model();
                    source_active.columns = Set(ColumnCatalogArray::from(updated_columns));
                    source_active.row_id_index = Set(updated_row_id_index.map(|idx| idx as i32));
                    // Bump the source version so frontends pick up the change.
                    source_active.version = Set(source_version + 1);
                    source_active.update(txn).await?;
                    updated_iceberg_source_id = Some(source_id);
                }

                // Record old/new column descriptors so index expressions on
                // this table can be rewritten below.
                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    Sink::update(sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop table connector, the rest logic is in `drop_table_associated_source`
                    table.optional_associated_source_id = Set(None);
                }

                Table::update(table).exec(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                Source::update(source).exec(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                Table::update(table).exec(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        // Swap the fragments of `tmp_id` in for `original_job_id`'s and fix up
        // downstream Merge nodes. Also reused below for auto-refresh-schema
        // sinks, which carry their own tmp/original job pairs.
        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: JobId,
            original_job_id: JobId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            // 0. update internal tables
            // Fields including `fragment_id` were placeholder values before.
            // After table fragments are created, update them for all internal tables.
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    let state_table_id = TableId::new(state_table_id as _);
                    Table::update(table::ActiveModel {
                        table_id: Set(state_table_id),
                        fragment_id: Set(Some(fragment_id)),
                        // No need to update `vnode_count` because it must remain the same.
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
            }

            // 1. replace old fragments/actors with new ones.
            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            // Re-parent the temporary job's fragments onto the original job id.
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            // 2. update merges.
            // update downstream fragment's Merge node, and upstream_fragment_id
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                // Point each Merge node at the replacement upstream fragment.
                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                Fragment::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
            }

            // 3. remove dummy object.
            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                    .one(txn)
                    .await?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_shared_source_id())
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap(), None).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        // If the companion iceberg source was updated above, notify it too.
        if let Some(source_id) = updated_iceberg_source_id {
            let (source, source_obj) = Source::find_by_id(source_id)
                .find_also_related(Object)
                .one(txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
            objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            });
        }

        // Rewrite the expressions of indexes on the replaced table so column
        // references match the new column layout.
        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
                    .find_also_related(streaming_job::Entity)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, index_obj, streaming_job).into(),
                    )),
                });
            }
        }

        // Finish sinks whose schemas were auto-refreshed alongside the table:
        // swap their fragments and persist their new column lists.
        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id.as_job_id(),
                    finish_sink_context.original_sink_id.as_job_id(),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let sink_streaming_job =
                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                        .one(txn)
                        .await?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
                    )),
                });
                // The sink's log-store table may also have a refreshed schema.
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
                                .into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((
            objects,
            notification_objs,
            old_fragment_ids,
            new_fragment_ids,
        ))
    }
1866
1867    /// Abort the replacing streaming job by deleting the temporary job object.
1868    pub async fn try_abort_replacing_streaming_job(
1869        &self,
1870        tmp_job_id: JobId,
1871        tmp_sink_ids: Option<Vec<ObjectId>>,
1872    ) -> MetaResult<()> {
1873        let inner = self.inner.write().await;
1874        let txn = inner.db.begin().await?;
1875
1876        // Query fragment IDs of the temp job and temp sinks before cascade-deleting them.
1877        let mut all_job_ids: Vec<ObjectId> = vec![tmp_job_id.into()];
1878        if let Some(ref sink_ids) = tmp_sink_ids {
1879            all_job_ids.extend(sink_ids.iter().copied());
1880        }
1881        let abort_fragment_ids: Vec<FragmentId> = Fragment::find()
1882            .select_only()
1883            .column(fragment::Column::FragmentId)
1884            .filter(fragment::Column::JobId.is_in(all_job_ids))
1885            .into_tuple()
1886            .all(&txn)
1887            .await?;
1888
1889        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1890        if let Some(tmp_sink_ids) = tmp_sink_ids {
1891            for tmp_sink_id in tmp_sink_ids {
1892                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1893            }
1894        }
1895        txn.commit().await?;
1896
1897        // Notify serving module about deleted fragments from the aborted replace job.
1898        self.env
1899            .notification_manager()
1900            .notify_serving_fragment_mapping_delete(
1901                abort_fragment_ids.iter().map(|id| *id as _).collect(),
1902            );
1903
1904        Ok(())
1905    }
1906
1907    // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
1908    // return the actor_ids to be applied
1909    pub async fn update_source_rate_limit_by_source_id(
1910        &self,
1911        source_id: SourceId,
1912        rate_limit: Option<u32>,
1913    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1914        let inner = self.inner.read().await;
1915        let txn = inner.db.begin().await?;
1916
1917        {
1918            let active_source = source::ActiveModel {
1919                source_id: Set(source_id),
1920                rate_limit: Set(rate_limit.map(|v| v as i32)),
1921                ..Default::default()
1922            };
1923            Source::update(active_source).exec(&txn).await?;
1924        }
1925
1926        let (source, obj) = Source::find_by_id(source_id)
1927            .find_also_related(Object)
1928            .one(&txn)
1929            .await?
1930            .ok_or_else(|| {
1931                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1932            })?;
1933
1934        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1935        let streaming_job_ids: Vec<JobId> =
1936            if let Some(table_id) = source.optional_associated_table_id {
1937                vec![table_id.as_job_id()]
1938            } else if let Some(source_info) = &source.source_info
1939                && source_info.to_protobuf().is_shared()
1940            {
1941                vec![source_id.as_share_source_job_id()]
1942            } else {
1943                ObjectDependency::find()
1944                    .select_only()
1945                    .column(object_dependency::Column::UsedBy)
1946                    .filter(object_dependency::Column::Oid.eq(source_id))
1947                    .into_tuple()
1948                    .all(&txn)
1949                    .await?
1950            };
1951
1952        if streaming_job_ids.is_empty() {
1953            return Err(MetaError::invalid_parameter(format!(
1954                "source id {source_id} not used by any streaming job"
1955            )));
1956        }
1957
1958        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1959            .select_only()
1960            .columns([
1961                fragment::Column::FragmentId,
1962                fragment::Column::JobId,
1963                fragment::Column::FragmentTypeMask,
1964                fragment::Column::StreamNode,
1965            ])
1966            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1967            .into_tuple()
1968            .all(&txn)
1969            .await?;
1970        let mut fragments = fragments
1971            .into_iter()
1972            .map(|(id, job_id, mask, stream_node)| {
1973                (
1974                    id,
1975                    job_id,
1976                    FragmentTypeMask::from(mask as u32),
1977                    stream_node.to_protobuf(),
1978                )
1979            })
1980            .collect_vec();
1981
1982        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1983            let mut found = false;
1984            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1985                visit_stream_node_mut(stream_node, |node| {
1986                    if let PbNodeBody::Source(node) = node
1987                        && let Some(node_inner) = &mut node.source_inner
1988                        && node_inner.source_id == source_id
1989                    {
1990                        node_inner.rate_limit = rate_limit;
1991                        found = true;
1992                    }
1993                });
1994            }
1995            if is_fs_source {
1996                // in older versions, there's no fragment type flag for `FsFetch` node,
1997                // so we just scan all fragments for StreamFsFetch node if using fs connector
1998                visit_stream_node_mut(stream_node, |node| {
1999                    if let PbNodeBody::StreamFsFetch(node) = node {
2000                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
2001                        if let Some(node_inner) = &mut node.node_inner
2002                            && node_inner.source_id == source_id
2003                        {
2004                            node_inner.rate_limit = rate_limit;
2005                            found = true;
2006                        }
2007                    }
2008                });
2009            }
2010            found
2011        });
2012
2013        assert!(
2014            !fragments.is_empty(),
2015            "source id should be used by at least one fragment"
2016        );
2017
2018        let (fragment_ids, job_ids) = fragments
2019            .iter()
2020            .map(|(framgnet_id, job_id, _, _)| (framgnet_id, job_id))
2021            .unzip();
2022
2023        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
2024            Fragment::update(fragment::ActiveModel {
2025                fragment_id: Set(fragment_id),
2026                fragment_type_mask: Set(fragment_type_mask.into()),
2027                stream_node: Set(StreamNode::from(&stream_node)),
2028                ..Default::default()
2029            })
2030            .exec(&txn)
2031            .await?;
2032        }
2033
2034        txn.commit().await?;
2035
2036        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
2037        let _version = self
2038            .notify_frontend(
2039                NotificationOperation::Update,
2040                NotificationInfo::ObjectGroup(PbObjectGroup {
2041                    objects: vec![PbObject {
2042                        object_info: Some(relation_info),
2043                    }],
2044                    dependencies: vec![],
2045                }),
2046            )
2047            .await;
2048
2049        Ok((job_ids, fragment_ids))
2050    }
2051
    // Edit the stream nodes of fragments belonging to the given `job_id` via
    // `fragments_mutation_fn`, persisting only the fragments the closure
    // reports as mutated.
    // Returns the ids of the fragments that were changed.
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments is found
        err_msg: &'static str,
    ) -> MetaResult<HashSet<FragmentId>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Load all fragments of the job with their type masks and stream nodes.
        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        // Two passes: first run the mutation closure over every fragment
        // (collecting per-fragment "applied" flags and short-circuiting on the
        // first error), then keep only the fragments whose flag is true.
        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream nodes.
        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(fragment_ids)
    }
2116
2117    async fn mutate_fragment_by_fragment_id(
2118        &self,
2119        fragment_id: FragmentId,
2120        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
2121        err_msg: &'static str,
2122    ) -> MetaResult<()> {
2123        let inner = self.inner.read().await;
2124        let txn = inner.db.begin().await?;
2125
2126        let (fragment_type_mask, stream_node): (i32, StreamNode) =
2127            Fragment::find_by_id(fragment_id)
2128                .select_only()
2129                .columns([
2130                    fragment::Column::FragmentTypeMask,
2131                    fragment::Column::StreamNode,
2132                ])
2133                .into_tuple()
2134                .one(&txn)
2135                .await?
2136                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
2137        let mut pb_stream_node = stream_node.to_protobuf();
2138        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
2139
2140        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
2141            return Err(MetaError::invalid_parameter(format!(
2142                "fragment id {fragment_id}: {}",
2143                err_msg
2144            )));
2145        }
2146
2147        Fragment::update(fragment::ActiveModel {
2148            fragment_id: Set(fragment_id),
2149            stream_node: Set(stream_node),
2150            ..Default::default()
2151        })
2152        .exec(&txn)
2153        .await?;
2154
2155        txn.commit().await?;
2156
2157        Ok(())
2158    }
2159
2160    pub async fn update_backfill_orders_by_job_id(
2161        &self,
2162        job_id: JobId,
2163        backfill_orders: Option<BackfillOrders>,
2164    ) -> MetaResult<()> {
2165        let inner = self.inner.write().await;
2166        let txn = inner.db.begin().await?;
2167
2168        ensure_job_not_canceled(job_id, &txn).await?;
2169
2170        streaming_job::ActiveModel {
2171            job_id: Set(job_id),
2172            backfill_orders: Set(backfill_orders),
2173            ..Default::default()
2174        }
2175        .update(&txn)
2176        .await?;
2177
2178        txn.commit().await?;
2179
2180        Ok(())
2181    }
2182
2183    // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
2184    // return the actor_ids to be applied
2185    pub async fn update_backfill_rate_limit_by_job_id(
2186        &self,
2187        job_id: JobId,
2188        rate_limit: Option<u32>,
2189    ) -> MetaResult<HashSet<FragmentId>> {
2190        let update_backfill_rate_limit =
2191            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2192                let mut found = false;
2193                if fragment_type_mask
2194                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
2195                {
2196                    visit_stream_node_mut(stream_node, |node| match node {
2197                        PbNodeBody::StreamCdcScan(node) => {
2198                            node.rate_limit = rate_limit;
2199                            found = true;
2200                        }
2201                        PbNodeBody::StreamScan(node) => {
2202                            node.rate_limit = rate_limit;
2203                            found = true;
2204                        }
2205                        PbNodeBody::SourceBackfill(node) => {
2206                            node.rate_limit = rate_limit;
2207                            found = true;
2208                        }
2209                        _ => {}
2210                    });
2211                }
2212                Ok(found)
2213            };
2214
2215        self.mutate_fragments_by_job_id(
2216            job_id,
2217            update_backfill_rate_limit,
2218            "stream scan node or source node not found",
2219        )
2220        .await
2221    }
2222
2223    // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments
2224    // return the actor_ids to be applied
2225    pub async fn update_sink_rate_limit_by_job_id(
2226        &self,
2227        sink_id: SinkId,
2228        rate_limit: Option<u32>,
2229    ) -> MetaResult<HashSet<FragmentId>> {
2230        let update_sink_rate_limit =
2231            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2232                let mut found = Ok(false);
2233                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
2234                    visit_stream_node_mut(stream_node, |node| {
2235                        if let PbNodeBody::Sink(node) = node {
2236                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
2237                                found = Err(MetaError::invalid_parameter(
2238                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
2239                                ));
2240                                return;
2241                            }
2242                            node.rate_limit = rate_limit;
2243                            found = Ok(true);
2244                        }
2245                    });
2246                }
2247                found
2248            };
2249
2250        self.mutate_fragments_by_job_id(
2251            sink_id.as_job_id(),
2252            update_sink_rate_limit,
2253            "sink node not found",
2254        )
2255        .await
2256    }
2257
2258    pub async fn update_dml_rate_limit_by_job_id(
2259        &self,
2260        job_id: JobId,
2261        rate_limit: Option<u32>,
2262    ) -> MetaResult<HashSet<FragmentId>> {
2263        let update_dml_rate_limit =
2264            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
2265                let mut found = false;
2266                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
2267                    visit_stream_node_mut(stream_node, |node| {
2268                        if let PbNodeBody::Dml(node) = node {
2269                            node.rate_limit = rate_limit;
2270                            found = true;
2271                        }
2272                    });
2273                }
2274                Ok(found)
2275            };
2276
2277        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2278            .await
2279    }
2280
    /// Applies property and secret-ref changes to a source and keeps everything that
    /// mirrors those properties in sync, all inside one transaction:
    /// the source row, its SQL `definition`, the associated table (for tables with
    /// connector), secret object-dependencies, and the running source fragments.
    ///
    /// * `alter_props` / `alter_secret_refs` — key/value changes to merge into the
    ///   source's current options.
    /// * `skip_alter_on_fly_check` — set by admin risectl paths to bypass the
    ///   connector's alter-on-fly allow-list (SQL `ALTER SOURCE` keeps the check).
    ///
    /// Returns the fully merged options (plaintext + secret refs) on success.
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
        skip_alter_on_fly_check: bool,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        // NOTE(review): assumes a persisted source always has a connector key in its
        // properties; the unwrap panics otherwise — TODO confirm this invariant.
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // mv using non-shared source holds a copy of source in their fragments
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Validate that connector type is not being changed
        if let Some(new_connector) = alter_props.get(UPSTREAM_SOURCE_KEY)
            && new_connector != &connector
        {
            return Err(MetaError::invalid_parameter(format!(
                "Cannot change connector type from '{}' to '{}'. Drop and recreate the source instead.",
                connector, new_connector
            )));
        }

        // Only check alter-on-fly restrictions for SQL ALTER SOURCE, not for admin risectl operations
        if !skip_alter_on_fly_check {
            let prop_keys: Vec<String> = alter_props
                .keys()
                .chain(alter_secret_refs.keys())
                .cloned()
                .collect();
            risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
                &connector, &prop_keys,
            )?;
        }

        // Merge the requested changes into the current options; `handle_update`
        // reports which secret ids became newly referenced / unreferenced so the
        // object-dependency table can be reconciled below.
        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check if the alter-ed props are valid for each Connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // can be source_id or table_id
        // if updating an associated source, the preferred_id is the table_id
        // otherwise, it is the source_id
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            // Rewrite the WITH clause of the original definition so the stored SQL
            // reflects the new options (secrets rendered as `SECRET {name}`).
            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    // NOTE(review): assumes a CreateTable definition implies an
                    // associated table id is present; unwrap panics otherwise — confirm.
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // Update secret dependencies atomically within the transaction.
            // Add new dependencies for secrets that are newly referenced.
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            // Remove dependencies for secrets that are no longer referenced.
            // This allows the secrets to be deleted after this source no longer uses them.
            if !to_remove_secret_dep.is_empty() {
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        // Persist the merged options and the rewritten definition on the source row.
        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        Source::update(active_source_model).exec(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table statement accordingly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            Table::update(active_table_model).exec(&txn).await?;
        }

        // Jobs whose fragments embed these source properties: the owning job
        // (table job or shared-source job) plus, for non-shared sources, every
        // dependent job that holds a copy of the source.
        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating table with connector, the fragment_id is table id
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        // Re-read the updated catalog rows so the frontend notification carries
        // the post-update state.
        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                .one(&txn)
                .await?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        // Notify frontends only after the transaction committed successfully.
        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
                dependencies: vec![],
            }),
        )
        .await;

        Ok(options_with_secret)
    }
2544
    /// Merges `props` into a sink's connector properties, rewriting the stored
    /// `CREATE SINK` definition, the sink catalog row, and the running sink
    /// fragments in one transaction, then notifies frontends of the update.
    ///
    /// Returns the applied property changes (as passed in) on success.
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        // Reject property changes the sink's connector does not allow.
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        // Re-parse the stored definition so its WITH clause can be rewritten.
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            // The stored definition of a sink is always a CREATE SINK statement;
            // anything else indicates catalog corruption.
            panic!("definition is not a create sink statement")
        }
        // New effective config = existing properties overridden by `props`.
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;

        // Propagate the merged config into the sink's stream fragments.
        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        // Re-read the updated row so the notification carries post-update state.
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        }];

        // Notify frontends only after the transaction committed.
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }
2609
    /// Merges `props` into the properties of an iceberg table's internal sink and
    /// rewrites the shared `CREATE TABLE ... ENGINE = iceberg` definition on all
    /// three catalog objects that carry it (sink, source, table), in one transaction.
    ///
    /// `alter_iceberg_table_props` must contain the ids of the table's internal
    /// sink and source; it is an error to omit it.
    ///
    /// Returns the applied property changes together with the internal sink id.
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
            ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        // Reject property changes the sink's connector does not allow.
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        // Re-parse the stored definition so its WITH clause can be rewritten.
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            // The internal sink of an iceberg table stores the table's CREATE TABLE
            // statement as its definition; anything else indicates catalog corruption.
            panic!("definition is not a create iceberg table statement")
        }
        // New effective config = existing sink properties overridden by `props`.
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        // The same rewritten definition is stored on sink, source, and table.
        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;
        Source::update(active_source).exec(&txn).await?;
        Table::update(active_table).exec(&txn).await?;

        // Propagate the merged config into the sink's stream fragments.
        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        // Re-read all three updated rows so the notification carries post-update state.
        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap(), None).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
                )),
            },
        ];
        // Notify frontends only after the transaction committed.
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                    dependencies: vec![],
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }
2730
2731    /// Update connection properties and all dependent sources/sinks in a single transaction
2732    pub async fn update_connection_and_dependent_objects_props(
2733        &self,
2734        connection_id: ConnectionId,
2735        alter_props: BTreeMap<String, String>,
2736        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2737    ) -> MetaResult<(
2738        WithOptionsSecResolved,                   // Connection's new properties
2739        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2740        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2741    )> {
2742        let inner = self.inner.read().await;
2743        let txn = inner.db.begin().await?;
2744
2745        // Find all dependent sources and sinks first
2746        let dependent_sources: Vec<SourceId> = Source::find()
2747            .select_only()
2748            .column(source::Column::SourceId)
2749            .filter(source::Column::ConnectionId.eq(connection_id))
2750            .into_tuple()
2751            .all(&txn)
2752            .await?;
2753
2754        let dependent_sinks: Vec<SinkId> = Sink::find()
2755            .select_only()
2756            .column(sink::Column::SinkId)
2757            .filter(sink::Column::ConnectionId.eq(connection_id))
2758            .into_tuple()
2759            .all(&txn)
2760            .await?;
2761
2762        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2763            .find_also_related(Object)
2764            .one(&txn)
2765            .await?
2766            .ok_or_else(|| {
2767                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2768            })?;
2769
2770        // Validate that props can be altered
2771        let prop_keys: Vec<String> = alter_props
2772            .keys()
2773            .chain(alter_secret_refs.keys())
2774            .cloned()
2775            .collect();
2776
2777        // Map the connection type enum to the string name expected by the validation function
2778        let connection_type_str = pb_connection_type_to_connection_type(
2779            &connection_catalog.params.to_protobuf().connection_type(),
2780        )
2781        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2782
2783        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2784            connection_type_str, &prop_keys,
2785        )?;
2786
2787        let connection_pb = connection_catalog.params.to_protobuf();
2788        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2789            connection_pb.properties.into_iter().collect(),
2790            connection_pb.secret_refs.into_iter().collect(),
2791        );
2792
2793        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2794            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2795
2796        tracing::debug!(
2797            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2798            connection_id,
2799            dependent_sources,
2800            dependent_sinks
2801        );
2802
2803        // Validate connection
2804        {
2805            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2806                connection_type: connection_pb.connection_type,
2807                properties: connection_options_with_secret
2808                    .as_plaintext()
2809                    .clone()
2810                    .into_iter()
2811                    .collect(),
2812                secret_refs: connection_options_with_secret
2813                    .as_secret()
2814                    .clone()
2815                    .into_iter()
2816                    .collect(),
2817            };
2818            let connection = PbConnection {
2819                id: connection_id as _,
2820                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2821                    conn_params_pb,
2822                )),
2823                ..Default::default()
2824            };
2825            validate_connection(&connection).await?;
2826        }
2827
2828        // Update connection secret dependencies
2829        if !to_add_secret_dep.is_empty() {
2830            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2831                object_dependency::ActiveModel {
2832                    oid: Set(secret_id.into()),
2833                    used_by: Set(connection_id.as_object_id()),
2834                    ..Default::default()
2835                }
2836            }))
2837            .exec(&txn)
2838            .await?;
2839        }
2840        if !to_remove_secret_dep.is_empty() {
2841            let _ = ObjectDependency::delete_many()
2842                .filter(
2843                    object_dependency::Column::Oid
2844                        .is_in(to_remove_secret_dep)
2845                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2846                )
2847                .exec(&txn)
2848                .await?;
2849        }
2850
2851        // Update the connection with new properties
2852        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2853            connection_type: connection_pb.connection_type,
2854            properties: connection_options_with_secret
2855                .as_plaintext()
2856                .clone()
2857                .into_iter()
2858                .collect(),
2859            secret_refs: connection_options_with_secret
2860                .as_secret()
2861                .clone()
2862                .into_iter()
2863                .collect(),
2864        };
2865        let active_connection_model = connection::ActiveModel {
2866            connection_id: Set(connection_id),
2867            params: Set(ConnectionParams::from(&updated_connection_params)),
2868            ..Default::default()
2869        };
2870        Connection::update(active_connection_model)
2871            .exec(&txn)
2872            .await?;
2873
2874        // Batch update dependent sources and collect their complete properties
2875        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2876
2877        if !dependent_sources.is_empty() {
2878            // Batch fetch all dependent sources
2879            let sources_with_objs = Source::find()
2880                .find_also_related(Object)
2881                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2882                .all(&txn)
2883                .await?;
2884
2885            // Prepare batch updates
2886            let mut source_updates = Vec::new();
2887            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2888
2889            for (source, _obj) in sources_with_objs {
2890                let source_id = source.source_id;
2891
2892                let mut source_options_with_secret = WithOptionsSecResolved::new(
2893                    source.with_properties.0.clone(),
2894                    source
2895                        .secret_ref
2896                        .clone()
2897                        .map(|secret_ref| secret_ref.to_protobuf())
2898                        .unwrap_or_default(),
2899                );
2900                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2901                    source_options_with_secret
2902                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2903
2904                // Validate the updated source properties
2905                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;
2906
2907                // Prepare source update
2908                let active_source = source::ActiveModel {
2909                    source_id: Set(source_id),
2910                    with_properties: Set(Property(
2911                        source_options_with_secret.as_plaintext().clone(),
2912                    )),
2913                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
2914                        || {
2915                            risingwave_meta_model::SecretRef::from(
2916                                source_options_with_secret.as_secret().clone(),
2917                            )
2918                        },
2919                    )),
2920                    ..Default::default()
2921                };
2922                source_updates.push(active_source);
2923
2924                // Prepare fragment update:
2925                // - If the source is a table-associated source, update fragments for the table job.
2926                // - Otherwise update the shared source job.
2927                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
2928                let is_shared_source = source.is_shared();
2929                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2930                if !is_shared_source {
2931                    dep_source_job_ids = ObjectDependency::find()
2932                        .select_only()
2933                        .column(object_dependency::Column::UsedBy)
2934                        .filter(object_dependency::Column::Oid.eq(source_id))
2935                        .into_tuple()
2936                        .all(&txn)
2937                        .await?;
2938                }
2939
2940                let base_job_id =
2941                    if let Some(associate_table_id) = source.optional_associated_table_id {
2942                        associate_table_id.as_job_id()
2943                    } else {
2944                        source_id.as_share_source_job_id()
2945                    };
2946                let job_ids = vec![base_job_id]
2947                    .into_iter()
2948                    .chain(dep_source_job_ids.into_iter())
2949                    .collect_vec();
2950
2951                fragment_updates.push(DependentSourceFragmentUpdate {
2952                    job_ids,
2953                    with_properties: source_options_with_secret.as_plaintext().clone(),
2954                    secret_refs: source_options_with_secret.as_secret().clone(),
2955                    is_shared_source,
2956                });
2957
2958                // Collect the complete properties for runtime broadcast
2959                let complete_source_props = LocalSecretManager::global()
2960                    .fill_secrets(
2961                        source_options_with_secret.as_plaintext().clone(),
2962                        source_options_with_secret.as_secret().clone(),
2963                    )
2964                    .map_err(MetaError::from)?
2965                    .into_iter()
2966                    .collect::<HashMap<String, String>>();
2967                updated_sources_with_props.push((source_id, complete_source_props));
2968            }
2969
2970            for source_update in source_updates {
2971                Source::update(source_update).exec(&txn).await?;
2972            }
2973
2974            // Batch execute fragment updates
2975            for DependentSourceFragmentUpdate {
2976                job_ids,
2977                with_properties,
2978                secret_refs,
2979                is_shared_source,
2980            } in fragment_updates
2981            {
2982                update_connector_props_fragments(
2983                    &txn,
2984                    job_ids,
2985                    FragmentTypeFlag::Source,
2986                    |node, found| {
2987                        if let PbNodeBody::Source(node) = node
2988                            && let Some(source_inner) = &mut node.source_inner
2989                        {
2990                            source_inner.with_properties = with_properties.clone();
2991                            source_inner.secret_refs = secret_refs.clone();
2992                            *found = true;
2993                        }
2994                    },
2995                    is_shared_source,
2996                )
2997                .await?;
2998            }
2999        }
3000
3001        // Batch update dependent sinks and collect their complete properties
3002        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();
3003
3004        if !dependent_sinks.is_empty() {
3005            // Batch fetch all dependent sinks
3006            let sinks_with_objs = Sink::find()
3007                .find_also_related(Object)
3008                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
3009                .all(&txn)
3010                .await?;
3011
3012            // Prepare batch updates
3013            let mut sink_updates = Vec::new();
3014            let mut sink_fragment_updates = Vec::new();
3015
3016            for (sink, _obj) in sinks_with_objs {
3017                let sink_id = sink.sink_id;
3018
3019                // Validate that sink props can be altered
3020                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3021                    Some(connector) => {
3022                        let connector_type = connector.to_lowercase();
3023                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
3024                            .map_err(|e| SinkError::Config(anyhow!(e)))?;
3025
3026                        match_sink_name_str!(
3027                            connector_type.as_str(),
3028                            SinkType,
3029                            {
3030                                let mut new_sink_props = sink.properties.0.clone();
3031                                new_sink_props.extend(alter_props.clone());
3032                                SinkType::validate_alter_config(&new_sink_props)
3033                            },
3034                            |sink: &str| Err(SinkError::Config(anyhow!(
3035                                "unsupported sink type {}",
3036                                sink
3037                            )))
3038                        )?
3039                    }
3040                    None => {
3041                        return Err(SinkError::Config(anyhow!(
3042                            "connector not specified when alter sink"
3043                        ))
3044                        .into());
3045                    }
3046                };
3047
3048                let mut new_sink_props = sink.properties.0.clone();
3049                new_sink_props.extend(alter_props.clone());
3050
3051                // Prepare sink update
3052                let active_sink = sink::ActiveModel {
3053                    sink_id: Set(sink_id),
3054                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
3055                    ..Default::default()
3056                };
3057                sink_updates.push(active_sink);
3058
3059                // Prepare fragment updates for this sink
3060                sink_fragment_updates.push((sink_id, new_sink_props.clone()));
3061
3062                // Collect the complete properties for runtime broadcast
3063                let complete_sink_props: HashMap<String, String> =
3064                    new_sink_props.into_iter().collect();
3065                updated_sinks_with_props.push((sink_id, complete_sink_props));
3066            }
3067
3068            // Batch execute sink updates
3069            for sink_update in sink_updates {
3070                Sink::update(sink_update).exec(&txn).await?;
3071            }
3072
3073            // Batch execute sink fragment updates using the reusable function
3074            for (sink_id, new_sink_props) in sink_fragment_updates {
3075                update_connector_props_fragments(
3076                    &txn,
3077                    vec![sink_id.as_job_id()],
3078                    FragmentTypeFlag::Sink,
3079                    |node, found| {
3080                        if let PbNodeBody::Sink(node) = node
3081                            && let Some(sink_desc) = &mut node.sink_desc
3082                            && sink_desc.id == sink_id.as_raw_id()
3083                        {
3084                            sink_desc.properties = new_sink_props.clone();
3085                            *found = true;
3086                        }
3087                    },
3088                    true,
3089                )
3090                .await?;
3091            }
3092        }
3093
3094        // Collect all updated objects for frontend notification
3095        let mut updated_objects = Vec::new();
3096
3097        // Add connection
3098        let (connection, obj) = Connection::find_by_id(connection_id)
3099            .find_also_related(Object)
3100            .one(&txn)
3101            .await?
3102            .ok_or_else(|| {
3103                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
3104            })?;
3105        updated_objects.push(PbObject {
3106            object_info: Some(PbObjectInfo::Connection(
3107                ObjectModel(connection, obj.unwrap(), None).into(),
3108            )),
3109        });
3110
3111        // Add sources
3112        for source_id in &dependent_sources {
3113            let (source, obj) = Source::find_by_id(*source_id)
3114                .find_also_related(Object)
3115                .one(&txn)
3116                .await?
3117                .ok_or_else(|| {
3118                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
3119                })?;
3120            updated_objects.push(PbObject {
3121                object_info: Some(PbObjectInfo::Source(
3122                    ObjectModel(source, obj.unwrap(), None).into(),
3123                )),
3124            });
3125        }
3126
3127        // Add sinks
3128        for sink_id in &dependent_sinks {
3129            let (sink, obj) = Sink::find_by_id(*sink_id)
3130                .find_also_related(Object)
3131                .one(&txn)
3132                .await?
3133                .ok_or_else(|| {
3134                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
3135                })?;
3136            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
3137                .one(&txn)
3138                .await?;
3139            updated_objects.push(PbObject {
3140                object_info: Some(PbObjectInfo::Sink(
3141                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
3142                )),
3143            });
3144        }
3145
3146        // Commit the transaction
3147        txn.commit().await?;
3148
3149        // Notify frontend about all updated objects
3150        if !updated_objects.is_empty() {
3151            self.notify_frontend(
3152                NotificationOperation::Update,
3153                NotificationInfo::ObjectGroup(PbObjectGroup {
3154                    objects: updated_objects,
3155                    dependencies: vec![],
3156                }),
3157            )
3158            .await;
3159        }
3160
3161        Ok((
3162            connection_options_with_secret,
3163            updated_sources_with_props,
3164            updated_sinks_with_props,
3165        ))
3166    }
3167
3168    pub async fn update_fragment_rate_limit_by_fragment_id(
3169        &self,
3170        fragment_id: FragmentId,
3171        rate_limit: Option<u32>,
3172    ) -> MetaResult<()> {
3173        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
3174                                 stream_node: &mut PbStreamNode| {
3175            let mut found = false;
3176            if fragment_type_mask.contains_any(
3177                FragmentTypeFlag::dml_rate_limit_fragments()
3178                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
3179                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
3180                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
3181            ) {
3182                visit_stream_node_mut(stream_node, |node| {
3183                    if let PbNodeBody::Dml(node) = node {
3184                        node.rate_limit = rate_limit;
3185                        found = true;
3186                    }
3187                    if let PbNodeBody::Sink(node) = node {
3188                        node.rate_limit = rate_limit;
3189                        found = true;
3190                    }
3191                    if let PbNodeBody::StreamCdcScan(node) = node {
3192                        node.rate_limit = rate_limit;
3193                        found = true;
3194                    }
3195                    if let PbNodeBody::StreamScan(node) = node {
3196                        node.rate_limit = rate_limit;
3197                        found = true;
3198                    }
3199                    if let PbNodeBody::SourceBackfill(node) = node {
3200                        node.rate_limit = rate_limit;
3201                        found = true;
3202                    }
3203                });
3204            }
3205            found
3206        };
3207        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
3208            .await
3209    }
3210
3211    /// Note: `FsFetch` created in old versions are not included.
3212    /// Since this is only used for debugging, it should be fine.
3213    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
3214        let inner = self.inner.read().await;
3215        let txn = inner.db.begin().await?;
3216
3217        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
3218            .select_only()
3219            .columns([
3220                fragment::Column::FragmentId,
3221                fragment::Column::JobId,
3222                fragment::Column::FragmentTypeMask,
3223                fragment::Column::StreamNode,
3224            ])
3225            .filter(FragmentTypeMask::intersects_any(
3226                FragmentTypeFlag::rate_limit_fragments(),
3227            ))
3228            .into_tuple()
3229            .all(&txn)
3230            .await?;
3231
3232        let mut rate_limits = Vec::new();
3233        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
3234            let stream_node = stream_node.to_protobuf();
3235            visit_stream_node_body(&stream_node, |node| {
3236                let mut rate_limit = None;
3237                let mut node_name = None;
3238
3239                match node {
3240                    // source rate limit
3241                    PbNodeBody::Source(node) => {
3242                        if let Some(node_inner) = &node.source_inner {
3243                            rate_limit = node_inner.rate_limit;
3244                            node_name = Some("SOURCE");
3245                        }
3246                    }
3247                    PbNodeBody::StreamFsFetch(node) => {
3248                        if let Some(node_inner) = &node.node_inner {
3249                            rate_limit = node_inner.rate_limit;
3250                            node_name = Some("FS_FETCH");
3251                        }
3252                    }
3253                    // backfill rate limit
3254                    PbNodeBody::SourceBackfill(node) => {
3255                        rate_limit = node.rate_limit;
3256                        node_name = Some("SOURCE_BACKFILL");
3257                    }
3258                    PbNodeBody::StreamScan(node) => {
3259                        rate_limit = node.rate_limit;
3260                        node_name = Some("STREAM_SCAN");
3261                    }
3262                    PbNodeBody::StreamCdcScan(node) => {
3263                        rate_limit = node.rate_limit;
3264                        node_name = Some("STREAM_CDC_SCAN");
3265                    }
3266                    PbNodeBody::Sink(node) => {
3267                        rate_limit = node.rate_limit;
3268                        node_name = Some("SINK");
3269                    }
3270                    _ => {}
3271                }
3272
3273                if let Some(rate_limit) = rate_limit {
3274                    rate_limits.push(RateLimitInfo {
3275                        fragment_id,
3276                        job_id,
3277                        fragment_type_mask: fragment_type_mask as u32,
3278                        rate_limit,
3279                        node_name: node_name.unwrap().to_owned(),
3280                    });
3281                }
3282            });
3283        }
3284
3285        Ok(rate_limits)
3286    }
3287}
3288
3289fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
3290    // Validate that props can be altered
3291    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
3292        Some(connector) => {
3293            let connector_type = connector.to_lowercase();
3294            let field_names: Vec<String> = props.keys().cloned().collect();
3295            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
3296                .map_err(|e| SinkError::Config(anyhow!(e)))?;
3297
3298            match_sink_name_str!(
3299                connector_type.as_str(),
3300                SinkType,
3301                {
3302                    let mut new_props = sink.properties.0.clone();
3303                    new_props.extend(props.clone());
3304                    SinkType::validate_alter_config(&new_props)
3305                },
3306                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
3307            )?
3308        }
3309        None => {
3310            return Err(
3311                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
3312            );
3313        }
3314    };
3315    Ok(())
3316}
3317
3318fn update_stmt_with_props(
3319    with_properties: &mut Vec<SqlOption>,
3320    props: &BTreeMap<String, String>,
3321) -> MetaResult<()> {
3322    let mut new_sql_options = with_properties
3323        .iter()
3324        .map(|sql_option| (&sql_option.name, sql_option))
3325        .collect::<IndexMap<_, _>>();
3326    let add_sql_options = props
3327        .iter()
3328        .map(|(k, v)| SqlOption::try_from((k, v)))
3329        .collect::<Result<Vec<SqlOption>, ParserError>>()
3330        .map_err(|e| SinkError::Config(anyhow!(e)))?;
3331    new_sql_options.extend(
3332        add_sql_options
3333            .iter()
3334            .map(|sql_option| (&sql_option.name, sql_option)),
3335    );
3336    *with_properties = new_sql_options.into_values().cloned().collect();
3337    Ok(())
3338}
3339
3340async fn update_sink_fragment_props(
3341    txn: &DatabaseTransaction,
3342    sink_id: SinkId,
3343    props: BTreeMap<String, String>,
3344) -> MetaResult<()> {
3345    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
3346        .select_only()
3347        .columns([
3348            fragment::Column::FragmentId,
3349            fragment::Column::FragmentTypeMask,
3350            fragment::Column::StreamNode,
3351        ])
3352        .filter(fragment::Column::JobId.eq(sink_id))
3353        .into_tuple()
3354        .all(txn)
3355        .await?;
3356    let fragments = fragments
3357        .into_iter()
3358        .filter(|(_, fragment_type_mask, _)| {
3359            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
3360        })
3361        .filter_map(|(id, _, stream_node)| {
3362            let mut stream_node = stream_node.to_protobuf();
3363            let mut found = false;
3364            visit_stream_node_mut(&mut stream_node, |node| {
3365                if let PbNodeBody::Sink(node) = node
3366                    && let Some(sink_desc) = &mut node.sink_desc
3367                    && sink_desc.id == sink_id
3368                {
3369                    sink_desc.properties.extend(props.clone());
3370                    found = true;
3371                }
3372            });
3373            if found { Some((id, stream_node)) } else { None }
3374        })
3375        .collect_vec();
3376    assert!(
3377        !fragments.is_empty(),
3378        "sink id should be used by at least one fragment"
3379    );
3380    for (id, stream_node) in fragments {
3381        Fragment::update(fragment::ActiveModel {
3382            fragment_id: Set(id),
3383            stream_node: Set(StreamNode::from(&stream_node)),
3384            ..Default::default()
3385        })
3386        .exec(txn)
3387        .await?;
3388    }
3389    Ok(())
3390}
3391
/// Context for handling sinks that write into a table during a table DDL.
pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}
3397
/// Context used to finish an auto-refresh-schema sink replacement, where a
/// temporary sink (with the refreshed schema) takes over from the original.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of the temporary sink created with the refreshed schema.
    pub tmp_sink_id: SinkId,
    /// Id of the original sink being replaced.
    pub original_sink_id: SinkId,
    /// Column catalog after the schema refresh.
    pub columns: Vec<PbColumnCatalog>,
    /// New log store table id and its columns — presumably only present when
    /// the sink uses a log store; confirm at the call site.
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}
3404
3405async fn update_connector_props_fragments<F>(
3406    txn: &DatabaseTransaction,
3407    job_ids: Vec<JobId>,
3408    expect_flag: FragmentTypeFlag,
3409    mut alter_stream_node_fn: F,
3410    is_shared_source: bool,
3411) -> MetaResult<()>
3412where
3413    F: FnMut(&mut PbNodeBody, &mut bool),
3414{
3415    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
3416        .select_only()
3417        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
3418        .filter(
3419            fragment::Column::JobId
3420                .is_in(job_ids.clone())
3421                .and(FragmentTypeMask::intersects(expect_flag)),
3422        )
3423        .into_tuple()
3424        .all(txn)
3425        .await?;
3426    let fragments = fragments
3427        .into_iter()
3428        .filter_map(|(id, stream_node)| {
3429            let mut stream_node = stream_node.to_protobuf();
3430            let mut found = false;
3431            visit_stream_node_mut(&mut stream_node, |node| {
3432                alter_stream_node_fn(node, &mut found);
3433            });
3434            if found { Some((id, stream_node)) } else { None }
3435        })
3436        .collect_vec();
3437    if is_shared_source || job_ids.len() > 1 {
3438        // the first element is the source_id or associated table_id
3439        // if the source is non-shared, there is no updated fragments
3440        // job_ids.len() > 1 means the source is used by other streaming jobs, so there should be at least one fragment updated
3441        assert!(
3442            !fragments.is_empty(),
3443            "job ids {:?} (type: {:?}) should be used by at least one fragment",
3444            job_ids,
3445            expect_flag
3446        );
3447    }
3448
3449    for (id, stream_node) in fragments {
3450        Fragment::update(fragment::ActiveModel {
3451            fragment_id: Set(id),
3452            stream_node: Set(StreamNode::from(&stream_node)),
3453            ..Default::default()
3454        })
3455        .exec(txn)
3456        .await?;
3457    }
3458
3459    Ok(())
3460}