risingwave_meta/controller/streaming_job.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
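    /// Create the `Object` row and the `streaming_job` row for a new streaming job inside the
    /// given transaction and return the allocated job id. The job starts in the `Initial` status.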
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            parallelism: Set(streaming_parallelism),
            backfill_parallelism: Set(backfill_parallelism),
            backfill_orders: Set(None),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
            is_serverless_backfill: Set(is_serverless_backfill),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will be updated
    /// later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
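    ///
    /// # Example (illustrative sketch)
    ///
    /// ```ignore
    /// // Hypothetical call site; the argument values are assumptions for illustration.
    /// controller
    ///     .create_job_catalog(
    ///         &mut streaming_job,
    ///         &stream_ctx,
    ///         &None,          // fall back to the cluster's default parallelism
    ///         256,            // max parallelism (vnode count)
    ///         HashSet::new(), // no extra upstream dependencies
    ///         streaming_job_resource_type::ResourceType::Regular(true),
    ///         &None,          // no dedicated backfill parallelism
    ///     )
    ///     .await?;
    /// ```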
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // Check if any dependency is being altered.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one used for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                // To be compatible with the old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
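    ///
    /// # Example (illustrative sketch)
    ///
    /// ```ignore
    /// // Hypothetical: `incomplete_tables` carry temporary ids assigned during planning.
    /// let id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_tables)
    ///     .await?;
    /// let global_id = id_map[&temp_table_id]; // translate a temporary id to the global one
    /// ```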
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for
    /// replacing existing ones.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
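    /// Persists the job's fragment models and fragment relations in one transaction, updates the
    /// placeholder fields (`fragment_id`, `vnode_count`) of state tables for newly created jobs,
    /// and, when the job should be visible while creating, pre-notifies the frontend about its
    /// catalogs.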
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            job.update(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
        // After table fragments are created, update them for all tables.
        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // Table's vnode count is not always the fragment's vnode count, so we have to
                // look up the table from `TableFragments`.
                // See `ActorGraphBuilder::new`.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                }
                .update(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // In production builds, the definition was replaced earlier but is still
                    // needed for the notification.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        // Add the streaming job's objects to the notification.
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // the frontend receiving duplicate notifications if it restarts right in this gap. We need
        // to either forward the notification or filter out catalogs without any fragments in the
        // meta store.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with a target table) needs to
    /// be dropped, additional information is required to build the barrier mutation.
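    ///
    /// ```ignore
    /// // Illustrative only: turn the fragments of a cancelled job into a drop command.
    /// let command = controller.build_cancel_command(&table_fragments).await?;
    /// ```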
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is under the initial status
    /// or in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the database id of the `job_id` exists.
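    ///
    /// ```ignore
    /// // Illustrative only: abort a cancelled job and learn whether it was cleaned up.
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ true)
    ///     .await?;
    /// ```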
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job was created in the background and is still in creating status,
                    // we should not abort it; let recovery handle it instead.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is a sink being created into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
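    /// Persists the new fragment relations for the job, marks it as `Creating`, and records the
    /// initial source split assignment, all in one transaction.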
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark the job as `Creating`.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
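        // Create a temporary "dummy" job object that will eventually replace the original one;
        // its id is returned so the caller can build the new fragments under this temporary job.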
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone, original_config_override): (
            i32,
            Option<String>,
            Option<String>,
        ) = StreamingJobModel::find_by_id(id)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .column(streaming_job::Column::Timezone)
            .column(streaming_job::Column::ConfigOverride)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            streaming_job_resource_type::ResourceType::Regular(true),
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
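    ///
    /// ```ignore
    /// // Illustrative only: typically invoked once the job's backfill has completed.
    /// controller.finish_streaming_job(job_id).await?;
    /// ```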
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // Notify users about the default privileges.
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Transaction-level implementation of [`finish_streaming_job`]: marks job-related objects as
    /// `Created` and returns the notification operation, objects, and updated user infos.
    pub async fn finish_streaming_job_inner(
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
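        // Resolve the job's object type and create type first; both determine how the
        // notification below is assembled.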
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];
        let mut need_grant_default_privileges = true;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap(), None).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                    )),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
                    need_grant_default_privileges = false;
                }
                // If sinks were pre-notified during CREATING, we should use Update at finish
                // to avoid duplicate Add notifications (align with MV behavior).
                notification_op = NotificationOperation::Update;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, obj.unwrap(), streaming_job.clone()).into(),
                    )),
                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job.clone()).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        Ok((notification_op, objects, updated_user_info))
1291    }
1292
1293    pub async fn finish_replace_streaming_job(
1294        &self,
1295        tmp_id: JobId,
1296        streaming_job: StreamingJob,
1297        replace_upstream: FragmentReplaceUpstream,
1298        sink_into_table_context: SinkIntoTableContext,
1299        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1300        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1301    ) -> MetaResult<NotificationVersion> {
1302        let inner = self.inner.write().await;
1303        let txn = inner.db.begin().await?;
1304
1305        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1306            tmp_id,
1307            replace_upstream,
1308            sink_into_table_context,
1309            &txn,
1310            streaming_job,
1311            drop_table_connector_ctx,
1312            auto_refresh_schema_sinks,
1313        )
1314        .await?;
1315
1316        txn.commit().await?;
1317
1318        let mut version = self
1319            .notify_frontend(
1320                NotificationOperation::Update,
1321                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1322            )
1323            .await;
1324
1325        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1326            self.notify_users_update(user_infos).await;
1327            version = self
1328                .notify_frontend(
1329                    NotificationOperation::Delete,
1330                    build_object_group_for_delete(to_drop_objects),
1331                )
1332                .await;
1333        }
1334
1335        Ok(version)
1336    }
1337
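    /// Transactional core of [`Self::finish_replace_streaming_job`]: updates the
    /// replaced job's catalog entry, moves fragments from the temporary job
    /// `tmp_id` over to the original job, rewrites downstream `Merge` nodes and
    /// index items, and returns the objects (plus any user-info / deletion
    /// notifications) for the caller to broadcast after committing `txn`.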
1338    pub async fn finish_replace_streaming_job_inner(
1339        tmp_id: JobId,
1340        replace_upstream: FragmentReplaceUpstream,
1341        SinkIntoTableContext {
1342            updated_sink_catalogs,
1343        }: SinkIntoTableContext,
1344        txn: &DatabaseTransaction,
1345        streaming_job: StreamingJob,
1346        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1347        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1348    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1349        let original_job_id = streaming_job.id();
1350        let job_type = streaming_job.job_type();
1351
1352        let mut index_item_rewriter = None;
1353
1354        // Update catalog
1355        match streaming_job {
1356            StreamingJob::Table(_source, table, _table_job_type) => {
1357                // The source catalog should remain unchanged
1358
1359                let original_column_catalogs =
1360                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1361
1362                index_item_rewriter = Some({
1363                    let original_columns = original_column_catalogs
1364                        .to_protobuf()
1365                        .into_iter()
1366                        .map(|c| c.column_desc.unwrap())
1367                        .collect_vec();
1368                    let new_columns = table
1369                        .columns
1370                        .iter()
1371                        .map(|c| c.column_desc.clone().unwrap())
1372                        .collect_vec();
1373
1374                    IndexItemRewriter {
1375                        original_columns,
1376                        new_columns,
1377                    }
1378                });
1379
1380                // For sinks created in earlier versions, we need to set the original_target_columns.
1381                for sink_id in updated_sink_catalogs {
1382                    sink::ActiveModel {
1383                        sink_id: Set(sink_id as _),
1384                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1385                        ..Default::default()
1386                    }
1387                    .update(txn)
1388                    .await?;
1389                }
1390                // Update the table catalog with the new one. (column catalog is also updated here)
1391                let mut table = table::ActiveModel::from(table);
1392                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1393                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1394                {
1395                    // drop the table connector; the rest of the logic is in `drop_table_associated_source`
1396                    table.optional_associated_source_id = Set(None);
1397                }
1398
1399                table.update(txn).await?;
1400            }
1401            StreamingJob::Source(source) => {
1402                // Update the source catalog with the new one.
1403                let source = source::ActiveModel::from(source);
1404                source.update(txn).await?;
1405            }
1406            StreamingJob::MaterializedView(table) => {
1407                // Update the table catalog with the new one.
1408                let table = table::ActiveModel::from(table);
1409                table.update(txn).await?;
1410            }
1411            _ => unreachable!(
1412                "invalid streaming job type: {:?}",
1413                streaming_job.job_type_str()
1414            ),
1415        }
1416
1417        async fn finish_fragments(
1418            txn: &DatabaseTransaction,
1419            tmp_id: JobId,
1420            original_job_id: JobId,
1421            replace_upstream: FragmentReplaceUpstream,
1422        ) -> MetaResult<()> {
1423            // 0. update internal tables
1424            // Fields including `fragment_id` were placeholder values before.
1425            // After table fragments are created, update them for all internal tables.
1426            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1427                .select_only()
1428                .columns([
1429                    fragment::Column::FragmentId,
1430                    fragment::Column::StateTableIds,
1431                ])
1432                .filter(fragment::Column::JobId.eq(tmp_id))
1433                .into_tuple()
1434                .all(txn)
1435                .await?;
1436            for (fragment_id, state_table_ids) in fragment_info {
1437                for state_table_id in state_table_ids.into_inner() {
1438                    let state_table_id = TableId::new(state_table_id as _);
1439                    table::ActiveModel {
1440                        table_id: Set(state_table_id),
1441                        fragment_id: Set(Some(fragment_id)),
1442                        // No need to update `vnode_count` because it must remain the same.
1443                        ..Default::default()
1444                    }
1445                    .update(txn)
1446                    .await?;
1447                }
1448            }
1449
1450            // 1. replace old fragments/actors with new ones.
1451            Fragment::delete_many()
1452                .filter(fragment::Column::JobId.eq(original_job_id))
1453                .exec(txn)
1454                .await?;
1455            Fragment::update_many()
1456                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1457                .filter(fragment::Column::JobId.eq(tmp_id))
1458                .exec(txn)
1459                .await?;
1460
1461            // 2. update merges.
1462            // update the Merge nodes of downstream fragments to point at the new upstream_fragment_id
1463            for (fragment_id, fragment_replace_map) in replace_upstream {
1464                let (fragment_id, mut stream_node) =
1465                    Fragment::find_by_id(fragment_id as FragmentId)
1466                        .select_only()
1467                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1468                        .into_tuple::<(FragmentId, StreamNode)>()
1469                        .one(txn)
1470                        .await?
1471                        .map(|(id, node)| (id, node.to_protobuf()))
1472                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1473
1474                visit_stream_node_mut(&mut stream_node, |body| {
1475                    if let PbNodeBody::Merge(m) = body
1476                        && let Some(new_fragment_id) =
1477                            fragment_replace_map.get(&m.upstream_fragment_id)
1478                    {
1479                        m.upstream_fragment_id = *new_fragment_id;
1480                    }
1481                });
1482                fragment::ActiveModel {
1483                    fragment_id: Set(fragment_id),
1484                    stream_node: Set(StreamNode::from(&stream_node)),
1485                    ..Default::default()
1486                }
1487                .update(txn)
1488                .await?;
1489            }
1490
1491            // 3. remove dummy object.
1492            Object::delete_by_id(tmp_id).exec(txn).await?;
1493
1494            Ok(())
1495        }
1496
1497        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1498
1499        // 4. update catalogs and notify.
1500        let mut objects = vec![];
1501        match job_type {
1502            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1503                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1504                    .find_also_related(Object)
1505                    .one(txn)
1506                    .await?
1507                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1508                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
1509                    .one(txn)
1510                    .await?;
1511                objects.push(PbObject {
1512                    object_info: Some(PbObjectInfo::Table(
1513                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
1514                    )),
1515                })
1516            }
1517            StreamingJobType::Source => {
1518                let (source, source_obj) =
1519                    Source::find_by_id(original_job_id.as_shared_source_id())
1520                        .find_also_related(Object)
1521                        .one(txn)
1522                        .await?
1523                        .ok_or_else(|| {
1524                            MetaError::catalog_id_not_found("object", original_job_id)
1525                        })?;
1526                objects.push(PbObject {
1527                    object_info: Some(PbObjectInfo::Source(
1528                        ObjectModel(source, source_obj.unwrap(), None).into(),
1529                    )),
1530                })
1531            }
1532            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1533        }
1534
1535        if let Some(expr_rewriter) = index_item_rewriter {
1536            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1537                .select_only()
1538                .columns([index::Column::IndexId, index::Column::IndexItems])
1539                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1540                .into_tuple()
1541                .all(txn)
1542                .await?;
1543            for (index_id, nodes) in index_items {
1544                let mut pb_nodes = nodes.to_protobuf();
1545                pb_nodes
1546                    .iter_mut()
1547                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1548                let index = index::ActiveModel {
1549                    index_id: Set(index_id),
1550                    index_items: Set(pb_nodes.into()),
1551                    ..Default::default()
1552                }
1553                .update(txn)
1554                .await?;
1555                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
1556                    .find_also_related(streaming_job::Entity)
1557                    .one(txn)
1558                    .await?
1559                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1560                objects.push(PbObject {
1561                    object_info: Some(PbObjectInfo::Index(
1562                        ObjectModel(index, index_obj, streaming_job).into(),
1563                    )),
1564                });
1565            }
1566        }
1567
1568        if let Some(sinks) = auto_refresh_schema_sinks {
1569            for finish_sink_context in sinks {
1570                finish_fragments(
1571                    txn,
1572                    finish_sink_context.tmp_sink_id.as_job_id(),
1573                    finish_sink_context.original_sink_id.as_job_id(),
1574                    Default::default(),
1575                )
1576                .await?;
1577                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1578                    .find_also_related(Object)
1579                    .one(txn)
1580                    .await?
1581                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id))?;
1582                let sink_streaming_job =
1583                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
1584                        .one(txn)
1585                        .await?;
1586                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1587                Sink::update(sink::ActiveModel {
1588                    sink_id: Set(finish_sink_context.original_sink_id),
1589                    columns: Set(columns.clone()),
1590                    ..Default::default()
1591                })
1592                .exec(txn)
1593                .await?;
1594                sink.columns = columns;
1595                objects.push(PbObject {
1596                    object_info: Some(PbObjectInfo::Sink(
1597                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
1598                    )),
1599                });
1600                if let Some((log_store_table_id, new_log_store_table_columns)) =
1601                    finish_sink_context.new_log_store_table
1602                {
1603                    let new_log_store_table_columns: ColumnCatalogArray =
1604                        new_log_store_table_columns.into();
1605                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1606                        .find_also_related(Object)
1607                        .one(txn)
1608                        .await?
1609                        .ok_or_else(|| MetaError::catalog_id_not_found("table", log_store_table_id))?;
1610                    Table::update(table::ActiveModel {
1611                        table_id: Set(log_store_table_id),
1612                        columns: Set(new_log_store_table_columns.clone()),
1613                        ..Default::default()
1614                    })
1615                    .exec(txn)
1616                    .await?;
1617                    table.columns = new_log_store_table_columns;
1618                    objects.push(PbObject {
1619                        object_info: Some(PbObjectInfo::Table(
1620                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
1621                                .into(),
1622                        )),
1623                    });
1624                }
1625            }
1626        }
1627
1628        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1629        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1630            notification_objs =
1631                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1632        }
1633
1634        Ok((objects, notification_objs))
1635    }
1636
1637    /// Abort the replacing streaming job by deleting the temporary job object.
1638    pub async fn try_abort_replacing_streaming_job(
1639        &self,
1640        tmp_job_id: JobId,
1641        tmp_sink_ids: Option<Vec<ObjectId>>,
1642    ) -> MetaResult<()> {
1643        let inner = self.inner.write().await;
1644        let txn = inner.db.begin().await?;
1645        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1646        if let Some(tmp_sink_ids) = tmp_sink_ids {
1647            for tmp_sink_id in tmp_sink_ids {
1648                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1649            }
1650        }
1651        txn.commit().await?;
1652        Ok(())
1653    }
1654
1655    // edit the `rate_limit` of the `Source` (and `StreamFsFetch`) nodes in the given `source_id`'s fragments
1656    // return the job_ids and fragment_ids the change applies to
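    //
    // Illustrative usage sketch, not a doctest (`controller` stands for a
    // hypothetical handle to this catalog controller):
    //
    //     // Throttle the source, then lift the limit again.
    //     let (job_ids, fragment_ids) = controller
    //         .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //         .await?;
    //     controller
    //         .update_source_rate_limit_by_source_id(source_id, None)
    //         .await?;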
1657    pub async fn update_source_rate_limit_by_source_id(
1658        &self,
1659        source_id: SourceId,
1660        rate_limit: Option<u32>,
1661    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
1662        let inner = self.inner.read().await;
1663        let txn = inner.db.begin().await?;
1664
1665        {
1666            let active_source = source::ActiveModel {
1667                source_id: Set(source_id),
1668                rate_limit: Set(rate_limit.map(|v| v as i32)),
1669                ..Default::default()
1670            };
1671            active_source.update(&txn).await?;
1672        }
1673
1674        let (source, obj) = Source::find_by_id(source_id)
1675            .find_also_related(Object)
1676            .one(&txn)
1677            .await?
1678            .ok_or_else(|| {
1679                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1680            })?;
1681
1682        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1683        let streaming_job_ids: Vec<JobId> =
1684            if let Some(table_id) = source.optional_associated_table_id {
1685                vec![table_id.as_job_id()]
1686            } else if let Some(source_info) = &source.source_info
1687                && source_info.to_protobuf().is_shared()
1688            {
1689                vec![source_id.as_share_source_job_id()]
1690            } else {
1691                ObjectDependency::find()
1692                    .select_only()
1693                    .column(object_dependency::Column::UsedBy)
1694                    .filter(object_dependency::Column::Oid.eq(source_id))
1695                    .into_tuple()
1696                    .all(&txn)
1697                    .await?
1698            };
1699
1700        if streaming_job_ids.is_empty() {
1701            return Err(MetaError::invalid_parameter(format!(
1702                "source id {source_id} not used by any streaming job"
1703            )));
1704        }
1705
1706        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
1707            .select_only()
1708            .columns([
1709                fragment::Column::FragmentId,
1710                fragment::Column::JobId,
1711                fragment::Column::FragmentTypeMask,
1712                fragment::Column::StreamNode,
1713            ])
1714            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1715            .into_tuple()
1716            .all(&txn)
1717            .await?;
1718        let mut fragments = fragments
1719            .into_iter()
1720            .map(|(id, job_id, mask, stream_node)| {
1721                (
1722                    id,
1723                    job_id,
1724                    FragmentTypeMask::from(mask as u32),
1725                    stream_node.to_protobuf(),
1726                )
1727            })
1728            .collect_vec();
1729
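        // Keep only the fragments that actually reference this source, patching
        // the rate limit in place as we scan their stream nodes.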
1730        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
1731            let mut found = false;
1732            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1733                visit_stream_node_mut(stream_node, |node| {
1734                    if let PbNodeBody::Source(node) = node
1735                        && let Some(node_inner) = &mut node.source_inner
1736                        && node_inner.source_id == source_id
1737                    {
1738                        node_inner.rate_limit = rate_limit;
1739                        found = true;
1740                    }
1741                });
1742            }
1743            if is_fs_source {
1744                // in older versions there was no fragment type flag for the `FsFetch` node,
1745                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes
1746                visit_stream_node_mut(stream_node, |node| {
1747                    if let PbNodeBody::StreamFsFetch(node) = node {
1748                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1749                        if let Some(node_inner) = &mut node.node_inner
1750                            && node_inner.source_id == source_id
1751                        {
1752                            node_inner.rate_limit = rate_limit;
1753                            found = true;
1754                        }
1755                    }
1756                });
1757            }
1758            found
1759        });
1760
1761        assert!(
1762            !fragments.is_empty(),
1763            "source id should be used by at least one fragment"
1764        );
1765
1766        let (fragment_ids, job_ids) = fragments
1767            .iter()
1768            .map(|(fragment_id, job_id, _, _)| (fragment_id, job_id))
1769            .unzip();
1770
1771        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
1772            fragment::ActiveModel {
1773                fragment_id: Set(fragment_id),
1774                fragment_type_mask: Set(fragment_type_mask.into()),
1775                stream_node: Set(StreamNode::from(&stream_node)),
1776                ..Default::default()
1777            }
1778            .update(&txn)
1779            .await?;
1780        }
1781
1782        txn.commit().await?;
1783
1784        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
1785        let _version = self
1786            .notify_frontend(
1787                NotificationOperation::Update,
1788                NotificationInfo::ObjectGroup(PbObjectGroup {
1789                    objects: vec![PbObject {
1790                        object_info: Some(relation_info),
1791                    }],
1792                }),
1793            )
1794            .await;
1795
1796        Ok((job_ids, fragment_ids))
1797    }
1798
1799    // edit the content of fragments in the given `job_id`
1800    // return the fragment_ids the mutation applies to
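    //
    // Illustrative usage sketch, not a doctest: a mutation closure that sets the
    // `rate_limit` of every DML node, mirroring `update_dml_rate_limit_by_job_id`
    // below (`controller` is a hypothetical handle):
    //
    //     let fragment_ids = controller
    //         .mutate_fragments_by_job_id(
    //             job_id,
    //             |mask, stream_node| {
    //                 let mut found = false;
    //                 if mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
    //                     visit_stream_node_mut(stream_node, |node| {
    //                         if let PbNodeBody::Dml(node) = node {
    //                             node.rate_limit = Some(100);
    //                             found = true;
    //                         }
    //                     });
    //                 }
    //                 Ok(found)
    //             },
    //             "dml node not found",
    //         )
    //         .await?;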
1801    pub async fn mutate_fragments_by_job_id(
1802        &self,
1803        job_id: JobId,
1804        // returns true if the mutation is applied
1805        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1806        // error message when no relevant fragments are found
1807        err_msg: &'static str,
1808    ) -> MetaResult<HashSet<FragmentId>> {
1809        let inner = self.inner.read().await;
1810        let txn = inner.db.begin().await?;
1811
1812        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1813            .select_only()
1814            .columns([
1815                fragment::Column::FragmentId,
1816                fragment::Column::FragmentTypeMask,
1817                fragment::Column::StreamNode,
1818            ])
1819            .filter(fragment::Column::JobId.eq(job_id))
1820            .into_tuple()
1821            .all(&txn)
1822            .await?;
1823        let mut fragments = fragments
1824            .into_iter()
1825            .map(|(id, mask, stream_node)| {
1826                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1827            })
1828            .collect_vec();
1829
1830        let fragments = fragments
1831            .iter_mut()
1832            .map(|(_, fragment_type_mask, stream_node)| {
1833                fragments_mutation_fn(*fragment_type_mask, stream_node)
1834            })
1835            .collect::<MetaResult<Vec<bool>>>()?
1836            .into_iter()
1837            .zip_eq_debug(std::mem::take(&mut fragments))
1838            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1839            .collect::<Vec<_>>();
1840
1841        if fragments.is_empty() {
1842            return Err(MetaError::invalid_parameter(format!(
1843                "job id {job_id}: {}",
1844                err_msg
1845            )));
1846        }
1847
1848        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1849        for (id, _, stream_node) in fragments {
1850            fragment::ActiveModel {
1851                fragment_id: Set(id),
1852                stream_node: Set(StreamNode::from(&stream_node)),
1853                ..Default::default()
1854            }
1855            .update(&txn)
1856            .await?;
1857        }
1858
1859        txn.commit().await?;
1860
1861        Ok(fragment_ids)
1862    }
1863
1864    async fn mutate_fragment_by_fragment_id(
1865        &self,
1866        fragment_id: FragmentId,
1867        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1868        err_msg: &'static str,
1869    ) -> MetaResult<()> {
1870        let inner = self.inner.read().await;
1871        let txn = inner.db.begin().await?;
1872
1873        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1874            Fragment::find_by_id(fragment_id)
1875                .select_only()
1876                .columns([
1877                    fragment::Column::FragmentTypeMask,
1878                    fragment::Column::StreamNode,
1879                ])
1880                .into_tuple()
1881                .one(&txn)
1882                .await?
1883                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1884        let mut pb_stream_node = stream_node.to_protobuf();
1885        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1886
1887        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1888            return Err(MetaError::invalid_parameter(format!(
1889                "fragment id {fragment_id}: {}",
1890                err_msg
1891            )));
1892        }
1893
1894        fragment::ActiveModel {
1895            fragment_id: Set(fragment_id),
1896            stream_node: Set(StreamNode::from(&pb_stream_node)),
1897            ..Default::default()
1898        }
1899        .update(&txn)
1900        .await?;
1901
1902        txn.commit().await?;
1903
1904        Ok(())
1905    }
1906
1907    // edit the `rate_limit` of the backfill nodes (`StreamScan` / `StreamCdcScan` / `SourceBackfill`) in the given `job_id`'s fragments
1908    // return the fragment_ids the change applies to
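    //
    // Illustrative usage sketch, not a doctest (`controller` is a hypothetical
    // handle):
    //
    //     let fragment_ids = controller
    //         .update_backfill_rate_limit_by_job_id(job_id, Some(256))
    //         .await?;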
1909    pub async fn update_backfill_rate_limit_by_job_id(
1910        &self,
1911        job_id: JobId,
1912        rate_limit: Option<u32>,
1913    ) -> MetaResult<HashSet<FragmentId>> {
1914        let update_backfill_rate_limit =
1915            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1916                let mut found = false;
1917                if fragment_type_mask
1918                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1919                {
1920                    visit_stream_node_mut(stream_node, |node| match node {
1921                        PbNodeBody::StreamCdcScan(node) => {
1922                            node.rate_limit = rate_limit;
1923                            found = true;
1924                        }
1925                        PbNodeBody::StreamScan(node) => {
1926                            node.rate_limit = rate_limit;
1927                            found = true;
1928                        }
1929                        PbNodeBody::SourceBackfill(node) => {
1930                            node.rate_limit = rate_limit;
1931                            found = true;
1932                        }
1933                        _ => {}
1934                    });
1935                }
1936                Ok(found)
1937            };
1938
1939        self.mutate_fragments_by_job_id(
1940            job_id,
1941            update_backfill_rate_limit,
1942            "stream scan node or source node not found",
1943        )
1944        .await
1945    }
1946
1947    // edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments
1948    // return the fragment_ids the change applies to
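    //
    // Illustrative usage sketch, not a doctest (`controller` is a hypothetical
    // handle). Note that this only succeeds for sinks backed by a kv log store,
    // i.e. created after `SET sink_decouple = TRUE`; see the check below.
    //
    //     let fragment_ids = controller
    //         .update_sink_rate_limit_by_job_id(sink_id, Some(512))
    //         .await?;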
1949    pub async fn update_sink_rate_limit_by_job_id(
1950        &self,
1951        sink_id: SinkId,
1952        rate_limit: Option<u32>,
1953    ) -> MetaResult<HashSet<FragmentId>> {
1954        let update_sink_rate_limit =
1955            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1956                let mut found = Ok(false);
1957                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1958                    visit_stream_node_mut(stream_node, |node| {
1959                        if let PbNodeBody::Sink(node) = node {
1960                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1961                                found = Err(MetaError::invalid_parameter(
1962                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1963                                ));
1964                                return;
1965                            }
1966                            node.rate_limit = rate_limit;
1967                            found = Ok(true);
1968                        }
1969                    });
1970                }
1971                found
1972            };
1973
1974        self.mutate_fragments_by_job_id(
1975            sink_id.as_job_id(),
1976            update_sink_rate_limit,
1977            "sink node not found",
1978        )
1979        .await
1980    }
1981
1982    pub async fn update_dml_rate_limit_by_job_id(
1983        &self,
1984        job_id: JobId,
1985        rate_limit: Option<u32>,
1986    ) -> MetaResult<HashSet<FragmentId>> {
1987        let update_dml_rate_limit =
1988            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1989                let mut found = false;
1990                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1991                    visit_stream_node_mut(stream_node, |node| {
1992                        if let PbNodeBody::Dml(node) = node {
1993                            node.rate_limit = rate_limit;
1994                            found = true;
1995                        }
1996                    });
1997                }
1998                Ok(found)
1999            };
2000
2001        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
2002            .await
2003    }
2004
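    // Illustrative usage sketch, not a doctest: alter connector properties of a
    // source on the fly (`controller` and the property key are hypothetical; keys
    // must pass `check_source_allow_alter_on_fly_fields`):
    //
    //     let props = BTreeMap::from([("some.alterable.key".to_owned(), "new-value".to_owned())]);
    //     let resolved = controller
    //         .update_source_props_by_source_id(source_id, props, BTreeMap::new())
    //         .await?;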
2005    pub async fn update_source_props_by_source_id(
2006        &self,
2007        source_id: SourceId,
2008        alter_props: BTreeMap<String, String>,
2009        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2010    ) -> MetaResult<WithOptionsSecResolved> {
2011        let inner = self.inner.read().await;
2012        let txn = inner.db.begin().await?;
2013
2014        let (source, _obj) = Source::find_by_id(source_id)
2015            .find_also_related(Object)
2016            .one(&txn)
2017            .await?
2018            .ok_or_else(|| {
2019                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2020            })?;
2021        let connector = source.with_properties.0.get_connector().unwrap();
2022        let is_shared_source = source.is_shared();
2023
2024        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
2025        if !is_shared_source {
2026            // MVs on a non-shared source hold a copy of the source in their fragments
2027            dep_source_job_ids = ObjectDependency::find()
2028                .select_only()
2029                .column(object_dependency::Column::UsedBy)
2030                .filter(object_dependency::Column::Oid.eq(source_id))
2031                .into_tuple()
2032                .all(&txn)
2033                .await?;
2034        }
2035
2036        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
2037        let prop_keys: Vec<String> = alter_props
2038            .keys()
2039            .chain(alter_secret_refs.keys())
2040            .cloned()
2041            .collect();
2042        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
2043            &connector, &prop_keys,
2044        )?;
2045
2046        let mut options_with_secret = WithOptionsSecResolved::new(
2047            source.with_properties.0.clone(),
2048            source
2049                .secret_ref
2050                .map(|secret_ref| secret_ref.to_protobuf())
2051                .unwrap_or_default(),
2052        );
2053        let (to_add_secret_dep, to_remove_secret_dep) =
2054            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
2055
2056        tracing::info!(
2057            "applying new properties to source: source_id={}, options_with_secret={:?}",
2058            source_id,
2059            options_with_secret
2060        );
2061        // check that the altered props are still valid for the connector
2062        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
2063        // todo: validate via source manager
2064
2065        let mut associate_table_id = None;
2066
2067        // `preferred_id` is the object that the secret dependencies attach to:
2068        // if updating a source associated with a table, it is the table_id;
2069        // otherwise, it is the source_id
2070        let mut preferred_id = source_id.as_object_id();
2071        let rewrite_sql = {
2072            let definition = source.definition.clone();
2073
2074            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2075                .map_err(|e| {
2076                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
2077                        anyhow!(e).context("Failed to parse source definition SQL"),
2078                    )))
2079                })?
2080                .try_into()
2081                .unwrap();
2082
2083            /// Formats SQL options with secret values properly resolved
2084            ///
2085            /// This function processes configuration options that may contain sensitive data:
2086            /// - Plaintext options are directly converted to `SqlOption`
2087            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
2088            ///   without exposing the actual secret value
2089            ///
2090            /// # Arguments
2091            /// * `txn` - Database transaction for retrieving secrets
2092            /// * `options_with_secret` - Container of options with both plaintext and secret values
2093            ///
2094            /// # Returns
2095            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
2096            async fn format_with_option_secret_resolved(
2097                txn: &DatabaseTransaction,
2098                options_with_secret: &WithOptionsSecResolved,
2099            ) -> MetaResult<Vec<SqlOption>> {
2100                let mut options = Vec::new();
2101                for (k, v) in options_with_secret.as_plaintext() {
2102                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
2103                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2104                    options.push(sql_option);
2105                }
2106                for (k, v) in options_with_secret.as_secret() {
2107                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
2108                        let sql_option =
2109                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
2110                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
2111                        options.push(sql_option);
2112                    } else {
2113                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
2114                    }
2115                }
2116                Ok(options)
2117            }
2118
2119            match &mut stmt {
2120                Statement::CreateSource { stmt } => {
2121                    stmt.with_properties.0 =
2122                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2123                }
2124                Statement::CreateTable { with_options, .. } => {
2125                    *with_options =
2126                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2127                    associate_table_id = source.optional_associated_table_id;
2128                    preferred_id = associate_table_id.unwrap().as_object_id();
2129                }
2130                _ => unreachable!(),
2131            }
2132
2133            stmt.to_string()
2134        };
2135
2136        {
2137            // update secret dependencies
2138            if !to_add_secret_dep.is_empty() {
2139                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2140                    object_dependency::ActiveModel {
2141                        oid: Set(secret_id.into()),
2142                        used_by: Set(preferred_id),
2143                        ..Default::default()
2144                    }
2145                }))
2146                .exec(&txn)
2147                .await?;
2148            }
2149            if !to_remove_secret_dep.is_empty() {
2150                // todo: fix the filter logic
2151                let _ = ObjectDependency::delete_many()
2152                    .filter(
2153                        object_dependency::Column::Oid
2154                            .is_in(to_remove_secret_dep)
2155                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2156                    )
2157                    .exec(&txn)
2158                    .await?;
2159            }
2160        }
2161
2162        let active_source_model = source::ActiveModel {
2163            source_id: Set(source_id),
2164            definition: Set(rewrite_sql.clone()),
2165            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2166            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2167                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2168            ..Default::default()
2169        };
2170        active_source_model.update(&txn).await?;
2171
2172        if let Some(associate_table_id) = associate_table_id {
2173            // update the associated table's definition accordingly
2174            let active_table_model = table::ActiveModel {
2175                table_id: Set(associate_table_id),
2176                definition: Set(rewrite_sql),
2177                ..Default::default()
2178            };
2179            active_table_model.update(&txn).await?;
2180        }
2181
2182        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2183            // if updating a table with connector, the job id is the table id
2184            associate_table_id.as_job_id()
2185        } else {
2186            source_id.as_share_source_job_id()
2187        }]
2188        .into_iter()
2189        .chain(dep_source_job_ids.into_iter())
2190        .collect_vec();
2191
2192        // update fragments
2193        update_connector_props_fragments(
2194            &txn,
2195            to_check_job_ids,
2196            FragmentTypeFlag::Source,
2197            |node, found| {
2198                if let PbNodeBody::Source(node) = node
2199                    && let Some(source_inner) = &mut node.source_inner
2200                {
2201                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2202                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2203                    *found = true;
2204                }
2205            },
2206            is_shared_source,
2207        )
2208        .await?;
2209
2210        let mut to_update_objs = Vec::with_capacity(2);
2211        let (source, obj) = Source::find_by_id(source_id)
2212            .find_also_related(Object)
2213            .one(&txn)
2214            .await?
2215            .ok_or_else(|| {
2216                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2217            })?;
2218        to_update_objs.push(PbObject {
2219            object_info: Some(PbObjectInfo::Source(
2220                ObjectModel(source, obj.unwrap(), None).into(),
2221            )),
2222        });
2223
2224        if let Some(associate_table_id) = associate_table_id {
2225            let (table, obj) = Table::find_by_id(associate_table_id)
2226                .find_also_related(Object)
2227                .one(&txn)
2228                .await?
2229                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2230            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2231                .one(&txn)
2232                .await?;
2233            to_update_objs.push(PbObject {
2234                object_info: Some(PbObjectInfo::Table(
2235                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
2236                )),
2237            });
2238        }
2239
2240        txn.commit().await?;
2241
2242        self.notify_frontend(
2243            NotificationOperation::Update,
2244            NotificationInfo::ObjectGroup(PbObjectGroup {
2245                objects: to_update_objs,
2246            }),
2247        )
2248        .await;
2249
2250        Ok(options_with_secret)
2251    }
2252
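    // Illustrative usage sketch, not a doctest (`controller` and the property key
    // are hypothetical; keys must pass the sink's allow-alter-on-fly check):
    //
    //     let applied = controller
    //         .update_sink_props_by_sink_id(sink_id, BTreeMap::from([
    //             ("some.alterable.key".to_owned(), "new-value".to_owned()),
    //         ]))
    //         .await?;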
2253    pub async fn update_sink_props_by_sink_id(
2254        &self,
2255        sink_id: SinkId,
2256        props: BTreeMap<String, String>,
2257    ) -> MetaResult<HashMap<String, String>> {
2258        let inner = self.inner.read().await;
2259        let txn = inner.db.begin().await?;
2260
2261        let (sink, _obj) = Sink::find_by_id(sink_id)
2262            .find_also_related(Object)
2263            .one(&txn)
2264            .await?
2265            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2266        validate_sink_props(&sink, &props)?;
2267        let definition = sink.definition.clone();
2268        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2269            .map_err(|e| SinkError::Config(anyhow!(e)))?
2270            .try_into()
2271            .unwrap();
2272        if let Statement::CreateSink { stmt } = &mut stmt {
2273            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2274        } else {
2275            panic!("definition is not a create sink statement")
2276        }
2277        let mut new_config = sink.properties.clone().into_inner();
2278        new_config.extend(props.clone());
2279
2280        let definition = stmt.to_string();
2281        let active_sink = sink::ActiveModel {
2282            sink_id: Set(sink_id),
2283            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2284            definition: Set(definition),
2285            ..Default::default()
2286        };
2287        active_sink.update(&txn).await?;
2288
2289        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2290        let (sink, obj) = Sink::find_by_id(sink_id)
2291            .find_also_related(Object)
2292            .one(&txn)
2293            .await?
2294            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2295        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2296            .one(&txn)
2297            .await?;
2298        txn.commit().await?;
2299        let relation_infos = vec![PbObject {
2300            object_info: Some(PbObjectInfo::Sink(
2301                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
2302            )),
2303        }];
2304
2305        let _version = self
2306            .notify_frontend(
2307                NotificationOperation::Update,
2308                NotificationInfo::ObjectGroup(PbObjectGroup {
2309                    objects: relation_infos,
2310                }),
2311            )
2312            .await;
2313
2314        Ok(props.into_iter().collect())
2315    }
2316
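    // Illustrative usage sketch, not a doctest (`controller` and the ids are
    // hypothetical; the extra options carry the iceberg table's sink/source ids,
    // as destructured below):
    //
    //     let extra = PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id });
    //     let (applied, sink_id) = controller
    //         .update_iceberg_table_props_by_table_id(table_id, props, Some(extra))
    //         .await?;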
2317    pub async fn update_iceberg_table_props_by_table_id(
2318        &self,
2319        table_id: TableId,
2320        props: BTreeMap<String, String>,
2321        alter_iceberg_table_props: Option<
2322            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2323        >,
2324    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2325        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) =
2326            alter_iceberg_table_props.ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2327        let inner = self.inner.read().await;
2328        let txn = inner.db.begin().await?;
2329
2330        let (sink, _obj) = Sink::find_by_id(sink_id)
2331            .find_also_related(Object)
2332            .one(&txn)
2333            .await?
2334            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2335        validate_sink_props(&sink, &props)?;
2336
2337        let definition = sink.definition.clone();
2338        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2339            .map_err(|e| SinkError::Config(anyhow!(e)))?
2340            .try_into()
2341            .unwrap();
2342        if let Statement::CreateTable {
2343            with_options,
2344            engine,
2345            ..
2346        } = &mut stmt
2347        {
2348            if !matches!(engine, Engine::Iceberg) {
2349                return Err(SinkError::Config(anyhow!(
2350                    "only tables with the iceberg engine can be altered this way"
2351                ))
2352                .into());
2353            }
2354            update_stmt_with_props(with_options, &props)?;
2355        } else {
2356            panic!("definition is not a create iceberg table statement")
2357        }
2358        let mut new_config = sink.properties.clone().into_inner();
2359        new_config.extend(props.clone());
2360
2361        let definition = stmt.to_string();
2362        let active_sink = sink::ActiveModel {
2363            sink_id: Set(sink_id),
2364            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2365            definition: Set(definition.clone()),
2366            ..Default::default()
2367        };
2368        let active_source = source::ActiveModel {
2369            source_id: Set(source_id),
2370            definition: Set(definition.clone()),
2371            ..Default::default()
2372        };
2373        let active_table = table::ActiveModel {
2374            table_id: Set(table_id),
2375            definition: Set(definition),
2376            ..Default::default()
2377        };
2378        active_sink.update(&txn).await?;
2379        active_source.update(&txn).await?;
2380        active_table.update(&txn).await?;
2381
2382        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2383
2384        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2385            .find_also_related(Object)
2386            .one(&txn)
2387            .await?
2388            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2389        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2390            .one(&txn)
2391            .await?;
2392        let (source, source_obj) = Source::find_by_id(source_id)
2393            .find_also_related(Object)
2394            .one(&txn)
2395            .await?
2396            .ok_or_else(|| {
2397                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2398            })?;
2399        let (table, table_obj) = Table::find_by_id(table_id)
2400            .find_also_related(Object)
2401            .one(&txn)
2402            .await?
2403            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2404        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2405            .one(&txn)
2406            .await?;
2407        txn.commit().await?;
2408        let relation_infos = vec![
2409            PbObject {
2410                object_info: Some(PbObjectInfo::Sink(
2411                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2412                )),
2413            },
2414            PbObject {
2415                object_info: Some(PbObjectInfo::Source(
2416                    ObjectModel(source, source_obj.unwrap(), None).into(),
2417                )),
2418            },
2419            PbObject {
2420                object_info: Some(PbObjectInfo::Table(
2421                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2422                )),
2423            },
2424        ];
2425        let _version = self
2426            .notify_frontend(
2427                NotificationOperation::Update,
2428                NotificationInfo::ObjectGroup(PbObjectGroup {
2429                    objects: relation_infos,
2430                }),
2431            )
2432            .await;
2433
2434        Ok((props.into_iter().collect(), sink_id))
2435    }
2436
2437    /// Update connection properties and all dependent sources/sinks in a single transaction
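    ///
    /// Illustrative usage sketch, not a doctest (`controller` and the property
    /// key are hypothetical):
    ///
    /// ```ignore
    /// let (conn_props, source_props, sink_props) = controller
    ///     .update_connection_and_dependent_objects_props(
    ///         connection_id,
    ///         BTreeMap::from([("some.alterable.key".to_owned(), "new-value".to_owned())]),
    ///         BTreeMap::new(), // no secret refs changed
    ///     )
    ///     .await?;
    /// ```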
2438    pub async fn update_connection_and_dependent_objects_props(
2439        &self,
2440        connection_id: ConnectionId,
2441        alter_props: BTreeMap<String, String>,
2442        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2443    ) -> MetaResult<(
2444        WithOptionsSecResolved,                   // Connection's new properties
2445        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2446        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2447    )> {
2448        let inner = self.inner.read().await;
2449        let txn = inner.db.begin().await?;
2450
2451        // Find all dependent sources and sinks first
2452        let dependent_sources: Vec<SourceId> = Source::find()
2453            .select_only()
2454            .column(source::Column::SourceId)
2455            .filter(source::Column::ConnectionId.eq(connection_id))
2456            .into_tuple()
2457            .all(&txn)
2458            .await?;
2459
2460        let dependent_sinks: Vec<SinkId> = Sink::find()
2461            .select_only()
2462            .column(sink::Column::SinkId)
2463            .filter(sink::Column::ConnectionId.eq(connection_id))
2464            .into_tuple()
2465            .all(&txn)
2466            .await?;
2467
2468        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2469            .find_also_related(Object)
2470            .one(&txn)
2471            .await?
2472            .ok_or_else(|| {
2473                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2474            })?;
2475
2476        // Validate that props can be altered
2477        let prop_keys: Vec<String> = alter_props
2478            .keys()
2479            .chain(alter_secret_refs.keys())
2480            .cloned()
2481            .collect();
2482
2483        // Map the connection type enum to the string name expected by the validation function
2484        let connection_type_str = pb_connection_type_to_connection_type(
2485            &connection_catalog.params.to_protobuf().connection_type(),
2486        )
2487        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2488
2489        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2490            connection_type_str, &prop_keys,
2491        )?;
2492
2493        let connection_pb = connection_catalog.params.to_protobuf();
2494        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2495            connection_pb.properties.into_iter().collect(),
2496            connection_pb.secret_refs.into_iter().collect(),
2497        );
2498
        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

        tracing::debug!(
            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
            connection_id,
            dependent_sources,
            dependent_sinks
        );

        // Validate connection
        {
            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
                connection_type: connection_pb.connection_type,
                properties: connection_options_with_secret
                    .as_plaintext()
                    .clone()
                    .into_iter()
                    .collect(),
                secret_refs: connection_options_with_secret
                    .as_secret()
                    .clone()
                    .into_iter()
                    .collect(),
            };
            let connection = PbConnection {
                id: connection_id as _,
                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                    conn_params_pb,
                )),
                ..Default::default()
            };
            validate_connection(&connection).await?;
        }

        // Update connection secret dependencies
        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(connection_id.as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
                )
                .exec(&txn)
                .await?;
        }

        // Update the connection with new properties
        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        active_connection_model.update(&txn).await?;

        // Batch update dependent sources and collect their complete properties
        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            // Batch fetch all dependent sources
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // Validate the updated source properties
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                // Prepare source update
                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                // Prepare fragment update:
                // - If the source is a table-associated source, update fragments for the table job.
                // - Otherwise update the shared source job.
                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = std::iter::once(base_job_id)
                    .chain(dep_source_job_ids)
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                // Collect the complete properties for runtime broadcast
                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                source_update.update(&txn).await?;
            }

            // Batch execute fragment updates
            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        // Batch update dependent sinks and collect their complete properties
        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            // Batch fetch all dependent sinks
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                // Validate that sink props can be altered
                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when altering sink"
                        ))
                        .into());
                    }
                };

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                // Prepare sink update
                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                // Prepare fragment updates for this sink
                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                // Collect the complete properties for runtime broadcast
                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            // Batch execute sink updates
            for sink_update in sink_updates {
                sink_update.update(&txn).await?;
            }

            // Batch execute sink fragment updates using the reusable function
            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        // Collect all updated objects for frontend notification
        let mut updated_objects = Vec::new();

        // Add connection
        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap(), None).into(),
            )),
        });

        // Add sources
        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap(), None).into(),
                )),
            });
        }

        // Add sinks
        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
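            // Sinks are streaming jobs, so also fetch the `streaming_job` row to fill
            // job-level fields into the catalog object sent to the frontend.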
            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                .one(&txn)
                .await?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        // Commit the transaction
        txn.commit().await?;

        // Notify frontend about all updated objects
        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }

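    /// Updates the rate limit stored in the stream node of the fragment identified by
    /// `fragment_id`. All rate-limit-capable node bodies in the fragment (DML, sink,
    /// CDC scan, stream scan and source backfill) are updated; `None` clears the limit.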
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

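    /// Lists the rate limits currently configured on all rate-limit-capable fragments.
    ///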
    /// Note: `FsFetch` nodes created in old versions are not included.
    /// Since this is only used for debugging, it should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

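/// Validates that `props` may be applied to `sink` on the fly: the connector must be
/// specified, every key must be alterable for that connector, and the merged
/// properties must pass the connector's alter-config validation.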
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
            );
        }
    };
    Ok(())
}

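/// Merges `props` into a statement's `WITH` options: existing options with the same
/// name are overwritten in place, new options are appended, and the original option
/// order is preserved.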
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

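/// Merges `props` into the `SinkDesc` embedded in every sink fragment of the sink's
/// streaming job and persists the rewritten stream nodes.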
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

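/// Context about the sinks that write into a table, passed along when the table's
/// streaming job is created or replaced.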
pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids;
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

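/// Context for finishing an auto-refresh-schema sink replacement: the temporary sink
/// `tmp_sink_id` takes over `original_sink_id` with the new `columns`, together with
/// the new log store table (id and columns) if one was created.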
pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

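/// Rewrites the stream nodes of all fragments under `job_ids` that carry `expect_flag`,
/// applying `alter_stream_node_fn` to every node body and persisting only the fragments
/// that were actually modified.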
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the shared source's job id or the associated
        // table's job id. For a non-shared source with no dependent jobs there may be no
        // fragment to update, but if the source is shared or is used by other streaming
        // jobs (`job_ids.len() > 1`), at least one fragment must have been updated.
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}