risingwave_meta/controller/streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_common::{bail, current_cluster_version};
30use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
31use risingwave_connector::error::ConnectorError;
32use risingwave_connector::sink::file_sink::fs::FsSink;
33use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
34use risingwave_connector::source::ConnectorProperties;
35use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::user_privilege::Action;
40use risingwave_meta_model::*;
41use risingwave_pb::catalog::{PbCreateType, PbTable};
42use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
43use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
47};
48use risingwave_pb::meta::{PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::PbColumnCatalog;
50use risingwave_pb::secret::PbSecretRef;
51use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
52use risingwave_pb::stream_plan::stream_node::PbNodeBody;
53use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
54use risingwave_pb::user::PbUserInfo;
55use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
56use risingwave_sqlparser::parser::{Parser, ParserError};
57use sea_orm::ActiveValue::Set;
58use sea_orm::sea_query::{Expr, Query, SimpleExpr};
59use sea_orm::{
60    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
61    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
62};
63use thiserror_ext::AsReport;
64
65use super::rename::IndexItemRewriter;
66use crate::barrier::{Command, SharedFragmentInfo};
67use crate::controller::ObjectModel;
68use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::utils::{
71    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
72    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
73    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
74    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
75    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
76};
77use crate::error::MetaErrorInner;
78use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
79use crate::model::{
80    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
81    StreamJobFragmentsToCreate,
82};
83use crate::stream::SplitAssignment;
84use crate::{MetaError, MetaResult};
85
86impl CatalogController {
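    /// Create the catalog `Object` and the corresponding `streaming_job` row (in `Initial` status)
    /// for a new streaming job, returning the allocated job id.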
87    pub async fn create_streaming_job_obj(
88        txn: &DatabaseTransaction,
89        obj_type: ObjectType,
90        owner_id: UserId,
91        database_id: Option<DatabaseId>,
92        schema_id: Option<SchemaId>,
93        create_type: PbCreateType,
94        timezone: Option<String>,
95        streaming_parallelism: StreamingParallelism,
96        max_parallelism: usize,
97        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
98    ) -> MetaResult<JobId> {
99        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
100        let job_id = obj.oid.as_job_id();
101        let job = streaming_job::ActiveModel {
102            job_id: Set(job_id),
103            job_status: Set(JobStatus::Initial),
104            create_type: Set(create_type.into()),
105            timezone: Set(timezone),
106            parallelism: Set(streaming_parallelism),
107            max_parallelism: Set(max_parallelism as _),
108            specific_resource_group: Set(specific_resource_group),
109        };
110        job.insert(txn).await?;
111
112        Ok(job_id)
113    }
114
115    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
116    /// materialized view.
117    ///
118    /// Some of the fields in the given streaming job are placeholders, which will
119    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
120    #[await_tree::instrument]
121    pub async fn create_job_catalog(
122        &self,
123        streaming_job: &mut StreamingJob,
124        ctx: &StreamContext,
125        parallelism: &Option<Parallelism>,
126        max_parallelism: usize,
127        mut dependencies: HashSet<ObjectId>,
128        specific_resource_group: Option<String>,
129    ) -> MetaResult<()> {
130        let inner = self.inner.write().await;
131        let txn = inner.db.begin().await?;
132        let create_type = streaming_job.create_type();
133
134        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
135            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
136            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
137            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
138        };
139
140        ensure_user_id(streaming_job.owner() as _, &txn).await?;
141        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
142        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
143        check_relation_name_duplicate(
144            &streaming_job.name(),
145            streaming_job.database_id(),
146            streaming_job.schema_id(),
147            &txn,
148        )
149        .await?;
150
151        // check if any dependency is in altering status.
152        if !dependencies.is_empty() {
153            let altering_cnt = ObjectDependency::find()
154                .join(
155                    JoinType::InnerJoin,
156                    object_dependency::Relation::Object1.def(),
157                )
158                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
159                .filter(
160                    object_dependency::Column::Oid
161                        .is_in(dependencies.clone())
162                        .and(object::Column::ObjType.eq(ObjectType::Table))
163                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
164                        .and(
165                            // It means the referring table is just a dummy one for altering.
166                            object::Column::Oid.not_in_subquery(
167                                Query::select()
168                                    .column(table::Column::TableId)
169                                    .from(Table)
170                                    .to_owned(),
171                            ),
172                        ),
173                )
174                .count(&txn)
175                .await?;
176            if altering_cnt != 0 {
177                return Err(MetaError::permission_denied(
178                    "some dependent relations are being altered",
179                ));
180            }
181        }
182
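        // Create the job object and the per-type catalog entries (table / sink / index / source),
        // filling in the ids allocated for the new job.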
183        match streaming_job {
184            StreamingJob::MaterializedView(table) => {
185                let job_id = Self::create_streaming_job_obj(
186                    &txn,
187                    ObjectType::Table,
188                    table.owner as _,
189                    Some(table.database_id),
190                    Some(table.schema_id),
191                    create_type,
192                    ctx.timezone.clone(),
193                    streaming_parallelism,
194                    max_parallelism,
195                    specific_resource_group,
196                )
197                .await?;
198                table.id = job_id.as_mv_table_id();
199                let table_model: table::ActiveModel = table.clone().into();
200                Table::insert(table_model).exec(&txn).await?;
201            }
202            StreamingJob::Sink(sink) => {
203                if let Some(target_table_id) = sink.target_table
204                    && check_sink_into_table_cycle(
205                        target_table_id.into(),
206                        dependencies.iter().cloned().collect(),
207                        &txn,
208                    )
209                    .await?
210                {
211                    bail!("Creating such a sink will result in circular dependency.");
212                }
213
214                let job_id = Self::create_streaming_job_obj(
215                    &txn,
216                    ObjectType::Sink,
217                    sink.owner as _,
218                    Some(sink.database_id),
219                    Some(sink.schema_id),
220                    create_type,
221                    ctx.timezone.clone(),
222                    streaming_parallelism,
223                    max_parallelism,
224                    specific_resource_group,
225                )
226                .await?;
227                sink.id = job_id.as_sink_id();
228                let sink_model: sink::ActiveModel = sink.clone().into();
229                Sink::insert(sink_model).exec(&txn).await?;
230            }
231            StreamingJob::Table(src, table, _) => {
232                let job_id = Self::create_streaming_job_obj(
233                    &txn,
234                    ObjectType::Table,
235                    table.owner as _,
236                    Some(table.database_id),
237                    Some(table.schema_id),
238                    create_type,
239                    ctx.timezone.clone(),
240                    streaming_parallelism,
241                    max_parallelism,
242                    specific_resource_group,
243                )
244                .await?;
245                table.id = job_id.as_mv_table_id();
246                if let Some(src) = src {
247                    let src_obj = Self::create_object(
248                        &txn,
249                        ObjectType::Source,
250                        src.owner as _,
251                        Some(src.database_id),
252                        Some(src.schema_id),
253                    )
254                    .await?;
255                    src.id = src_obj.oid.as_source_id();
256                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
257                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
258                    let source: source::ActiveModel = src.clone().into();
259                    Source::insert(source).exec(&txn).await?;
260                }
261                let table_model: table::ActiveModel = table.clone().into();
262                Table::insert(table_model).exec(&txn).await?;
263            }
264            StreamingJob::Index(index, table) => {
265                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
266                let job_id = Self::create_streaming_job_obj(
267                    &txn,
268                    ObjectType::Index,
269                    index.owner as _,
270                    Some(index.database_id),
271                    Some(index.schema_id),
272                    create_type,
273                    ctx.timezone.clone(),
274                    streaming_parallelism,
275                    max_parallelism,
276                    specific_resource_group,
277                )
278                .await?;
279                // To be compatible with the old implementation.
280                index.id = job_id.as_index_id();
281                index.index_table_id = job_id.as_mv_table_id();
282                table.id = job_id.as_mv_table_id();
283
284                ObjectDependency::insert(object_dependency::ActiveModel {
285                    oid: Set(index.primary_table_id.into()),
286                    used_by: Set(table.id.into()),
287                    ..Default::default()
288                })
289                .exec(&txn)
290                .await?;
291
292                let table_model: table::ActiveModel = table.clone().into();
293                Table::insert(table_model).exec(&txn).await?;
294                let index_model: index::ActiveModel = index.clone().into();
295                Index::insert(index_model).exec(&txn).await?;
296            }
297            StreamingJob::Source(src) => {
298                let job_id = Self::create_streaming_job_obj(
299                    &txn,
300                    ObjectType::Source,
301                    src.owner as _,
302                    Some(src.database_id),
303                    Some(src.schema_id),
304                    create_type,
305                    ctx.timezone.clone(),
306                    streaming_parallelism,
307                    max_parallelism,
308                    specific_resource_group,
309                )
310                .await?;
311                src.id = job_id.as_shared_source_id();
312                let source_model: source::ActiveModel = src.clone().into();
313                Source::insert(source_model).exec(&txn).await?;
314            }
315        }
316
317        // collect dependent secrets.
318        dependencies.extend(
319            streaming_job
320                .dependent_secret_ids()?
321                .into_iter()
322                .map(|id| id.as_object_id()),
323        );
324        // collect dependent connection
325        dependencies.extend(
326            streaming_job
327                .dependent_connection_ids()?
328                .into_iter()
329                .map(|id| id.as_object_id()),
330        );
331
332        // record object dependency.
333        if !dependencies.is_empty() {
334            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
335                object_dependency::ActiveModel {
336                    oid: Set(oid),
337                    used_by: Set(streaming_job.id().as_object_id()),
338                    ..Default::default()
339                }
340            }))
341            .exec(&txn)
342            .await?;
343        }
344
345        txn.commit().await?;
346
347        Ok(())
348    }
349
350    /// Create catalogs for internal tables, then notify frontend about them if the job is a
351    /// materialized view.
352    ///
353    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
354    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
355    ///
356    /// Returns a mapping from the temporary table id to the actual global table id.
357    pub async fn create_internal_table_catalog(
358        &self,
359        job: &StreamingJob,
360        mut incomplete_internal_tables: Vec<PbTable>,
361    ) -> MetaResult<HashMap<TableId, TableId>> {
362        let job_id = job.id();
363        let inner = self.inner.write().await;
364        let txn = inner.db.begin().await?;
365
366        // Ensure the job exists.
367        ensure_job_not_canceled(job_id, &txn).await?;
368
369        let mut table_id_map = HashMap::new();
370        for table in &mut incomplete_internal_tables {
371            let table_id = Self::create_object(
372                &txn,
373                ObjectType::Table,
374                table.owner as _,
375                Some(table.database_id),
376                Some(table.schema_id),
377            )
378            .await?
379            .oid
380            .as_table_id();
381            table_id_map.insert(table.id, table_id);
382            table.id = table_id;
383            table.job_id = Some(job_id);
384
385            let table_model = table::ActiveModel {
386                table_id: Set(table_id),
387                belongs_to_job_id: Set(Some(job_id)),
388                fragment_id: NotSet,
389                ..table.clone().into()
390            };
391            Table::insert(table_model).exec(&txn).await?;
392        }
393        txn.commit().await?;
394
395        Ok(table_id_map)
396    }
397
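    /// Thin wrapper around `prepare_streaming_job` that extracts the fragments and downstream
    /// relations from `stream_job_fragments`.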
398    pub async fn prepare_stream_job_fragments(
399        &self,
400        stream_job_fragments: &StreamJobFragmentsToCreate,
401        streaming_job: &StreamingJob,
402        for_replace: bool,
403    ) -> MetaResult<()> {
404        self.prepare_streaming_job(
405            stream_job_fragments.stream_job_id(),
406            || stream_job_fragments.fragments.values(),
407            &stream_job_fragments.downstreams,
408            for_replace,
409            Some(streaming_job),
410        )
411        .await
412    }
413
414    // TODO: In this function, we also update the `Table` model in the meta store.
415    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
416    // making them the source of truth and performing a full replacement for those in the meta store?
417    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
418    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
419    )]
420    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
421        &self,
422        job_id: JobId,
423        get_fragments: impl Fn() -> I + 'a,
424        downstreams: &FragmentDownstreamRelation,
425        for_replace: bool,
426        creating_streaming_job: Option<&'a StreamingJob>,
427    ) -> MetaResult<()> {
428        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
429
430        let inner = self.inner.write().await;
431
432        let need_notify = creating_streaming_job
433            .map(|job| job.should_notify_creating())
434            .unwrap_or(false);
435        let definition = creating_streaming_job.map(|job| job.definition());
436
437        let mut objects_to_notify = vec![];
438        let txn = inner.db.begin().await?;
439
440        // Ensure the job exists.
441        ensure_job_not_canceled(job_id, &txn).await?;
442
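        // Insert one row per fragment. For newly created jobs, also backfill each state table's
        // `fragment_id` and `vnode_count`, and collect the catalogs to notify the frontend about.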
443        for fragment in fragments {
444            let fragment_id = fragment.fragment_id;
445            let state_table_ids = fragment.state_table_ids.inner_ref().clone();
446
447            let fragment = fragment.into_active_model();
448            Fragment::insert(fragment).exec(&txn).await?;
449
450            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
451            // After table fragments are created, update them for all tables.
452            if !for_replace {
453                let all_tables = StreamJobFragments::collect_tables(get_fragments());
454                for state_table_id in state_table_ids {
455                    // Table's vnode count is not always the fragment's vnode count, so we have to
456                    // look up the table from `TableFragments`.
457                    // See `ActorGraphBuilder::new`.
458                    let table = all_tables
459                        .get(&state_table_id)
460                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
461                    assert_eq!(table.id, state_table_id);
462                    assert_eq!(table.fragment_id, fragment_id);
463                    let vnode_count = table.vnode_count();
464
465                    table::ActiveModel {
466                        table_id: Set(state_table_id as _),
467                        fragment_id: Set(Some(fragment_id)),
468                        vnode_count: Set(vnode_count as _),
469                        ..Default::default()
470                    }
471                    .update(&txn)
472                    .await?;
473
474                    if need_notify {
475                        let mut table = table.clone();
476                        // In production builds, the table's definition was replaced, but the full definition is still needed for the notification, so fill it back from the job.
477                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
478                            table.definition = definition.clone().unwrap();
479                        }
480                        objects_to_notify.push(PbObject {
481                            object_info: Some(PbObjectInfo::Table(table)),
482                        });
483                    }
484                }
485            }
486        }
487
488        // Add streaming job objects to notification
489        if need_notify {
490            match creating_streaming_job.unwrap() {
491                StreamingJob::Table(Some(source), ..) => {
492                    objects_to_notify.push(PbObject {
493                        object_info: Some(PbObjectInfo::Source(source.clone())),
494                    });
495                }
496                StreamingJob::Sink(sink) => {
497                    objects_to_notify.push(PbObject {
498                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
499                    });
500                }
501                StreamingJob::Index(index, _) => {
502                    objects_to_notify.push(PbObject {
503                        object_info: Some(PbObjectInfo::Index(index.clone())),
504                    });
505                }
506                _ => {}
507            }
508        }
509
510        insert_fragment_relations(&txn, downstreams).await?;
511
512        if !for_replace {
513            // Update dml fragment id.
514            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
515                Table::update(table::ActiveModel {
516                    table_id: Set(table.id),
517                    dml_fragment_id: Set(table.dml_fragment_id),
518                    ..Default::default()
519                })
520                .exec(&txn)
521                .await?;
522            }
523        }
524
525        txn.commit().await?;
526
527        // FIXME: there's a gap between the catalog creation and the notification, which may lead to
528        // the frontend receiving duplicate notifications if it restarts right in this gap. We need to
529        // either forward the notification or filter out catalogs without any fragments in the meta store.
530        if !objects_to_notify.is_empty() {
531            self.notify_frontend(
532                Operation::Add,
533                Info::ObjectGroup(PbObjectGroup {
534                    objects: objects_to_notify,
535                }),
536            )
537            .await;
538        }
539
540        Ok(())
541    }
542
543    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
544    /// information is required to build the barrier mutation.
545    pub async fn build_cancel_command(
546        &self,
547        table_fragments: &StreamJobFragments,
548    ) -> MetaResult<Command> {
549        let inner = self.inner.read().await;
550        let txn = inner.db.begin().await?;
551
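        // For a sink-into-table job, pair the sink fragment with its target (downstream) fragment,
        // which is required to build the barrier mutation that drops the sink.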
552        let dropped_sink_fragment_with_target =
553            if let Some(sink_fragment) = table_fragments.sink_fragment() {
554                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
555                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
556                sink_target_fragment
557                    .get(&sink_fragment_id)
558                    .map(|target_fragments| {
559                        let target_fragment_id = *target_fragments
560                            .first()
561                            .expect("sink should have at least one downstream fragment");
562                        (sink_fragment_id, target_fragment_id)
563                    })
564            } else {
565                None
566            };
567
568        Ok(Command::DropStreamingJobs {
569            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
570            actors: table_fragments.actor_ids().collect(),
571            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
572            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
573            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
574                .into_iter()
575                .map(|(sink, target)| (target as _, vec![sink as _]))
576                .collect(),
577        })
578    }
579
580    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or created in `FOREGROUND` mode.
581    /// It returns (true, _) if the job is not found or has been aborted.
582    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
583    #[await_tree::instrument]
584    pub async fn try_abort_creating_streaming_job(
585        &self,
586        mut job_id: JobId,
587        is_cancelled: bool,
588    ) -> MetaResult<(bool, Option<DatabaseId>)> {
589        let mut inner = self.inner.write().await;
590        let txn = inner.db.begin().await?;
591
592        let obj = Object::find_by_id(job_id).one(&txn).await?;
593        let Some(obj) = obj else {
594            tracing::warn!(
595                id = %job_id,
596                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
597            );
598            return Ok((true, None));
599        };
600        let database_id = obj
601            .database_id
602            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
603        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
604
605        if !is_cancelled && let Some(streaming_job) = &streaming_job {
606            assert_ne!(streaming_job.job_status, JobStatus::Created);
607            if streaming_job.create_type == CreateType::Background
608                && streaming_job.job_status == JobStatus::Creating
609            {
610                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
611                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
612                {
613                    // If the job belongs to an iceberg table, we still need to clean it.
614                } else {
615                    // If the job is created in the background and still in creating status, we should not abort it; let recovery handle it instead.
616                    tracing::warn!(
617                        id = %job_id,
618                        "streaming job is created in background and still in creating status"
619                    );
620                    return Ok((false, Some(database_id)));
621                }
622            }
623        }
624
625        let iceberg_table_id =
626            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
627        if let Some(iceberg_table_id) = iceberg_table_id {
628            // If the job is an iceberg sink, we need to clean up the iceberg table as well.
629            // Here we will drop the sink objects directly.
630            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
631            Object::delete_many()
632                .filter(
633                    object::Column::Oid
634                        .eq(job_id)
635                        .or(object::Column::Oid.is_in(internal_tables)),
636                )
637                .exec(&txn)
638                .await?;
639            job_id = iceberg_table_id.as_job_id();
640        };
641
642        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
643
644        // Get the notification info if the job is a materialized view or created in the background.
645        let mut objs = vec![];
646        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
647
648        let mut need_notify =
649            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
650        if !need_notify {
651            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
652            if let Some(table) = &table_obj {
653                need_notify = table.table_type == TableType::MaterializedView;
654            }
655        }
656
657        if is_cancelled {
658            let dropped_tables = Table::find()
659                .find_also_related(Object)
660                .filter(
661                    table::Column::TableId.is_in(
662                        internal_table_ids
663                            .iter()
664                            .cloned()
665                            .chain(table_obj.iter().map(|t| t.table_id as _)),
666                    ),
667                )
668                .all(&txn)
669                .await?
670                .into_iter()
671                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
672            inner
673                .dropped_tables
674                .extend(dropped_tables.map(|t| (t.id, t)));
675        }
676
677        if need_notify {
678            let obj: Option<PartialObject> = Object::find_by_id(job_id)
679                .select_only()
680                .columns([
681                    object::Column::Oid,
682                    object::Column::ObjType,
683                    object::Column::SchemaId,
684                    object::Column::DatabaseId,
685                ])
686                .into_partial_model()
687                .one(&txn)
688                .await?;
689            let obj =
690                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
691            objs.push(obj);
692            let internal_table_objs: Vec<PartialObject> = Object::find()
693                .select_only()
694                .columns([
695                    object::Column::Oid,
696                    object::Column::ObjType,
697                    object::Column::SchemaId,
698                    object::Column::DatabaseId,
699                ])
700                .join(JoinType::InnerJoin, object::Relation::Table.def())
701                .filter(table::Column::BelongsToJobId.eq(job_id))
702                .into_partial_model()
703                .all(&txn)
704                .await?;
705            objs.extend(internal_table_objs);
706        }
707
708        // Check if the job is creating a sink into a table.
709        if table_obj.is_none()
710            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
711                .select_only()
712                .column(sink::Column::TargetTable)
713                .into_tuple::<Option<TableId>>()
714                .one(&txn)
715                .await?
716        {
717            let tmp_id: Option<ObjectId> = ObjectDependency::find()
718                .select_only()
719                .column(object_dependency::Column::UsedBy)
720                .join(
721                    JoinType::InnerJoin,
722                    object_dependency::Relation::Object1.def(),
723                )
724                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
725                .filter(
726                    object_dependency::Column::Oid
727                        .eq(target_table_id)
728                        .and(object::Column::ObjType.eq(ObjectType::Table))
729                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
730                )
731                .into_tuple()
732                .one(&txn)
733                .await?;
734            if let Some(tmp_id) = tmp_id {
735                tracing::warn!(
736                    id = %tmp_id,
737                    "aborting temp streaming job for sink into table"
738                );
739
740                Object::delete_by_id(tmp_id).exec(&txn).await?;
741            }
742        }
743
744        Object::delete_by_id(job_id).exec(&txn).await?;
745        if !internal_table_ids.is_empty() {
746            Object::delete_many()
747                .filter(object::Column::Oid.is_in(internal_table_ids))
748                .exec(&txn)
749                .await?;
750        }
751        if let Some(t) = &table_obj
752            && let Some(source_id) = t.optional_associated_source_id
753        {
754            Object::delete_by_id(source_id).exec(&txn).await?;
755        }
756
757        let err = if is_cancelled {
758            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
759        } else {
760            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
761        };
762        let abort_reason = format!("streaming job aborted {}", err.as_report());
763        for tx in inner
764            .creating_table_finish_notifier
765            .get_mut(&database_id)
766            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
767            .into_iter()
768            .flatten()
769            .flatten()
770        {
771            let _ = tx.send(Err(abort_reason.clone()));
772        }
773        txn.commit().await?;
774
775        if !objs.is_empty() {
776            // We have already notified the frontend about these objects,
777            // so we need to notify it to delete them here.
778            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
779                .await;
780        }
781        Ok((true, Some(database_id)))
782    }
783
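    /// Persist the fragment relations and source split assignments gathered for a newly collected
    /// job, and advance the job status to `Creating`.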
784    #[await_tree::instrument]
785    pub async fn post_collect_job_fragments(
786        &self,
787        job_id: JobId,
788        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
789        new_sink_downstream: Option<FragmentDownstreamRelation>,
790        split_assignment: Option<&SplitAssignment>,
791    ) -> MetaResult<()> {
792        let inner = self.inner.write().await;
793        let txn = inner.db.begin().await?;
794
795        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
796
797        if let Some(new_downstream) = new_sink_downstream {
798            insert_fragment_relations(&txn, &new_downstream).await?;
799        }
800
801        // Mark job as CREATING.
802        streaming_job::ActiveModel {
803            job_id: Set(job_id),
804            job_status: Set(JobStatus::Creating),
805            ..Default::default()
806        }
807        .update(&txn)
808        .await?;
809
810        if let Some(split_assignment) = split_assignment {
811            let fragment_splits = split_assignment
812                .iter()
813                .map(|(fragment_id, splits)| {
814                    (
815                        *fragment_id as _,
816                        splits.values().flatten().cloned().collect_vec(),
817                    )
818                })
819                .collect();
820
821            self.update_fragment_splits(&txn, &fragment_splits).await?;
822        }
823
824        txn.commit().await?;
825
826        Ok(())
827    }
828
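    /// Create a temporary streaming job object used as the replacement target of an existing job,
    /// after checking its version, concurrent replacements and max parallelism, and record the
    /// original job as a dependency of the new object. Returns the temporary job id.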
829    pub async fn create_job_catalog_for_replace(
830        &self,
831        streaming_job: &StreamingJob,
832        ctx: Option<&StreamContext>,
833        specified_parallelism: Option<&NonZeroUsize>,
834        expected_original_max_parallelism: Option<usize>,
835    ) -> MetaResult<JobId> {
836        let id = streaming_job.id();
837        let inner = self.inner.write().await;
838        let txn = inner.db.begin().await?;
839
840        // 1. check version.
841        streaming_job.verify_version_for_replace(&txn).await?;
842        // 2. check concurrent replace.
843        let referring_cnt = ObjectDependency::find()
844            .join(
845                JoinType::InnerJoin,
846                object_dependency::Relation::Object1.def(),
847            )
848            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
849            .filter(
850                object_dependency::Column::Oid
851                    .eq(id)
852                    .and(object::Column::ObjType.eq(ObjectType::Table))
853                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
854            )
855            .count(&txn)
856            .await?;
857        if referring_cnt != 0 {
858            return Err(MetaError::permission_denied(
859                "job is being altered or referenced by some creating jobs",
860            ));
861        }
862
863        // 3. check parallelism.
864        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
865            StreamingJobModel::find_by_id(id)
866                .select_only()
867                .column(streaming_job::Column::MaxParallelism)
868                .column(streaming_job::Column::Timezone)
869                .into_tuple()
870                .one(&txn)
871                .await?
872                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
873
874        if let Some(max_parallelism) = expected_original_max_parallelism
875            && original_max_parallelism != max_parallelism as i32
876        {
877            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
878            // This should not happen in normal cases.
879            bail!(
880                "cannot use a different max parallelism \
881                 when replacing streaming job, \
882                 original: {}, new: {}",
883                original_max_parallelism,
884                max_parallelism
885            );
886        }
887
888        let parallelism = match specified_parallelism {
889            None => StreamingParallelism::Adaptive,
890            Some(n) => StreamingParallelism::Fixed(n.get() as _),
891        };
892        let timezone = ctx
893            .map(|ctx| ctx.timezone.clone())
894            .unwrap_or(original_timezone);
895
896        // 4. create streaming object for new replace table.
897        let new_obj_id = Self::create_streaming_job_obj(
898            &txn,
899            streaming_job.object_type(),
900            streaming_job.owner() as _,
901            Some(streaming_job.database_id() as _),
902            Some(streaming_job.schema_id() as _),
903            streaming_job.create_type(),
904            timezone,
905            parallelism,
906            original_max_parallelism as _,
907            None,
908        )
909        .await?;
910
911        // 5. record dependency for new replace table.
912        ObjectDependency::insert(object_dependency::ActiveModel {
913            oid: Set(id.as_object_id()),
914            used_by: Set(new_obj_id.as_object_id()),
915            ..Default::default()
916        })
917        .exec(&txn)
918        .await?;
919
920        txn.commit().await?;
921
922        Ok(new_obj_id)
923    }
924
925    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
926    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
927        let mut inner = self.inner.write().await;
928        let txn = inner.db.begin().await?;
929
930        // Check if the job belongs to iceberg table.
931        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
932            tracing::info!(
933                "streaming job {} is for iceberg table, wait for manual finish operation",
934                job_id
935            );
936            return Ok(());
937        }
938
939        let (notification_op, objects, updated_user_info) =
940            Self::finish_streaming_job_inner(&txn, job_id).await?;
941
942        txn.commit().await?;
943
944        let mut version = self
945            .notify_frontend(
946                notification_op,
947                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
948            )
949            .await;
950
951        // notify users about the default privileges
952        if !updated_user_info.is_empty() {
953            version = self.notify_users_update(updated_user_info).await;
954        }
955
956        inner
957            .creating_table_finish_notifier
958            .values_mut()
959            .for_each(|creating_tables| {
960                if let Some(txs) = creating_tables.remove(&job_id) {
961                    for tx in txs {
962                        let _ = tx.send(Ok(version));
963                    }
964                }
965            });
966
967        Ok(())
968    }
969
970    /// Transactional body of `finish_streaming_job`: marks job-related objects as `Created` and collects the notification operation, objects and updated user info for the caller to send to the frontend.
971    pub async fn finish_streaming_job_inner(
972        txn: &DatabaseTransaction,
973        job_id: JobId,
974    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
975        let job_type = Object::find_by_id(job_id)
976            .select_only()
977            .column(object::Column::ObjType)
978            .into_tuple()
979            .one(txn)
980            .await?
981            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
982
983        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
984            .select_only()
985            .column(streaming_job::Column::CreateType)
986            .into_tuple()
987            .one(txn)
988            .await?
989            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
990
991        // Update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
992        let res = Object::update_many()
993            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
994            .col_expr(
995                object::Column::CreatedAtClusterVersion,
996                current_cluster_version().into(),
997            )
998            .filter(object::Column::Oid.eq(job_id))
999            .exec(txn)
1000            .await?;
1001        if res.rows_affected == 0 {
1002            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1003        }
1004
1005        // mark the target stream job as `Created`.
1006        let job = streaming_job::ActiveModel {
1007            job_id: Set(job_id),
1008            job_status: Set(JobStatus::Created),
1009            ..Default::default()
1010        };
1011        job.update(txn).await?;
1012
1013        // notify frontend: job, internal tables.
1014        let internal_table_objs = Table::find()
1015            .find_also_related(Object)
1016            .filter(table::Column::BelongsToJobId.eq(job_id))
1017            .all(txn)
1018            .await?;
1019        let mut objects = internal_table_objs
1020            .iter()
1021            .map(|(table, obj)| PbObject {
1022                object_info: Some(PbObjectInfo::Table(
1023                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1024                )),
1025            })
1026            .collect_vec();
1027        let mut notification_op = if create_type == CreateType::Background {
1028            NotificationOperation::Update
1029        } else {
1030            NotificationOperation::Add
1031        };
1032        let mut updated_user_info = vec![];
1033
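        // Collect the job's own catalog object(s) according to its type; for indexes, SELECT
        // privileges on the primary table are additionally propagated to the index state tables.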
1034        match job_type {
1035            ObjectType::Table => {
1036                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1037                    .find_also_related(Object)
1038                    .one(txn)
1039                    .await?
1040                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1041                if table.table_type == TableType::MaterializedView {
1042                    notification_op = NotificationOperation::Update;
1043                }
1044
1045                if let Some(source_id) = table.optional_associated_source_id {
1046                    let (src, obj) = Source::find_by_id(source_id)
1047                        .find_also_related(Object)
1048                        .one(txn)
1049                        .await?
1050                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1051                    objects.push(PbObject {
1052                        object_info: Some(PbObjectInfo::Source(
1053                            ObjectModel(src, obj.unwrap()).into(),
1054                        )),
1055                    });
1056                }
1057                objects.push(PbObject {
1058                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1059                });
1060            }
1061            ObjectType::Sink => {
1062                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1063                    .find_also_related(Object)
1064                    .one(txn)
1065                    .await?
1066                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1067                objects.push(PbObject {
1068                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1069                });
1070            }
1071            ObjectType::Index => {
1072                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1073                    .find_also_related(Object)
1074                    .one(txn)
1075                    .await?
1076                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1077                {
1078                    let (table, obj) = Table::find_by_id(index.index_table_id)
1079                        .find_also_related(Object)
1080                        .one(txn)
1081                        .await?
1082                        .ok_or_else(|| {
1083                            MetaError::catalog_id_not_found("table", index.index_table_id)
1084                        })?;
1085                    objects.push(PbObject {
1086                        object_info: Some(PbObjectInfo::Table(
1087                            ObjectModel(table, obj.unwrap()).into(),
1088                        )),
1089                    });
1090                }
1091
1092                // If the index is created on a table with privileges, we should also
1093                // grant the privileges for the index and its state tables.
1094                let primary_table_privileges = UserPrivilege::find()
1095                    .filter(
1096                        user_privilege::Column::Oid
1097                            .eq(index.primary_table_id)
1098                            .and(user_privilege::Column::Action.eq(Action::Select)),
1099                    )
1100                    .all(txn)
1101                    .await?;
1102                if !primary_table_privileges.is_empty() {
1103                    let index_state_table_ids: Vec<TableId> = Table::find()
1104                        .select_only()
1105                        .column(table::Column::TableId)
1106                        .filter(
1107                            table::Column::BelongsToJobId
1108                                .eq(job_id)
1109                                .or(table::Column::TableId.eq(index.index_table_id)),
1110                        )
1111                        .into_tuple()
1112                        .all(txn)
1113                        .await?;
1114                    let mut new_privileges = vec![];
1115                    for privilege in &primary_table_privileges {
1116                        for state_table_id in &index_state_table_ids {
1117                            new_privileges.push(user_privilege::ActiveModel {
1118                                id: Default::default(),
1119                                oid: Set(state_table_id.as_object_id()),
1120                                user_id: Set(privilege.user_id),
1121                                action: Set(Action::Select),
1122                                dependent_id: Set(privilege.dependent_id),
1123                                granted_by: Set(privilege.granted_by),
1124                                with_grant_option: Set(privilege.with_grant_option),
1125                            });
1126                        }
1127                    }
1128                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1129
1130                    updated_user_info = list_user_info_by_ids(
1131                        primary_table_privileges.into_iter().map(|p| p.user_id),
1132                        txn,
1133                    )
1134                    .await?;
1135                }
1136
1137                objects.push(PbObject {
1138                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1139                });
1140            }
1141            ObjectType::Source => {
1142                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1143                    .find_also_related(Object)
1144                    .one(txn)
1145                    .await?
1146                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1147                objects.push(PbObject {
1148                    object_info: Some(PbObjectInfo::Source(
1149                        ObjectModel(source, obj.unwrap()).into(),
1150                    )),
1151                });
1152            }
1153            _ => unreachable!("invalid job type: {:?}", job_type),
1154        }
1155
1156        if job_type != ObjectType::Index {
1157            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1158        }
1159
1160        Ok((notification_op, objects, updated_user_info))
1161    }
1162
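    /// Finalize a replace: swap the catalogs and fragments of the original job with those built
    /// under the temporary job `tmp_id`, then notify the frontend about updated (and possibly
    /// dropped) objects.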
1163    pub async fn finish_replace_streaming_job(
1164        &self,
1165        tmp_id: JobId,
1166        streaming_job: StreamingJob,
1167        replace_upstream: FragmentReplaceUpstream,
1168        sink_into_table_context: SinkIntoTableContext,
1169        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1170        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1171    ) -> MetaResult<NotificationVersion> {
1172        let inner = self.inner.write().await;
1173        let txn = inner.db.begin().await?;
1174
1175        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1176            tmp_id,
1177            replace_upstream,
1178            sink_into_table_context,
1179            &txn,
1180            streaming_job,
1181            drop_table_connector_ctx,
1182            auto_refresh_schema_sinks,
1183        )
1184        .await?;
1185
1186        txn.commit().await?;
1187
1188        let mut version = self
1189            .notify_frontend(
1190                NotificationOperation::Update,
1191                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1192            )
1193            .await;
1194
1195        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1196            self.notify_users_update(user_infos).await;
1197            version = self
1198                .notify_frontend(
1199                    NotificationOperation::Delete,
1200                    build_object_group_for_delete(to_drop_objects),
1201                )
1202                .await;
1203        }
1204
1205        Ok(version)
1206    }
1207
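    /// Transactional body of `finish_replace_streaming_job`; returns the objects to notify as
    /// updated, plus optional user infos and objects to notify as deleted.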
1208    pub async fn finish_replace_streaming_job_inner(
1209        tmp_id: JobId,
1210        replace_upstream: FragmentReplaceUpstream,
1211        SinkIntoTableContext {
1212            updated_sink_catalogs,
1213        }: SinkIntoTableContext,
1214        txn: &DatabaseTransaction,
1215        streaming_job: StreamingJob,
1216        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1217        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1218    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1219        let original_job_id = streaming_job.id();
1220        let job_type = streaming_job.job_type();
1221
1222        let mut index_item_rewriter = None;
1223
1224        // Update catalog
1225        match streaming_job {
1226            StreamingJob::Table(_source, table, _table_job_type) => {
1227                // The source catalog should remain unchanged
1228
1229                let original_column_catalogs =
1230                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1231
1232                index_item_rewriter = Some({
1233                    let original_columns = original_column_catalogs
1234                        .to_protobuf()
1235                        .into_iter()
1236                        .map(|c| c.column_desc.unwrap())
1237                        .collect_vec();
1238                    let new_columns = table
1239                        .columns
1240                        .iter()
1241                        .map(|c| c.column_desc.clone().unwrap())
1242                        .collect_vec();
1243
1244                    IndexItemRewriter {
1245                        original_columns,
1246                        new_columns,
1247                    }
1248                });
1249
1250                // For sinks created in earlier versions, we need to set the original_target_columns.
1251                for sink_id in updated_sink_catalogs {
1252                    sink::ActiveModel {
1253                        sink_id: Set(sink_id as _),
1254                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1255                        ..Default::default()
1256                    }
1257                    .update(txn)
1258                    .await?;
1259                }
1260                // Update the table catalog with the new one. (column catalog is also updated here)
1261                let mut table = table::ActiveModel::from(table);
1262                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1263                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1264                {
1265                    // Drop the table connector; the rest of the logic is in `drop_table_associated_source`.
1266                    table.optional_associated_source_id = Set(None);
1267                }
1268
1269                table.update(txn).await?;
1270            }
1271            StreamingJob::Source(source) => {
1272                // Update the source catalog with the new one.
1273                let source = source::ActiveModel::from(source);
1274                source.update(txn).await?;
1275            }
1276            StreamingJob::MaterializedView(table) => {
1277                // Update the table catalog with the new one.
1278                let table = table::ActiveModel::from(table);
1279                table.update(txn).await?;
1280            }
1281            _ => unreachable!(
1282                "invalid streaming job type: {:?}",
1283                streaming_job.job_type_str()
1284            ),
1285        }
1286
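        // Rebind internal tables and fragments from the temporary job to the original job, rewrite
        // downstream `Merge` nodes to the new upstream fragments, and drop the dummy object.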
1287        async fn finish_fragments(
1288            txn: &DatabaseTransaction,
1289            tmp_id: JobId,
1290            original_job_id: JobId,
1291            replace_upstream: FragmentReplaceUpstream,
1292        ) -> MetaResult<()> {
1293            // 0. update internal tables
1294            // Fields including `fragment_id` were placeholder values before.
1295            // After table fragments are created, update them for all internal tables.
1296            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1297                .select_only()
1298                .columns([
1299                    fragment::Column::FragmentId,
1300                    fragment::Column::StateTableIds,
1301                ])
1302                .filter(fragment::Column::JobId.eq(tmp_id))
1303                .into_tuple()
1304                .all(txn)
1305                .await?;
1306            for (fragment_id, state_table_ids) in fragment_info {
1307                for state_table_id in state_table_ids.into_inner() {
1308                    let state_table_id = TableId::new(state_table_id as _);
1309                    table::ActiveModel {
1310                        table_id: Set(state_table_id),
1311                        fragment_id: Set(Some(fragment_id)),
1312                        // No need to update `vnode_count` because it must remain the same.
1313                        ..Default::default()
1314                    }
1315                    .update(txn)
1316                    .await?;
1317                }
1318            }
1319
1320            // 1. replace old fragments/actors with new ones.
1321            Fragment::delete_many()
1322                .filter(fragment::Column::JobId.eq(original_job_id))
1323                .exec(txn)
1324                .await?;
1325            Fragment::update_many()
1326                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1327                .filter(fragment::Column::JobId.eq(tmp_id))
1328                .exec(txn)
1329                .await?;
1330
1331            // 2. update merges.
1332            // rewrite the downstream fragments' Merge nodes to point to the new upstream_fragment_id
1333            for (fragment_id, fragment_replace_map) in replace_upstream {
1334                let (fragment_id, mut stream_node) =
1335                    Fragment::find_by_id(fragment_id as FragmentId)
1336                        .select_only()
1337                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1338                        .into_tuple::<(FragmentId, StreamNode)>()
1339                        .one(txn)
1340                        .await?
1341                        .map(|(id, node)| (id, node.to_protobuf()))
1342                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1343
1344                visit_stream_node_mut(&mut stream_node, |body| {
1345                    if let PbNodeBody::Merge(m) = body
1346                        && let Some(new_fragment_id) =
1347                            fragment_replace_map.get(&m.upstream_fragment_id)
1348                    {
1349                        m.upstream_fragment_id = *new_fragment_id;
1350                    }
1351                });
1352                fragment::ActiveModel {
1353                    fragment_id: Set(fragment_id),
1354                    stream_node: Set(StreamNode::from(&stream_node)),
1355                    ..Default::default()
1356                }
1357                .update(txn)
1358                .await?;
1359            }
1360
1361            // 3. remove dummy object.
1362            Object::delete_by_id(tmp_id).exec(txn).await?;
1363
1364            Ok(())
1365        }
1366
1367        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1368
1369        // 4. update catalogs and notify.
1370        let mut objects = vec![];
1371        match job_type {
1372            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1373                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1374                    .find_also_related(Object)
1375                    .one(txn)
1376                    .await?
1377                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1378                objects.push(PbObject {
1379                    object_info: Some(PbObjectInfo::Table(
1380                        ObjectModel(table, table_obj.unwrap()).into(),
1381                    )),
1382                })
1383            }
1384            StreamingJobType::Source => {
1385                let (source, source_obj) =
1386                    Source::find_by_id(original_job_id.as_shared_source_id())
1387                        .find_also_related(Object)
1388                        .one(txn)
1389                        .await?
1390                        .ok_or_else(|| {
1391                            MetaError::catalog_id_not_found("object", original_job_id)
1392                        })?;
1393                objects.push(PbObject {
1394                    object_info: Some(PbObjectInfo::Source(
1395                        ObjectModel(source, source_obj.unwrap()).into(),
1396                    )),
1397                })
1398            }
1399            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1400        }
1401
1402        if let Some(expr_rewriter) = index_item_rewriter {
1403            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1404                .select_only()
1405                .columns([index::Column::IndexId, index::Column::IndexItems])
1406                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1407                .into_tuple()
1408                .all(txn)
1409                .await?;
1410            for (index_id, nodes) in index_items {
1411                let mut pb_nodes = nodes.to_protobuf();
1412                pb_nodes
1413                    .iter_mut()
1414                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1415                let index = index::ActiveModel {
1416                    index_id: Set(index_id),
1417                    index_items: Set(pb_nodes.into()),
1418                    ..Default::default()
1419                }
1420                .update(txn)
1421                .await?;
1422                let index_obj = index
1423                    .find_related(Object)
1424                    .one(txn)
1425                    .await?
1426                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1427                objects.push(PbObject {
1428                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1429                });
1430            }
1431        }
1432
1433        if let Some(sinks) = auto_refresh_schema_sinks {
1434            for finish_sink_context in sinks {
1435                finish_fragments(
1436                    txn,
1437                    finish_sink_context.tmp_sink_id.as_job_id(),
1438                    finish_sink_context.original_sink_id.as_job_id(),
1439                    Default::default(),
1440                )
1441                .await?;
1442                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1443                    .find_also_related(Object)
1444                    .one(txn)
1445                    .await?
1446                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id))?;
1447                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1448                Sink::update(sink::ActiveModel {
1449                    sink_id: Set(finish_sink_context.original_sink_id),
1450                    columns: Set(columns.clone()),
1451                    ..Default::default()
1452                })
1453                .exec(txn)
1454                .await?;
1455                sink.columns = columns;
1456                objects.push(PbObject {
1457                    object_info: Some(PbObjectInfo::Sink(
1458                        ObjectModel(sink, sink_obj.unwrap()).into(),
1459                    )),
1460                });
1461                if let Some((log_store_table_id, new_log_store_table_columns)) =
1462                    finish_sink_context.new_log_store_table
1463                {
1464                    let new_log_store_table_columns: ColumnCatalogArray =
1465                        new_log_store_table_columns.into();
1466                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1467                        .find_also_related(Object)
1468                        .one(txn)
1469                        .await?
1470                        .ok_or_else(|| MetaError::catalog_id_not_found("table", log_store_table_id))?;
1471                    Table::update(table::ActiveModel {
1472                        table_id: Set(log_store_table_id),
1473                        columns: Set(new_log_store_table_columns.clone()),
1474                        ..Default::default()
1475                    })
1476                    .exec(txn)
1477                    .await?;
1478                    table.columns = new_log_store_table_columns;
1479                    objects.push(PbObject {
1480                        object_info: Some(PbObjectInfo::Table(
1481                            ObjectModel(table, table_obj.unwrap()).into(),
1482                        )),
1483                    });
1484                }
1485            }
1486        }
1487
1488        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1489        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1490            notification_objs =
1491                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1492        }
1493
1494        Ok((objects, notification_objs))
1495    }
1496
1497    /// Abort the replacing streaming job by deleting the temporary job object.
1498    pub async fn try_abort_replacing_streaming_job(
1499        &self,
1500        tmp_job_id: JobId,
1501        tmp_sink_ids: Option<Vec<ObjectId>>,
1502    ) -> MetaResult<()> {
1503        let inner = self.inner.write().await;
1504        let txn = inner.db.begin().await?;
1505        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1506        if let Some(tmp_sink_ids) = tmp_sink_ids {
1507            for tmp_sink_id in tmp_sink_ids {
1508                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1509            }
1510        }
1511        txn.commit().await?;
1512        Ok(())
1513    }
1514
1515    // Edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments.
1516    // Returns the actor ids (per fragment) to be applied.
1517    pub async fn update_source_rate_limit_by_source_id(
1518        &self,
1519        source_id: SourceId,
1520        rate_limit: Option<u32>,
1521    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1522        let inner = self.inner.read().await;
1523        let txn = inner.db.begin().await?;
1524
1525        {
1526            let active_source = source::ActiveModel {
1527                source_id: Set(source_id),
1528                rate_limit: Set(rate_limit.map(|v| v as i32)),
1529                ..Default::default()
1530            };
1531            active_source.update(&txn).await?;
1532        }
1533
1534        let (source, obj) = Source::find_by_id(source_id)
1535            .find_also_related(Object)
1536            .one(&txn)
1537            .await?
1538            .ok_or_else(|| {
1539                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1540            })?;
1541
1542        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1543        let streaming_job_ids: Vec<JobId> =
1544            if let Some(table_id) = source.optional_associated_table_id {
1545                vec![table_id.as_job_id()]
1546            } else if let Some(source_info) = &source.source_info
1547                && source_info.to_protobuf().is_shared()
1548            {
1549                vec![source_id.as_share_source_job_id()]
1550            } else {
1551                ObjectDependency::find()
1552                    .select_only()
1553                    .column(object_dependency::Column::UsedBy)
1554                    .filter(object_dependency::Column::Oid.eq(source_id))
1555                    .into_tuple()
1556                    .all(&txn)
1557                    .await?
1558            };
1559
1560        if streaming_job_ids.is_empty() {
1561            return Err(MetaError::invalid_parameter(format!(
1562                "source id {source_id} not used by any streaming job"
1563            )));
1564        }
1565
1566        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1567            .select_only()
1568            .columns([
1569                fragment::Column::FragmentId,
1570                fragment::Column::FragmentTypeMask,
1571                fragment::Column::StreamNode,
1572            ])
1573            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1574            .into_tuple()
1575            .all(&txn)
1576            .await?;
1577        let mut fragments = fragments
1578            .into_iter()
1579            .map(|(id, mask, stream_node)| {
1580                (
1581                    id,
1582                    FragmentTypeMask::from(mask as u32),
1583                    stream_node.to_protobuf(),
1584                )
1585            })
1586            .collect_vec();
1587
1588        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1589            let mut found = false;
1590            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1591                visit_stream_node_mut(stream_node, |node| {
1592                    if let PbNodeBody::Source(node) = node
1593                        && let Some(node_inner) = &mut node.source_inner
1594                        && node_inner.source_id == source_id
1595                    {
1596                        node_inner.rate_limit = rate_limit;
1597                        found = true;
1598                    }
1599                });
1600            }
1601            if is_fs_source {
1602                // In older versions there was no fragment type flag for the `FsFetch` node,
1603                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes.
1604                visit_stream_node_mut(stream_node, |node| {
1605                    if let PbNodeBody::StreamFsFetch(node) = node {
1606                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1607                        if let Some(node_inner) = &mut node.node_inner
1608                            && node_inner.source_id == source_id
1609                        {
1610                            node_inner.rate_limit = rate_limit;
1611                            found = true;
1612                        }
1613                    }
1614                });
1615            }
1616            found
1617        });
1618
1619        assert!(
1620            !fragments.is_empty(),
1621            "source id should be used by at least one fragment"
1622        );
1623        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1624
1625        for (id, fragment_type_mask, stream_node) in fragments {
1626            fragment::ActiveModel {
1627                fragment_id: Set(id),
1628                fragment_type_mask: Set(fragment_type_mask.into()),
1629                stream_node: Set(StreamNode::from(&stream_node)),
1630                ..Default::default()
1631            }
1632            .update(&txn)
1633            .await?;
1634        }
1635
1636        txn.commit().await?;
1637
1638        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());
1639
1640        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1641        let _version = self
1642            .notify_frontend(
1643                NotificationOperation::Update,
1644                NotificationInfo::ObjectGroup(PbObjectGroup {
1645                    objects: vec![PbObject {
1646                        object_info: Some(relation_info),
1647                    }],
1648                }),
1649            )
1650            .await;
1651
1652        Ok(fragment_actors)
1653    }
1654
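    /// Look up the currently running actors of the given fragments from the shared in-memory
    /// actor info, grouped by fragment id.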
1655    fn get_fragment_actors_from_running_info(
1656        &self,
1657        fragment_ids: impl Iterator<Item = FragmentId>,
1658    ) -> HashMap<FragmentId, Vec<ActorId>> {
1659        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1660
1661        let info = self.env.shared_actor_infos().read_guard();
1662
1663        for fragment_id in fragment_ids {
1664            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1665            fragment_actors
1666                .entry(fragment_id as _)
1667                .or_default()
1668                .extend(actors.keys().copied());
1669        }
1670
1671        fragment_actors
1672    }
1673
1674    // Edit the stream nodes of the fragments belonging to the given `job_id`.
1675    // Returns the actor ids (per fragment) to be applied.
1676    pub async fn mutate_fragments_by_job_id(
1677        &self,
1678        job_id: JobId,
1679        // returns true if the mutation is applied
1680        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1681        // error message when no relevant fragments are found
1682        err_msg: &'static str,
1683    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1684        let inner = self.inner.read().await;
1685        let txn = inner.db.begin().await?;
1686
1687        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1688            .select_only()
1689            .columns([
1690                fragment::Column::FragmentId,
1691                fragment::Column::FragmentTypeMask,
1692                fragment::Column::StreamNode,
1693            ])
1694            .filter(fragment::Column::JobId.eq(job_id))
1695            .into_tuple()
1696            .all(&txn)
1697            .await?;
1698        let mut fragments = fragments
1699            .into_iter()
1700            .map(|(id, mask, stream_node)| {
1701                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1702            })
1703            .collect_vec();
1704
1705        let fragments = fragments
1706            .iter_mut()
1707            .map(|(_, fragment_type_mask, stream_node)| {
1708                fragments_mutation_fn(*fragment_type_mask, stream_node)
1709            })
1710            .collect::<MetaResult<Vec<bool>>>()?
1711            .into_iter()
1712            .zip_eq_debug(std::mem::take(&mut fragments))
1713            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1714            .collect::<Vec<_>>();
1715
1716        if fragments.is_empty() {
1717            return Err(MetaError::invalid_parameter(format!(
1718                "job id {job_id}: {}",
1719                err_msg
1720            )));
1721        }
1722
1723        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1724        for (id, _, stream_node) in fragments {
1725            fragment::ActiveModel {
1726                fragment_id: Set(id),
1727                stream_node: Set(StreamNode::from(&stream_node)),
1728                ..Default::default()
1729            }
1730            .update(&txn)
1731            .await?;
1732        }
1733
1734        txn.commit().await?;
1735
1736        let fragment_actors =
1737            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1738
1739        Ok(fragment_actors)
1740    }
1741
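    // Edit the stream node of the single fragment with the given `fragment_id`.
    // Returns the actor ids (per fragment) to be applied.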
1742    async fn mutate_fragment_by_fragment_id(
1743        &self,
1744        fragment_id: FragmentId,
1745        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1746        err_msg: &'static str,
1747    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1748        let inner = self.inner.read().await;
1749        let txn = inner.db.begin().await?;
1750
1751        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1752            Fragment::find_by_id(fragment_id)
1753                .select_only()
1754                .columns([
1755                    fragment::Column::FragmentTypeMask,
1756                    fragment::Column::StreamNode,
1757                ])
1758                .into_tuple()
1759                .one(&txn)
1760                .await?
1761                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1762        let mut pb_stream_node = stream_node.to_protobuf();
1763        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1764
1765        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1766            return Err(MetaError::invalid_parameter(format!(
1767                "fragment id {fragment_id}: {}",
1768                err_msg
1769            )));
1770        }
1771
1772        fragment::ActiveModel {
1773            fragment_id: Set(fragment_id),
1774            stream_node: Set(StreamNode::from(&pb_stream_node)),
1775            ..Default::default()
1776        }
1777        .update(&txn)
1778        .await?;
1779
1780        let fragment_actors =
1781            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1782
1783        txn.commit().await?;
1784
1785        Ok(fragment_actors)
1786    }
1787
1788    // Edit the `rate_limit` of the backfill nodes (stream scan / CDC scan / source backfill) in the given `job_id`'s fragments.
1789    // Returns the actor ids (per fragment) to be applied.
1790    pub async fn update_backfill_rate_limit_by_job_id(
1791        &self,
1792        job_id: JobId,
1793        rate_limit: Option<u32>,
1794    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1795        let update_backfill_rate_limit =
1796            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1797                let mut found = false;
1798                if fragment_type_mask
1799                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1800                {
1801                    visit_stream_node_mut(stream_node, |node| match node {
1802                        PbNodeBody::StreamCdcScan(node) => {
1803                            node.rate_limit = rate_limit;
1804                            found = true;
1805                        }
1806                        PbNodeBody::StreamScan(node) => {
1807                            node.rate_limit = rate_limit;
1808                            found = true;
1809                        }
1810                        PbNodeBody::SourceBackfill(node) => {
1811                            node.rate_limit = rate_limit;
1812                            found = true;
1813                        }
1814                        PbNodeBody::Sink(node) => {
1815                            node.rate_limit = rate_limit;
1816                            found = true;
1817                        }
1818                        _ => {}
1819                    });
1820                }
1821                Ok(found)
1822            };
1823
1824        self.mutate_fragments_by_job_id(
1825            job_id,
1826            update_backfill_rate_limit,
1827            "stream scan node or source node not found",
1828        )
1829        .await
1830    }
1831
1832    // Edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
1833    // Returns the actor ids (per fragment) to be applied.
1834    pub async fn update_sink_rate_limit_by_job_id(
1835        &self,
1836        sink_id: SinkId,
1837        rate_limit: Option<u32>,
1838    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1839        let update_sink_rate_limit =
1840            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1841                let mut found = Ok(false);
1842                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1843                    visit_stream_node_mut(stream_node, |node| {
1844                        if let PbNodeBody::Sink(node) = node {
1845                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1846                                found = Err(MetaError::invalid_parameter(
1847                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1848                                ));
1849                                return;
1850                            }
1851                            node.rate_limit = rate_limit;
1852                            found = Ok(true);
1853                        }
1854                    });
1855                }
1856                found
1857            };
1858
1859        self.mutate_fragments_by_job_id(
1860            sink_id.as_job_id(),
1861            update_sink_rate_limit,
1862            "sink node not found",
1863        )
1864        .await
1865    }
1866
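    // Edit the `rate_limit` of the `Dml` nodes in the given `job_id`'s fragments.
    // Returns the actor ids (per fragment) to be applied.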
1867    pub async fn update_dml_rate_limit_by_job_id(
1868        &self,
1869        job_id: JobId,
1870        rate_limit: Option<u32>,
1871    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1872        let update_dml_rate_limit =
1873            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1874                let mut found = false;
1875                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1876                    visit_stream_node_mut(stream_node, |node| {
1877                        if let PbNodeBody::Dml(node) = node {
1878                            node.rate_limit = rate_limit;
1879                            found = true;
1880                        }
1881                    });
1882                }
1883                Ok(found)
1884            };
1885
1886        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1887            .await
1888    }
1889
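    /// Alter the WITH properties and secret refs of the source `source_id` on the fly.
    ///
    /// Validates that the changed fields are allowed to be altered, rewrites the stored
    /// `CREATE SOURCE` / `CREATE TABLE` definition, updates secret dependencies, applies the
    /// new properties to the affected `Source` fragments, and notifies the frontend.
    /// Returns the resolved options (plaintext and secret refs) after the update.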
1890    pub async fn update_source_props_by_source_id(
1891        &self,
1892        source_id: SourceId,
1893        alter_props: BTreeMap<String, String>,
1894        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1895    ) -> MetaResult<WithOptionsSecResolved> {
1896        let inner = self.inner.read().await;
1897        let txn = inner.db.begin().await?;
1898
1899        let (source, _obj) = Source::find_by_id(source_id)
1900            .find_also_related(Object)
1901            .one(&txn)
1902            .await?
1903            .ok_or_else(|| {
1904                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1905            })?;
1906        let connector = source.with_properties.0.get_connector().unwrap();
1907        let is_shared_source = source.is_shared();
1908
1909        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
1910        if !is_shared_source {
1911            // MVs using a non-shared source hold a copy of the source in their fragments
1912            dep_source_job_ids = ObjectDependency::find()
1913                .select_only()
1914                .column(object_dependency::Column::UsedBy)
1915                .filter(object_dependency::Column::Oid.eq(source_id))
1916                .into_tuple()
1917                .all(&txn)
1918                .await?;
1919        }
1920
1921        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
1922        let prop_keys: Vec<String> = alter_props
1923            .keys()
1924            .chain(alter_secret_refs.keys())
1925            .cloned()
1926            .collect();
1927        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1928            &connector, &prop_keys,
1929        )?;
1930
1931        let mut options_with_secret = WithOptionsSecResolved::new(
1932            source.with_properties.0.clone(),
1933            source
1934                .secret_ref
1935                .map(|secret_ref| secret_ref.to_protobuf())
1936                .unwrap_or_default(),
1937        );
1938        let (to_add_secret_dep, to_remove_secret_dep) =
1939            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1940
1941        tracing::info!(
1942            "applying new properties to source: source_id={}, options_with_secret={:?}",
1943            source_id,
1944            options_with_secret
1945        );
1946        // check if the alter-ed props are valid for each Connector
1947        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1948        // todo: validate via source manager
1949
1950        let mut associate_table_id = None;
1951
1952        // The preferred id can be a source_id or a table_id:
1953        // if updating a source associated with a table, the preferred_id is the table_id;
1954        // otherwise, it is the source_id.
1955        let mut preferred_id = source_id.as_object_id();
1956        let rewrite_sql = {
1957            let definition = source.definition.clone();
1958
1959            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1960                .map_err(|e| {
1961                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1962                        anyhow!(e).context("Failed to parse source definition SQL"),
1963                    )))
1964                })?
1965                .try_into()
1966                .unwrap();
1967
1968            /// Formats SQL options with secret values properly resolved
1969            ///
1970            /// This function processes configuration options that may contain sensitive data:
1971            /// - Plaintext options are directly converted to `SqlOption`
1972            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
1973            ///   without exposing the actual secret value
1974            ///
1975            /// # Arguments
1976            /// * `txn` - Database transaction for retrieving secrets
1977            /// * `options_with_secret` - Container of options with both plaintext and secret values
1978            ///
1979            /// # Returns
1980            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
1981            async fn format_with_option_secret_resolved(
1982                txn: &DatabaseTransaction,
1983                options_with_secret: &WithOptionsSecResolved,
1984            ) -> MetaResult<Vec<SqlOption>> {
1985                let mut options = Vec::new();
1986                for (k, v) in options_with_secret.as_plaintext() {
1987                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1988                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1989                    options.push(sql_option);
1990                }
1991                for (k, v) in options_with_secret.as_secret() {
1992                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
1993                        let sql_option =
1994                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1995                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1996                        options.push(sql_option);
1997                    } else {
1998                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1999                    }
2000                }
2001                Ok(options)
2002            }
2003
2004            match &mut stmt {
2005                Statement::CreateSource { stmt } => {
2006                    stmt.with_properties.0 =
2007                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2008                }
2009                Statement::CreateTable { with_options, .. } => {
2010                    *with_options =
2011                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
2012                    associate_table_id = source.optional_associated_table_id;
2013                    preferred_id = associate_table_id.unwrap().as_object_id();
2014                }
2015                _ => unreachable!(),
2016            }
2017
2018            stmt.to_string()
2019        };
2020
2021        {
2022            // update secret dependencies
2023            if !to_add_secret_dep.is_empty() {
2024                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
2025                    object_dependency::ActiveModel {
2026                        oid: Set(secret_id.into()),
2027                        used_by: Set(preferred_id),
2028                        ..Default::default()
2029                    }
2030                }))
2031                .exec(&txn)
2032                .await?;
2033            }
2034            if !to_remove_secret_dep.is_empty() {
2035                // todo: fix the filter logic
2036                let _ = ObjectDependency::delete_many()
2037                    .filter(
2038                        object_dependency::Column::Oid
2039                            .is_in(to_remove_secret_dep)
2040                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
2041                    )
2042                    .exec(&txn)
2043                    .await?;
2044            }
2045        }
2046
2047        let active_source_model = source::ActiveModel {
2048            source_id: Set(source_id),
2049            definition: Set(rewrite_sql.clone()),
2050            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2051            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2052                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2053            ..Default::default()
2054        };
2055        active_source_model.update(&txn).await?;
2056
2057        if let Some(associate_table_id) = associate_table_id {
2058            // update the associated table's definition accordingly
2059            let active_table_model = table::ActiveModel {
2060                table_id: Set(associate_table_id),
2061                definition: Set(rewrite_sql),
2062                ..Default::default()
2063            };
2064            active_table_model.update(&txn).await?;
2065        }
2066
2067        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2068            // if updating a table with a connector, the job id is the table id
2069            associate_table_id.as_job_id()
2070        } else {
2071            source_id.as_share_source_job_id()
2072        }]
2073        .into_iter()
2074        .chain(dep_source_job_ids.into_iter())
2075        .collect_vec();
2076
2077        // update fragments
2078        update_connector_props_fragments(
2079            &txn,
2080            to_check_job_ids,
2081            FragmentTypeFlag::Source,
2082            |node, found| {
2083                if let PbNodeBody::Source(node) = node
2084                    && let Some(source_inner) = &mut node.source_inner
2085                {
2086                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2087                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2088                    *found = true;
2089                }
2090            },
2091            is_shared_source,
2092        )
2093        .await?;
2094
2095        let mut to_update_objs = Vec::with_capacity(2);
2096        let (source, obj) = Source::find_by_id(source_id)
2097            .find_also_related(Object)
2098            .one(&txn)
2099            .await?
2100            .ok_or_else(|| {
2101                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2102            })?;
2103        to_update_objs.push(PbObject {
2104            object_info: Some(PbObjectInfo::Source(
2105                ObjectModel(source, obj.unwrap()).into(),
2106            )),
2107        });
2108
2109        if let Some(associate_table_id) = associate_table_id {
2110            let (table, obj) = Table::find_by_id(associate_table_id)
2111                .find_also_related(Object)
2112                .one(&txn)
2113                .await?
2114                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2115            to_update_objs.push(PbObject {
2116                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2117            });
2118        }
2119
2120        txn.commit().await?;
2121
2122        self.notify_frontend(
2123            NotificationOperation::Update,
2124            NotificationInfo::ObjectGroup(PbObjectGroup {
2125                objects: to_update_objs,
2126            }),
2127        )
2128        .await;
2129
2130        Ok(options_with_secret)
2131    }
2132
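    /// Alter the WITH properties of the sink `sink_id` on the fly.
    ///
    /// Validates that the changed fields are allowed to be altered, rewrites the stored
    /// `CREATE SINK` definition, applies the new properties to the sink's `Sink` fragments,
    /// and notifies the frontend. Returns the properties that were applied.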
2133    pub async fn update_sink_props_by_sink_id(
2134        &self,
2135        sink_id: SinkId,
2136        props: BTreeMap<String, String>,
2137    ) -> MetaResult<HashMap<String, String>> {
2138        let inner = self.inner.read().await;
2139        let txn = inner.db.begin().await?;
2140
2141        let (sink, _obj) = Sink::find_by_id(sink_id)
2142            .find_also_related(Object)
2143            .one(&txn)
2144            .await?
2145            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2146        validate_sink_props(&sink, &props)?;
2147        let definition = sink.definition.clone();
2148        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2149            .map_err(|e| SinkError::Config(anyhow!(e)))?
2150            .try_into()
2151            .unwrap();
2152        if let Statement::CreateSink { stmt } = &mut stmt {
2153            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2154        } else {
2155            panic!("definition is not a create sink statement")
2156        }
2157        let mut new_config = sink.properties.clone().into_inner();
2158        new_config.extend(props.clone());
2159
2160        let definition = stmt.to_string();
2161        let active_sink = sink::ActiveModel {
2162            sink_id: Set(sink_id),
2163            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2164            definition: Set(definition),
2165            ..Default::default()
2166        };
2167        active_sink.update(&txn).await?;
2168
2169        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2170        let (sink, obj) = Sink::find_by_id(sink_id)
2171            .find_also_related(Object)
2172            .one(&txn)
2173            .await?
2174            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2175        txn.commit().await?;
2176        let relation_infos = vec![PbObject {
2177            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2178        }];
2179
2180        let _version = self
2181            .notify_frontend(
2182                NotificationOperation::Update,
2183                NotificationInfo::ObjectGroup(PbObjectGroup {
2184                    objects: relation_infos,
2185                }),
2186            )
2187            .await;
2188
2189        Ok(props.into_iter().collect())
2190    }
2191
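    /// Alter the connector properties of a table created with the iceberg engine.
    ///
    /// The rewritten definition is written back to the internal sink, source and table catalogs,
    /// the new properties are applied to the internal sink's fragments, and the frontend is
    /// notified. Returns the applied properties together with the internal sink id.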
2192    pub async fn update_iceberg_table_props_by_table_id(
2193        &self,
2194        table_id: TableId,
2195        props: BTreeMap<String, String>,
2196        alter_iceberg_table_props: Option<
2197            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2198        >,
2199    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2200        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props
2201            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2202        let inner = self.inner.read().await;
2203        let txn = inner.db.begin().await?;
2204
2205        let (sink, _obj) = Sink::find_by_id(sink_id)
2206            .find_also_related(Object)
2207            .one(&txn)
2208            .await?
2209            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2210        validate_sink_props(&sink, &props)?;
2211
2212        let definition = sink.definition.clone();
2213        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2214            .map_err(|e| SinkError::Config(anyhow!(e)))?
2215            .try_into()
2216            .unwrap();
2217        if let Statement::CreateTable {
2218            with_options,
2219            engine,
2220            ..
2221        } = &mut stmt
2222        {
2223            if !matches!(engine, Engine::Iceberg) {
2224                return Err(SinkError::Config(anyhow!(
2225                    "only iceberg table can be altered as sink"
2226                ))
2227                .into());
2228            }
2229            update_stmt_with_props(with_options, &props)?;
2230        } else {
2231            panic!("definition is not a create iceberg table statement")
2232        }
2233        let mut new_config = sink.properties.clone().into_inner();
2234        new_config.extend(props.clone());
2235
2236        let definition = stmt.to_string();
2237        let active_sink = sink::ActiveModel {
2238            sink_id: Set(sink_id),
2239            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2240            definition: Set(definition.clone()),
2241            ..Default::default()
2242        };
2243        let active_source = source::ActiveModel {
2244            source_id: Set(source_id),
2245            definition: Set(definition.clone()),
2246            ..Default::default()
2247        };
2248        let active_table = table::ActiveModel {
2249            table_id: Set(table_id),
2250            definition: Set(definition),
2251            ..Default::default()
2252        };
2253        active_sink.update(&txn).await?;
2254        active_source.update(&txn).await?;
2255        active_table.update(&txn).await?;
2256
2257        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2258
2259        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2260            .find_also_related(Object)
2261            .one(&txn)
2262            .await?
2263            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2264        let (source, source_obj) = Source::find_by_id(source_id)
2265            .find_also_related(Object)
2266            .one(&txn)
2267            .await?
2268            .ok_or_else(|| {
2269                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2270            })?;
2271        let (table, table_obj) = Table::find_by_id(table_id)
2272            .find_also_related(Object)
2273            .one(&txn)
2274            .await?
2275            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2276        txn.commit().await?;
2277        let relation_infos = vec![
2278            PbObject {
2279                object_info: Some(PbObjectInfo::Sink(
2280                    ObjectModel(sink, sink_obj.unwrap()).into(),
2281                )),
2282            },
2283            PbObject {
2284                object_info: Some(PbObjectInfo::Source(
2285                    ObjectModel(source, source_obj.unwrap()).into(),
2286                )),
2287            },
2288            PbObject {
2289                object_info: Some(PbObjectInfo::Table(
2290                    ObjectModel(table, table_obj.unwrap()).into(),
2291                )),
2292            },
2293        ];
2294        let _version = self
2295            .notify_frontend(
2296                NotificationOperation::Update,
2297                NotificationInfo::ObjectGroup(PbObjectGroup {
2298                    objects: relation_infos,
2299                }),
2300            )
2301            .await;
2302
2303        Ok((props.into_iter().collect(), sink_id))
2304    }
2305
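    // Edit the `rate_limit` of the rate-limited nodes (DML / sink / stream scan / CDC scan /
    // source backfill) in the given `fragment_id`'s stream node.
    // Returns the actor ids (per fragment) to be applied.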
2306    pub async fn update_fragment_rate_limit_by_fragment_id(
2307        &self,
2308        fragment_id: FragmentId,
2309        rate_limit: Option<u32>,
2310    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2311        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2312                                 stream_node: &mut PbStreamNode| {
2313            let mut found = false;
2314            if fragment_type_mask.contains_any(
2315                FragmentTypeFlag::dml_rate_limit_fragments()
2316                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2317                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2318                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2319            ) {
2320                visit_stream_node_mut(stream_node, |node| {
2321                    if let PbNodeBody::Dml(node) = node {
2322                        node.rate_limit = rate_limit;
2323                        found = true;
2324                    }
2325                    if let PbNodeBody::Sink(node) = node {
2326                        node.rate_limit = rate_limit;
2327                        found = true;
2328                    }
2329                    if let PbNodeBody::StreamCdcScan(node) = node {
2330                        node.rate_limit = rate_limit;
2331                        found = true;
2332                    }
2333                    if let PbNodeBody::StreamScan(node) = node {
2334                        node.rate_limit = rate_limit;
2335                        found = true;
2336                    }
2337                    if let PbNodeBody::SourceBackfill(node) = node {
2338                        node.rate_limit = rate_limit;
2339                        found = true;
2340                    }
2341                });
2342            }
2343            found
2344        };
2345        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2346            .await
2347    }
2348
2349    /// Note: `FsFetch` nodes created in old versions are not included.
2350    /// Since this is only used for debugging, it should be fine.
2351    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2352        let inner = self.inner.read().await;
2353        let txn = inner.db.begin().await?;
2354
2355        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2356            .select_only()
2357            .columns([
2358                fragment::Column::FragmentId,
2359                fragment::Column::JobId,
2360                fragment::Column::FragmentTypeMask,
2361                fragment::Column::StreamNode,
2362            ])
2363            .filter(FragmentTypeMask::intersects_any(
2364                FragmentTypeFlag::rate_limit_fragments(),
2365            ))
2366            .into_tuple()
2367            .all(&txn)
2368            .await?;
2369
2370        let mut rate_limits = Vec::new();
2371        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2372            let stream_node = stream_node.to_protobuf();
2373            visit_stream_node_body(&stream_node, |node| {
2374                let mut rate_limit = None;
2375                let mut node_name = None;
2376
2377                match node {
2378                    // source rate limit
2379                    PbNodeBody::Source(node) => {
2380                        if let Some(node_inner) = &node.source_inner {
2381                            rate_limit = node_inner.rate_limit;
2382                            node_name = Some("SOURCE");
2383                        }
2384                    }
2385                    PbNodeBody::StreamFsFetch(node) => {
2386                        if let Some(node_inner) = &node.node_inner {
2387                            rate_limit = node_inner.rate_limit;
2388                            node_name = Some("FS_FETCH");
2389                        }
2390                    }
2391                    // backfill rate limit
2392                    PbNodeBody::SourceBackfill(node) => {
2393                        rate_limit = node.rate_limit;
2394                        node_name = Some("SOURCE_BACKFILL");
2395                    }
2396                    PbNodeBody::StreamScan(node) => {
2397                        rate_limit = node.rate_limit;
2398                        node_name = Some("STREAM_SCAN");
2399                    }
2400                    PbNodeBody::StreamCdcScan(node) => {
2401                        rate_limit = node.rate_limit;
2402                        node_name = Some("STREAM_CDC_SCAN");
2403                    }
2404                    PbNodeBody::Sink(node) => {
2405                        rate_limit = node.rate_limit;
2406                        node_name = Some("SINK");
2407                    }
2408                    _ => {}
2409                }
2410
2411                if let Some(rate_limit) = rate_limit {
2412                    rate_limits.push(RateLimitInfo {
2413                        fragment_id,
2414                        job_id,
2415                        fragment_type_mask: fragment_type_mask as u32,
2416                        rate_limit,
2417                        node_name: node_name.unwrap().to_owned(),
2418                    });
2419                }
2420            });
2421        }
2422
2423        Ok(rate_limits)
2424    }
2425}
2426
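/// Check that every property in `props` is allowed to be altered on the fly for the sink's
/// connector, and that the merged configuration passes the connector's `validate_alter_config`.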
2427fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2428    // Validate that props can be altered
2429    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2430        Some(connector) => {
2431            let connector_type = connector.to_lowercase();
2432            let field_names: Vec<String> = props.keys().cloned().collect();
2433            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2434                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2435
2436            match_sink_name_str!(
2437                connector_type.as_str(),
2438                SinkType,
2439                {
2440                    let mut new_props = sink.properties.0.clone();
2441                    new_props.extend(props.clone());
2442                    SinkType::validate_alter_config(&new_props)
2443                },
2444                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2445            )?
2446        }
2447        None => {
2448            return Err(
2449                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2450            );
2451        }
2452    };
2453    Ok(())
2454}
2455
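/// Merge `props` into the `WITH (...)` options of a parsed statement, overwriting options with
/// the same name and appending new ones while keeping the original order.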
2456fn update_stmt_with_props(
2457    with_properties: &mut Vec<SqlOption>,
2458    props: &BTreeMap<String, String>,
2459) -> MetaResult<()> {
2460    let mut new_sql_options = with_properties
2461        .iter()
2462        .map(|sql_option| (&sql_option.name, sql_option))
2463        .collect::<IndexMap<_, _>>();
2464    let add_sql_options = props
2465        .iter()
2466        .map(|(k, v)| SqlOption::try_from((k, v)))
2467        .collect::<Result<Vec<SqlOption>, ParserError>>()
2468        .map_err(|e| SinkError::Config(anyhow!(e)))?;
2469    new_sql_options.extend(
2470        add_sql_options
2471            .iter()
2472            .map(|sql_option| (&sql_option.name, sql_option)),
2473    );
2474    *with_properties = new_sql_options.into_values().cloned().collect();
2475    Ok(())
2476}
2477
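/// Apply the new properties to the `SinkDesc` embedded in every `Sink` fragment of the sink's
/// streaming job.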
2478async fn update_sink_fragment_props(
2479    txn: &DatabaseTransaction,
2480    sink_id: SinkId,
2481    props: BTreeMap<String, String>,
2482) -> MetaResult<()> {
2483    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2484        .select_only()
2485        .columns([
2486            fragment::Column::FragmentId,
2487            fragment::Column::FragmentTypeMask,
2488            fragment::Column::StreamNode,
2489        ])
2490        .filter(fragment::Column::JobId.eq(sink_id))
2491        .into_tuple()
2492        .all(txn)
2493        .await?;
2494    let fragments = fragments
2495        .into_iter()
2496        .filter(|(_, fragment_type_mask, _)| {
2497            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2498        })
2499        .filter_map(|(id, _, stream_node)| {
2500            let mut stream_node = stream_node.to_protobuf();
2501            let mut found = false;
2502            visit_stream_node_mut(&mut stream_node, |node| {
2503                if let PbNodeBody::Sink(node) = node
2504                    && let Some(sink_desc) = &mut node.sink_desc
2505                    && sink_desc.id == sink_id
2506                {
2507                    sink_desc.properties.extend(props.clone());
2508                    found = true;
2509                }
2510            });
2511            if found { Some((id, stream_node)) } else { None }
2512        })
2513        .collect_vec();
2514    assert!(
2515        !fragments.is_empty(),
2516        "sink id should be used by at least one fragment"
2517    );
2518    for (id, stream_node) in fragments {
2519        fragment::ActiveModel {
2520            fragment_id: Set(id),
2521            stream_node: Set(StreamNode::from(&stream_node)),
2522            ..Default::default()
2523        }
2524        .update(txn)
2525        .await?;
2526    }
2527    Ok(())
2528}
2529
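/// Context about the existing sinks that sink into the table being replaced.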
2530pub struct SinkIntoTableContext {
2531    /// For alter table (e.g., add column), this is the list of existing sink ids;
2532    /// otherwise it is empty.
2533    pub updated_sink_catalogs: Vec<SinkId>,
2534}
2535
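/// Context for finishing a sink whose schema is refreshed automatically when the schema of its
/// upstream table changes, as part of a replace-table job.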
2536pub struct FinishAutoRefreshSchemaSinkContext {
2537    pub tmp_sink_id: SinkId,
2538    pub original_sink_id: SinkId,
2539    pub columns: Vec<PbColumnCatalog>,
2540    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
2541}
2542
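/// Apply `alter_stream_node_fn` to the stream nodes of all fragments of `job_ids` that carry
/// `expect_flag`, persisting only the fragments that were actually modified.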
2543async fn update_connector_props_fragments<F>(
2544    txn: &DatabaseTransaction,
2545    job_ids: Vec<JobId>,
2546    expect_flag: FragmentTypeFlag,
2547    mut alter_stream_node_fn: F,
2548    is_shared_source: bool,
2549) -> MetaResult<()>
2550where
2551    F: FnMut(&mut PbNodeBody, &mut bool),
2552{
2553    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2554        .select_only()
2555        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2556        .filter(
2557            fragment::Column::JobId
2558                .is_in(job_ids.clone())
2559                .and(FragmentTypeMask::intersects(expect_flag)),
2560        )
2561        .into_tuple()
2562        .all(txn)
2563        .await?;
2564    let fragments = fragments
2565        .into_iter()
2566        .filter_map(|(id, stream_node)| {
2567            let mut stream_node = stream_node.to_protobuf();
2568            let mut found = false;
2569            visit_stream_node_mut(&mut stream_node, |node| {
2570                alter_stream_node_fn(node, &mut found);
2571            });
2572            if found { Some((id, stream_node)) } else { None }
2573        })
2574        .collect_vec();
2575    if is_shared_source || job_ids.len() > 1 {
2576        // the first element of `job_ids` is the source_id or the associated table_id
2577        // if the source is non-shared, there may be no updated fragments;
2578        // job_ids.len() > 1 means the source is used by other streaming jobs, so at least one fragment should be updated
2579        assert!(
2580            !fragments.is_empty(),
2581            "job ids {:?} (type: {:?}) should be used by at least one fragment",
2582            job_ids,
2583            expect_flag
2584        );
2585    }
2586
2587    for (id, stream_node) in fragments {
2588        fragment::ActiveModel {
2589            fragment_id: Set(id),
2590            stream_node: Set(StreamNode::from(&stream_node)),
2591            ..Default::default()
2592        }
2593        .update(txn)
2594        .await?;
2595    }
2596
2597    Ok(())
2598}