risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
    /// Create the `Object` and `streaming_job` records for a new streaming job and return its id.
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // TODO: can we move it to `StreamContext`?
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job`, with the frontend notified again in
    /// `finish_streaming_job`.
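    ///
    /// A minimal sketch of the call shape (illustrative only, not a runnable doctest; the
    /// surrounding `controller`, `stream_ctx`, and `dependencies` bindings are assumed):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(
    ///         &mut streaming_job,  // placeholder ids are filled in here
    ///         &stream_ctx,
    ///         &None,               // fall back to the configured default parallelism
    ///         max_parallelism,
    ///         dependencies,        // upstream object ids collected during planning
    ///         None,                // no specific resource group
    ///     )
    ///     .await?;
    /// ```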
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // Check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // The referring object is just a dummy table used for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in a circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // To be compatible with the old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // Collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // Collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // Record object dependencies.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job`, with the frontend notified again in
    /// `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
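    ///
    /// A sketch of how a caller might use the returned mapping (illustrative only; `temp_id`
    /// stands for a placeholder id assigned before this call):
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// // Rewrite placeholder ids in the fragment graph to the globally allocated ones.
    /// let global_id = table_id_map[&temp_id];
    /// ```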
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and
    /// replacing jobs.
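    ///
    /// A minimal sketch of the creation path (illustrative only, not a runnable doctest):
    ///
    /// ```ignore
    /// controller
    ///     .prepare_streaming_job(
    ///         job_id,
    ///         || fragments.values(), // re-iterable view of the job's fragments
    ///         &downstreams,
    ///         false,                 // `for_replace == false`: also back-fill table fields
    ///         Some(&streaming_job),
    ///     )
    ///     .await?;
    /// ```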
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // A table's vnode count is not always the fragment's vnode count, so we have
                    // to look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id);
                    assert_eq!(table.fragment_id, fragment_id);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the definition was replaced earlier but is still
                        // needed for the notification.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add streaming job objects to the notification.
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update the dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // the frontend receiving duplicate notifications if it restarts right in this gap. We
        // need to either forward the notification or filter out catalogs without any fragments
        // in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be
    /// dropped, additional information is required to build the barrier mutation.
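    ///
    /// Illustrative shape of the produced command (not a runnable doctest):
    ///
    /// ```ignore
    /// let command = controller.build_cancel_command(&table_fragments).await?;
    /// // For a sink-into-table job, `dropped_sink_fragment_by_targets` maps the target
    /// // fragment id to the sink fragments being dropped, e.g. `{target: [sink]}`.
    /// ```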
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is in its initial status or
    /// created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
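    ///
    /// A sketch of how the return value is interpreted (illustrative only):
    ///
    /// ```ignore
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ false)
    ///     .await?;
    /// if !aborted {
    ///     // A background job still in creating status: leave it to recovery instead.
    /// }
    /// ```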
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it up.
                } else {
                    // If the job was created in the background and is still in creating status,
                    // we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record the original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
            // Here we drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating a sink into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark the job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. Check the version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. Check for concurrent replacement.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. Check the parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We have already overridden the max parallelism in `StreamFragmentGraph` before
            // entering this function. This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. Create the streaming object for the new replacement table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. Record the dependency for the new replacement table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
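    ///
    /// A minimal sketch of the call at the end of job creation (illustrative only):
    ///
    /// ```ignore
    /// // Once creation completes, persist the `Created` status and wake up any DDL
    /// // handlers waiting on `creating_table_finish_notifier`.
    /// controller.finish_streaming_job(job_id).await?;
    /// ```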
1004    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
1005        let mut inner = self.inner.write().await;
1006        let txn = inner.db.begin().await?;
1007
1008        // Check if the job belongs to iceberg table.
1009        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
1010            tracing::info!(
1011                "streaming job {} is for iceberg table, wait for manual finish operation",
1012                job_id
1013            );
1014            return Ok(());
1015        }
1016
1017        let (notification_op, objects, updated_user_info) =
1018            Self::finish_streaming_job_inner(&txn, job_id).await?;
1019
1020        txn.commit().await?;
1021
1022        let mut version = self
1023            .notify_frontend(
1024                notification_op,
1025                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1026            )
1027            .await;
1028
1029        // notify users about the default privileges
1030        if !updated_user_info.is_empty() {
1031            version = self.notify_users_update(updated_user_info).await;
1032        }
1033
1034        inner
1035            .creating_table_finish_notifier
1036            .values_mut()
1037            .for_each(|creating_tables| {
1038                if let Some(txs) = creating_tables.remove(&job_id) {
1039                    for tx in txs {
1040                        let _ = tx.send(Ok(version));
1041                    }
1042                }
1043            });
1044
1045        Ok(())
1046    }
1047
1048    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
1049    pub async fn finish_streaming_job_inner(
1050        txn: &DatabaseTransaction,
1051        job_id: JobId,
1052    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
1053        let job_type = Object::find_by_id(job_id)
1054            .select_only()
1055            .column(object::Column::ObjType)
1056            .into_tuple()
1057            .one(txn)
1058            .await?
1059            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1060
1061        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
1062            .select_only()
1063            .column(streaming_job::Column::CreateType)
1064            .into_tuple()
1065            .one(txn)
1066            .await?
1067            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1068
1069        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
1070        let res = Object::update_many()
1071            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1072            .col_expr(
1073                object::Column::CreatedAtClusterVersion,
1074                current_cluster_version().into(),
1075            )
1076            .filter(object::Column::Oid.eq(job_id))
1077            .exec(txn)
1078            .await?;
1079        if res.rows_affected == 0 {
1080            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1081        }
1082
1083        // mark the target stream job as `Created`.
1084        let job = streaming_job::ActiveModel {
1085            job_id: Set(job_id),
1086            job_status: Set(JobStatus::Created),
1087            ..Default::default()
1088        };
1089        job.update(txn).await?;
1090
1091        // notify frontend: job, internal tables.
1092        let internal_table_objs = Table::find()
1093            .find_also_related(Object)
1094            .filter(table::Column::BelongsToJobId.eq(job_id))
1095            .all(txn)
1096            .await?;
1097        let mut objects = internal_table_objs
1098            .iter()
1099            .map(|(table, obj)| PbObject {
1100                object_info: Some(PbObjectInfo::Table(
1101                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1102                )),
1103            })
1104            .collect_vec();
1105        let mut notification_op = if create_type == CreateType::Background {
1106            NotificationOperation::Update
1107        } else {
1108            NotificationOperation::Add
1109        };
1110        let mut updated_user_info = vec![];
1111        let mut need_grant_default_privileges = true;
1112
1113        match job_type {
1114            ObjectType::Table => {
1115                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1116                    .find_also_related(Object)
1117                    .one(txn)
1118                    .await?
1119                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1120                if table.table_type == TableType::MaterializedView {
1121                    notification_op = NotificationOperation::Update;
1122                }
1123
1124                if let Some(source_id) = table.optional_associated_source_id {
1125                    let (src, obj) = Source::find_by_id(source_id)
1126                        .find_also_related(Object)
1127                        .one(txn)
1128                        .await?
1129                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1130                    objects.push(PbObject {
1131                        object_info: Some(PbObjectInfo::Source(
1132                            ObjectModel(src, obj.unwrap()).into(),
1133                        )),
1134                    });
1135                }
1136                objects.push(PbObject {
1137                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1138                });
1139            }
1140            ObjectType::Sink => {
1141                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1142                    .find_also_related(Object)
1143                    .one(txn)
1144                    .await?
1145                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1146                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1147                    need_grant_default_privileges = false;
1148                }
1149                // If sinks were pre-notified during CREATING, we should use Update at finish
1150                // to avoid duplicate Add notifications (align with MV behavior).
1151                notification_op = NotificationOperation::Update;
1152                objects.push(PbObject {
1153                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1154                });
1155            }
1156            ObjectType::Index => {
1157                need_grant_default_privileges = false;
1158                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1159                    .find_also_related(Object)
1160                    .one(txn)
1161                    .await?
1162                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1163                {
1164                    let (table, obj) = Table::find_by_id(index.index_table_id)
1165                        .find_also_related(Object)
1166                        .one(txn)
1167                        .await?
1168                        .ok_or_else(|| {
1169                            MetaError::catalog_id_not_found("table", index.index_table_id)
1170                        })?;
1171                    objects.push(PbObject {
1172                        object_info: Some(PbObjectInfo::Table(
1173                            ObjectModel(table, obj.unwrap()).into(),
1174                        )),
1175                    });
1176                }
1177
1178                // If the index is created on a table with privileges, we should also
1179                // grant the privileges for the index and its state tables.
1180                let primary_table_privileges = UserPrivilege::find()
1181                    .filter(
1182                        user_privilege::Column::Oid
1183                            .eq(index.primary_table_id)
1184                            .and(user_privilege::Column::Action.eq(Action::Select)),
1185                    )
1186                    .all(txn)
1187                    .await?;
1188                if !primary_table_privileges.is_empty() {
1189                    let index_state_table_ids: Vec<TableId> = Table::find()
1190                        .select_only()
1191                        .column(table::Column::TableId)
1192                        .filter(
1193                            table::Column::BelongsToJobId
1194                                .eq(job_id)
1195                                .or(table::Column::TableId.eq(index.index_table_id)),
1196                        )
1197                        .into_tuple()
1198                        .all(txn)
1199                        .await?;
1200                    let mut new_privileges = vec![];
1201                    for privilege in &primary_table_privileges {
1202                        for state_table_id in &index_state_table_ids {
1203                            new_privileges.push(user_privilege::ActiveModel {
1204                                id: Default::default(),
1205                                oid: Set(state_table_id.as_object_id()),
1206                                user_id: Set(privilege.user_id),
1207                                action: Set(Action::Select),
1208                                dependent_id: Set(privilege.dependent_id),
1209                                granted_by: Set(privilege.granted_by),
1210                                with_grant_option: Set(privilege.with_grant_option),
1211                            });
1212                        }
1213                    }
1214                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1215
1216                    updated_user_info = list_user_info_by_ids(
1217                        primary_table_privileges.into_iter().map(|p| p.user_id),
1218                        txn,
1219                    )
1220                    .await?;
1221                }
1222
1223                objects.push(PbObject {
1224                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1225                });
1226            }
1227            ObjectType::Source => {
1228                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1229                    .find_also_related(Object)
1230                    .one(txn)
1231                    .await?
1232                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1233                objects.push(PbObject {
1234                    object_info: Some(PbObjectInfo::Source(
1235                        ObjectModel(source, obj.unwrap()).into(),
1236                    )),
1237                });
1238            }
1239            _ => unreachable!("invalid job type: {:?}", job_type),
1240        }
1241
1242        if need_grant_default_privileges {
1243            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1244        }
1245
1246        Ok((notification_op, objects, updated_user_info))
1247    }
1248
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

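    /// Transactional body of [`Self::finish_replace_streaming_job`]: updates the job's
    /// catalog entry, moves the fragments of the temporary job over to the original one,
    /// rewrites dependent index items, and returns the catalog objects to notify, plus
    /// the objects to drop when a table's associated source connector is removed.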
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Drop the table's connector; the rest of the logic lives in `drop_table_associated_source`.
                    table.optional_associated_source_id = Set(None);
                }

                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

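        // Moves the fragments built under `tmp_id` over to `original_job_id`:
        // internal tables get their real `fragment_id`, the old fragments are deleted,
        // downstream `Merge` nodes are repointed, and the temporary object is removed.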
        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: JobId,
            original_job_id: JobId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            // 0. update internal tables
            // Fields including `fragment_id` were placeholder values before.
            // After table fragments are created, update them for all internal tables.
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    let state_table_id = TableId::new(state_table_id as _);
                    table::ActiveModel {
                        table_id: Set(state_table_id),
                        fragment_id: Set(Some(fragment_id)),
                        // No need to update `vnode_count` because it must remain the same.
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
            }

            // 1. replace old fragments/actors with new ones.
            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            // 2. update merges.
            // Update each downstream fragment's Merge node to point at the new upstream fragment id.
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }

            // 3. remove dummy object.
            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_shared_source_id())
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id.as_job_id(),
                    finish_sink_context.original_sink_id.as_job_id(),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    // Edits the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments.
    // Returns the actor ids the new rate limit should be applied to, grouped by fragment.
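    //
    // A minimal usage sketch (illustrative only; the exact SQL surface may differ):
    //
    //     ALTER SOURCE my_source SET source_rate_limit = 1000;
    //
    // which reaches the meta node roughly as:
    //
    //     let fragment_actors = catalog_controller
    //         .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //         .await?;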
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id.as_share_source_job_id()]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so we scan all fragments for `StreamFsFetch` nodes when using an fs connector.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();

        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

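    /// Looks up the currently running actors of the given fragments from the shared
    /// in-memory actor info and returns a `fragment_id -> actor_ids` map.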
    fn get_fragment_actors_from_running_info(
        &self,
        fragment_ids: impl Iterator<Item = FragmentId>,
    ) -> HashMap<FragmentId, Vec<ActorId>> {
        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        for fragment_id in fragment_ids {
            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
            fragment_actors
                .entry(fragment_id as _)
                .or_default()
                .extend(actors.keys().copied());
        }

        fragment_actors
    }

    // Edits the content of the fragments belonging to the given `job_id`.
    // Returns the actor ids the mutation should be applied to, grouped by fragment.
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message used when no relevant fragment is found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());

        Ok(fragment_actors)
    }

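    /// Single-fragment variant of [`Self::mutate_fragments_by_job_id`]: applies
    /// `fragment_mutation_fn` to the fragment's stream node and persists the result,
    /// or fails with `err_msg` if the closure reports that nothing was mutated.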
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            // Persist the mutated node rather than the unmodified snapshot read above.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));

        txn.commit().await?;

        Ok(fragment_actors)
    }

    // Edits the `rate_limit` of the backfill nodes (formerly `Chain`) in the given
    // `job_id`'s fragments.
    // Returns the actor ids the new rate limit should be applied to, grouped by fragment.
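    //
    // A minimal usage sketch (illustrative only; the exact SQL surface may differ):
    //
    //     ALTER MATERIALIZED VIEW my_mv SET backfill_rate_limit = 512;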
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    // Edits the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
    // Returns the actor ids the new rate limit should be applied to, grouped by fragment.
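    //
    // Note that only sinks backed by a kv log store (i.e. created with sink decoupling
    // enabled) can be throttled here; other sinks are rejected with an error below.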
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            sink_id.as_job_id(),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

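    // Edits the `rate_limit` of the `Dml` nodes in the given `job_id`'s fragments,
    // throttling DML statements against the job's table.
    // Returns the actor ids the new rate limit should be applied to, grouped by fragment.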
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

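    // Applies on-the-fly property changes (e.g. from an `ALTER SOURCE` command) to a source:
    // checks that every changed key may be altered without rebuilding the job, merges the
    // new plaintext props and secret refs into the stored WITH options, rewrites the stored
    // `CREATE` definition, and pushes the new properties into all affected fragments.
    // Returns the fully resolved options on success.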
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // Materialized views on a non-shared source each hold a copy of the source in their fragments.
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // Check that the altered props are still valid for the connector.
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // Can be a source_id or a table_id:
        // if updating a source associated with a table, the preferred_id is the table_id;
        // otherwise, it is the source_id.
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // Update the associated table's statement accordingly.
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating a table with a connector, the job id is the table id
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

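    // Applies on-the-fly property changes (e.g. from an `ALTER SINK` command) to a sink:
    // validates the props, rewrites the stored `CREATE SINK` definition, persists the
    // merged config, and pushes it into the sink's fragments. Returns the applied props.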
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

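    // Iceberg-table variant of the sink property update: an iceberg table is backed by a
    // hidden sink/source pair, so the new props are applied to the sink while the shared
    // `CREATE TABLE ... ENGINE = iceberg` definition is kept in sync across the sink,
    // source, and table catalogs.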
    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;
        active_source.update(&txn).await?;
        active_table.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap()).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id))
    }

2392    /// Update connection properties and all dependent sources/sinks in a single transaction
2393    pub async fn update_connection_and_dependent_objects_props(
2394        &self,
2395        connection_id: ConnectionId,
2396        alter_props: BTreeMap<String, String>,
2397        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2398    ) -> MetaResult<(
2399        WithOptionsSecResolved,                   // Connection's new properties
2400        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2401        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2402    )> {
2403        let inner = self.inner.read().await;
2404        let txn = inner.db.begin().await?;
2405
2406        // Find all dependent sources and sinks first
2407        let dependent_sources: Vec<SourceId> = Source::find()
2408            .select_only()
2409            .column(source::Column::SourceId)
2410            .filter(source::Column::ConnectionId.eq(connection_id))
2411            .into_tuple()
2412            .all(&txn)
2413            .await?;
2414
2415        let dependent_sinks: Vec<SinkId> = Sink::find()
2416            .select_only()
2417            .column(sink::Column::SinkId)
2418            .filter(sink::Column::ConnectionId.eq(connection_id))
2419            .into_tuple()
2420            .all(&txn)
2421            .await?;
2422
2423        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2424            .find_also_related(Object)
2425            .one(&txn)
2426            .await?
2427            .ok_or_else(|| {
2428                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2429            })?;
2430
2431        // Validate that props can be altered
2432        let prop_keys: Vec<String> = alter_props
2433            .keys()
2434            .chain(alter_secret_refs.keys())
2435            .cloned()
2436            .collect();
2437
2438        // Map the connection type enum to the string name expected by the validation function
2439        let connection_type_str = pb_connection_type_to_connection_type(
2440            &connection_catalog.params.to_protobuf().connection_type(),
2441        )
2442        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2443
2444        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2445            connection_type_str, &prop_keys,
2446        )?;
2447
2448        let connection_pb = connection_catalog.params.to_protobuf();
2449        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2450            connection_pb.properties.into_iter().collect(),
2451            connection_pb.secret_refs.into_iter().collect(),
2452        );
2453
2454        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
2455            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;
2456
2457        tracing::debug!(
2458            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
2459            connection_id,
2460            dependent_sources,
2461            dependent_sinks
2462        );
2463
2464        // Validate connection
2465        {
2466            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
2467                connection_type: connection_pb.connection_type,
2468                properties: connection_options_with_secret
2469                    .as_plaintext()
2470                    .clone()
2471                    .into_iter()
2472                    .collect(),
2473                secret_refs: connection_options_with_secret
2474                    .as_secret()
2475                    .clone()
2476                    .into_iter()
2477                    .collect(),
2478            };
2479            let connection = PbConnection {
2480                id: connection_id as _,
2481                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
2482                    conn_params_pb,
2483                )),
2484                ..Default::default()
2485            };
2486            validate_connection(&connection).await?;
2487        }
2488
2489        // Update connection secret dependencies
2490        if !to_add_secret_dep.is_empty() {
2491            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2492                object_dependency::ActiveModel {
2493                    oid: Set(secret_id.into()),
2494                    used_by: Set(connection_id.as_object_id()),
2495                    ..Default::default()
2496                }
2497            }))
2498            .exec(&txn)
2499            .await?;
2500        }
2501        if !to_remove_secret_dep.is_empty() {
2502            let _ = ObjectDependency::delete_many()
2503                .filter(
2504                    object_dependency::Column::Oid
2505                        .is_in(to_remove_secret_dep)
2506                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
2507                )
2508                .exec(&txn)
2509                .await?;
2510        }
2511
2512        // Update the connection with new properties
2513        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
2514            connection_type: connection_pb.connection_type,
2515            properties: connection_options_with_secret
2516                .as_plaintext()
2517                .clone()
2518                .into_iter()
2519                .collect(),
2520            secret_refs: connection_options_with_secret
2521                .as_secret()
2522                .clone()
2523                .into_iter()
2524                .collect(),
2525        };
2526        let active_connection_model = connection::ActiveModel {
2527            connection_id: Set(connection_id),
2528            params: Set(ConnectionParams::from(&updated_connection_params)),
2529            ..Default::default()
2530        };
2531        active_connection_model.update(&txn).await?;
2532
2533        // Batch update dependent sources and collect their complete properties
2534        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();
2535
2536        if !dependent_sources.is_empty() {
2537            // Batch fetch all dependent sources
2538            let sources_with_objs = Source::find()
2539                .find_also_related(Object)
2540                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
2541                .all(&txn)
2542                .await?;
2543
2544            // Prepare batch updates
2545            let mut source_updates = Vec::new();
2546            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();
2547
2548            for (source, _obj) in sources_with_objs {
2549                let source_id = source.source_id;
2550
2551                let mut source_options_with_secret = WithOptionsSecResolved::new(
2552                    source.with_properties.0.clone(),
2553                    source
2554                        .secret_ref
2555                        .clone()
2556                        .map(|secret_ref| secret_ref.to_protobuf())
2557                        .unwrap_or_default(),
2558                );
2559                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
2560                    source_options_with_secret
2561                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // Validate the updated source properties
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                // Prepare source update
                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                // Prepare fragment update:
                // - If the source is a table-associated source, update fragments for the table job.
                // - Otherwise update the shared source job.
                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = std::iter::once(base_job_id)
                    .chain(dep_source_job_ids)
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                // Collect the complete properties for runtime broadcast
                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

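            // Persist the per-source catalog updates collected above; every source
            // has already passed validation at this point.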
            for source_update in source_updates {
                source_update.update(&txn).await?;
            }

            // Batch execute fragment updates
            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        // Batch update dependent sinks and collect their complete properties
        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            // Batch fetch all dependent sinks
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                // Validate that the sink props can be altered on the fly
                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when altering sink"
                        ))
                        .into());
                    }
                };
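                // NOTE: this mirrors the standalone `validate_sink_props` helper below,
                // but reuses the precomputed `prop_keys` for the allowlist check.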

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                // Prepare sink update
                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                // Prepare fragment updates for this sink
                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                // Collect the complete properties for runtime broadcast
                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            // Batch execute sink updates
            for sink_update in sink_updates {
                sink_update.update(&txn).await?;
            }

            // Batch execute sink fragment updates using the reusable function
            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        // Collect all updated objects for frontend notification
        let mut updated_objects = Vec::new();

        // Add connection
        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap()).into(),
            )),
        });

        // Add sources
        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap()).into(),
                )),
            });
        }

        // Add sinks
        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
            });
        }

        // Commit the transaction
        txn.commit().await?;

        // Notify the frontend about all updated objects, only after the commit succeeds
        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }

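    /// Sets `rate_limit` on every rate-limit-capable node body (DML, sink,
    /// CDC scan, stream scan, and source backfill) in the given fragment,
    /// returning the affected fragment together with its actor ids.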
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| match node {
                    PbNodeBody::Dml(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::Sink(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    _ => {}
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

    /// Note: `FsFetch` nodes created by old versions are not included.
    /// Since this is only used for debugging, that should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        // `node_name` is always set whenever `rate_limit` is.
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

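/// Validates that `props` may be altered on a live sink: the sink must declare
/// a connector, every key must be in the connector's alter-on-fly allowlist,
/// and the merged configuration must pass the connector's `validate_alter_config`.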
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that the props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
            );
        }
    };
    Ok(())
}

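/// Merges `props` into a statement's `WITH` options in place. Existing options
/// keep their original position (their values are overwritten on key collision)
/// and new options are appended, since the merge goes through an `IndexMap`.
///
/// # Example (illustrative only)
///
/// ```ignore
/// // WITH (connector = 'kafka', topic = 't1')  merged with  {"topic": "t2"}
/// // yields: WITH (connector = 'kafka', topic = 't2')
/// update_stmt_with_props(&mut with_properties, &props)?;
/// ```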
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

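/// Extends the `SinkDesc` properties of every sink fragment belonging to
/// `sink_id` with `props` and persists the rewritten stream nodes.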
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids;
    /// otherwise it is empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

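/// Context for finishing an auto-refresh-schema sink replacement: the temporary
/// sink built against the new schema, the original sink it replaces, the new
/// column catalog, and, when the sink uses a log store, the replacement log
/// store table together with its columns.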
pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

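/// Rewrites the stream nodes of all fragments of `job_ids` that carry
/// `expect_flag`, applying `alter_stream_node_fn` to each node body. The
/// closure must set its `found` flag when it actually modifies a node;
/// fragments whose nodes were left untouched are not written back.
///
/// # Example (mirrors the source call site above; `new_props` assumed in scope)
///
/// ```ignore
/// update_connector_props_fragments(
///     &txn,
///     job_ids,
///     FragmentTypeFlag::Source,
///     |node, found| {
///         if let PbNodeBody::Source(node) = node
///             && let Some(inner) = &mut node.source_inner
///         {
///             inner.with_properties = new_props.clone();
///             *found = true;
///         }
///     },
///     true,
/// )
/// .await?;
/// ```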
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the source id (or the associated
        // table id). If the source is non-shared, there may be no updated
        // fragments for it; `job_ids.len() > 1` means the source is used by
        // other streaming jobs, so at least one fragment must have been updated.
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}