risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::ConnectorProperties;
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{Command, SharedFragmentInfo};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
    fetch_target_fragments, get_internal_tables_by_id, get_table_columns,
    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

impl CatalogController {
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        timezone: Option<String>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = (obj.oid as u32).into();
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(timezone),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
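    ///
    /// A minimal call sketch (hypothetical values, for illustration only; not a doctest):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(&mut job, &ctx, &None, max_parallelism, dependencies, None)
    ///     .await?;
    /// ```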
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

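        // An explicitly specified parallelism wins; otherwise fall back to the
        // cluster-wide default, which is either adaptive or a fixed degree.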
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(
            ObjectType::Database,
            streaming_job.database_id().as_raw_id() as _,
            &txn,
        )
        .await?;
        ensure_object_id(
            ObjectType::Schema,
            streaming_job.schema_id().as_raw_id() as _,
            &txn,
        )
        .await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // This means the referring table is just a dummy object used for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

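        // Create the catalog rows for the job. Each arm first allocates the
        // streaming-job object so the returned job id can be filled into the
        // placeholder catalog(s) before insertion.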
        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.as_raw_id() as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in a circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id.as_raw_id() as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id = Some(
                        PbOptionalAssociatedTableId::AssociatedTableId(job_id.as_raw_id() as _),
                    );
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(
                    ObjectType::Table,
                    index.primary_table_id.as_raw_id() as ObjectId,
                    &txn,
                )
                .await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // To be compatible with the old implementation, the index id reuses the job id.
                index.id = job_id.as_raw_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.as_raw_id() as ObjectId),
                    used_by: Set(table.id.as_raw_id() as ObjectId),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id.as_raw_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // record object dependencies.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_raw_id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, u32>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

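        // Allocate a global object id for each internal table and remember the
        // mapping from the temporary (graph-local) id to the global one. The
        // `fragment_id` is left unset here and filled in by `prepare_streaming_job`.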
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = (table_id as u32).into();
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set((table_id as u32).into()),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

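        // Insert each fragment row, then backfill the `fragment_id` and
        // `vnode_count` of its state tables, which were placeholders until now.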
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // A table's vnode count is not always its fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&state_table_id)
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id.as_raw_id());
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if need_notify {
                        let mut table = table.clone();
                        // In production builds, the definition was replaced with a placeholder
                        // earlier, but it is still needed for the notification, so restore it here.
                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                            table.definition = definition.clone().unwrap();
                        }
                        objects_to_notify.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        // Add the streaming job objects to the notification.
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update the dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // the frontend receiving duplicate notifications if it restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
    /// information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

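        // If the cancelled job is a sink into a table, record the
        // (sink fragment, target fragment) pair so the barrier mutation can
        // detach the sink from its downstream table fragment.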
        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
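    ///
    /// A hedged usage sketch (identifiers are illustrative; not a doctest):
    ///
    /// ```ignore
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ true)
    ///     .await?;
    /// ```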
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id.as_raw_id() as ObjectId)
            .one(&txn)
            .await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                // If the job is created in the background and still in creating status, we should not abort it; let recovery handle it instead.
                tracing::warn!(
                    id = %job_id,
                    "streaming job is created in background and still in creating status"
                );
                return Ok((false, Some(database_id)));
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            let obj: Option<PartialObject> = Object::find_by_id(job_id.as_raw_id() as ObjectId)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is a sink being created into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_raw_id() as ObjectId)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id.as_raw_id() as ObjectId)
            .exec(&txn)
            .await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // The frontend has already been notified about these objects,
            // so notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

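        // Persist the initial source split assignment, flattening the
        // per-actor splits into a per-fragment list.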
        if let Some(split_assignment) = split_assignment {
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id.as_raw_id() as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
            StreamingJobModel::find_by_id(id)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We have already overridden the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };
        let timezone = ctx
            .map(|ctx| ctx.timezone.clone())
            .unwrap_or(original_timezone);

        // 4. create the streaming object for the new replacement job.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            timezone,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record the dependency for the new replacement job.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_raw_id() as _),
            used_by: Set(new_obj_id.as_raw_id() as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks the job's related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id.as_raw_id() as ObjectId)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // Update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];

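        // Collect the catalog objects to notify, depending on the job type.
        // For indexes, SELECT privileges on the primary table are also
        // propagated to the index table and its state tables below.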
        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id.as_raw_id() as ObjectId)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id.as_raw_id() as ObjectId)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(&txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(&txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_raw_id() as ObjectId),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges)
                        .exec(&txn)
                        .await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        &txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_raw_id() as ObjectId)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

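        // Default privileges are granted here for non-index jobs; index
        // privileges were already cloned from the primary table above.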
        if job_type != ObjectType::Index {
            updated_user_info =
                grant_default_privileges_automatically(&txn, job_id.as_raw_id() as ObjectId)
                    .await?;
        }
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update the catalog.
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged.

                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (The column catalog is also updated here.)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // Drop the table connector; the rest of the logic is in `drop_table_associated_source`.
                    table.optional_associated_source_id = Set(None);
                }

                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

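        // Swap the temporary job's fragments in for the original job's,
        // rewrite downstream Merge nodes to point at the new fragments, and
        // finally drop the dummy object that backed the replacement.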
1265        async fn finish_fragments(
1266            txn: &DatabaseTransaction,
1267            tmp_id: JobId,
1268            original_job_id: JobId,
1269            replace_upstream: FragmentReplaceUpstream,
1270        ) -> MetaResult<()> {
1271            // 0. update internal tables
1272            // Fields including `fragment_id` were placeholder values before.
1273            // After table fragments are created, update them for all internal tables.
1274            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1275                .select_only()
1276                .columns([
1277                    fragment::Column::FragmentId,
1278                    fragment::Column::StateTableIds,
1279                ])
1280                .filter(fragment::Column::JobId.eq(tmp_id))
1281                .into_tuple()
1282                .all(txn)
1283                .await?;
1284            for (fragment_id, state_table_ids) in fragment_info {
1285                for state_table_id in state_table_ids.into_inner() {
1286                    let state_table_id = TableId::new(state_table_id as _);
1287                    table::ActiveModel {
1288                        table_id: Set(state_table_id),
1289                        fragment_id: Set(Some(fragment_id)),
1290                        // No need to update `vnode_count` because it must remain the same.
1291                        ..Default::default()
1292                    }
1293                    .update(txn)
1294                    .await?;
1295                }
1296            }
1297
1298            // 1. replace old fragments/actors with new ones.
1299            Fragment::delete_many()
1300                .filter(fragment::Column::JobId.eq(original_job_id))
1301                .exec(txn)
1302                .await?;
1303            Fragment::update_many()
1304                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1305                .filter(fragment::Column::JobId.eq(tmp_id))
1306                .exec(txn)
1307                .await?;

            // 2. update merges.
            // Update the downstream fragments' `Merge` nodes with the new `upstream_fragment_id`.
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }

            // 3. remove dummy object.
            Object::delete_by_id(tmp_id.as_raw_id() as ObjectId)
                .exec(txn)
                .await?;

            Ok(())
        }
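
        // Finalize the replaced job's own fragments first; the auto-refresh-schema sink
        // path further below reuses the same routine with each sink's tmp/original id pair.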
        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_raw_id() as ObjectId)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id,
                    JobId::new(finish_sink_context.original_sink_id as _),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id.as_raw_id() as ObjectId)
            .exec(&txn)
            .await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }
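
    // Usage sketch (hypothetical caller context; `controller`, `tmp_job_id` and
    // `tmp_sink_ids` are assumed bindings): when a replace fails, the caller drops the
    // temporary objects so that no half-built catalog entries survive.
    //
    //     controller
    //         .try_abort_replacing_streaming_job(tmp_job_id, Some(tmp_sink_ids))
    //         .await?;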

    // edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments
    // return the actor_ids to be applied
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![JobId::new(source_id as _)]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id as u32
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // in older versions, there was no fragment type flag for the `FsFetch` node,
                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id as u32).collect_vec();

        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }
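
    // Usage sketch (hypothetical `controller` and `source_id`): throttle a source to
    // 1000 rows/s per actor, or pass `None` to lift the limit.
    //
    //     let to_apply = controller
    //         .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //         .await?;
    //     // `to_apply` maps each mutated fragment to its running actors, which the
    //     // barrier path then uses to deliver the new limit.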

    fn get_fragment_actors_from_running_info(
        &self,
        fragment_ids: impl Iterator<Item = u32>,
    ) -> HashMap<FragmentId, Vec<ActorId>> {
        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();

        let info = self.env.shared_actor_infos().read_guard();

        for fragment_id in fragment_ids {
            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id as _).unwrap();
            fragment_actors
                .entry(fragment_id as _)
                .or_default()
                .extend(actors.keys().map(|actor| *actor as i32));
        }

        fragment_actors
    }
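
    // Shape sketch (illustrative values): for fragments [3, 4] whose running actors
    // are {3: [7, 8], 4: [9]}, the result is
    //
    //     HashMap::from([(3, vec![7, 8]), (4, vec![9])])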

    // edit the content of the fragments of the given `job_id`
    // return the actor_ids to be applied
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragment is found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<u32> = fragments.iter().map(|(id, _, _)| *id as u32).collect();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());

        Ok(fragment_actors)
    }
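
    // Sketch of a mutation closure (mirrors the rate-limit helpers below; the
    // `Some(0)` value is illustrative): update every `Dml` node of the job.
    //
    //     let set_limit = |mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
    //         let mut found = false;
    //         if mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
    //             visit_stream_node_mut(stream_node, |node| {
    //                 if let PbNodeBody::Dml(node) = node {
    //                     node.rate_limit = Some(0);
    //                     found = true;
    //                 }
    //             });
    //         }
    //         Ok(found)
    //     };
    //     let actors = self
    //         .mutate_fragments_by_job_id(job_id, set_limit, "dml node not found")
    //         .await?;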

    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            // Persist the mutated node: `stream_node` still holds the pre-mutation value.
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors =
            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id as u32));

        txn.commit().await?;

        Ok(fragment_actors)
    }

    // edit the `rate_limit` of the backfill nodes in the given `job_id`'s fragments
    // return the actor_ids to be applied
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }
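
    // Usage sketch (hypothetical `job_id`): a limit of 0 effectively pauses the
    // backfill, while `None` removes the throttle entirely.
    //
    //     controller
    //         .update_backfill_rate_limit_by_job_id(job_id, Some(0))
    //         .await?;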

    // edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments
    // return the actor_ids to be applied
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            JobId::new(sink_id as _),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // MVs on a non-shared source hold a copy of the source node in their own fragments
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check that the altered props are valid for the connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // `preferred_id` can be the source_id or the associated table_id:
        // if updating a source with an associated table, it is the table_id;
        // otherwise, it is the source_id
        let mut preferred_id: i32 = source_id;
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved.
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) =
                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
                    {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }
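
            // Output sketch (illustrative values): plaintext {"topic": "t1"} plus a
            // secret reference to a secret named "kafka_pwd" renders as options like
            // `topic = 't1', properties.sasl.password = SECRET kafka_pwd` in the DDL.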

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_raw_id() as _;
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id as _),
                        used_by: Set(preferred_id as _),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(
                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
                            ),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table's statement accordingly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating a table with a connector, the job id is the associated table's id
            associate_table_id.as_job_id()
        } else {
            JobId::new(source_id as _)
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }
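
    // Usage sketch (hypothetical property key/value; secret refs omitted): refresh a
    // source's broker list without recreating the source.
    //
    //     let resolved = controller
    //         .update_source_props_by_source_id(
    //             source_id,
    //             BTreeMap::from([(
    //                 "properties.bootstrap.server".to_owned(),
    //                 "broker:9092".to_owned(),
    //             )]),
    //             BTreeMap::new(),
    //         )
    //         .await?;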

    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }
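
    // Usage sketch (illustrative property key): adjust a sink option on the fly; only
    // fields on the connector's alter-on-fly allowlist pass validation.
    //
    //     let applied = controller
    //         .update_sink_props_by_sink_id(
    //             sink_id,
    //             BTreeMap::from([("properties.retry.interval".to_owned(), "1s".to_owned())]),
    //         )
    //         .await?;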

    pub async fn update_iceberg_table_props_by_table_id(
        &self,
        table_id: TableId,
        props: BTreeMap<String, String>,
        alter_iceberg_table_props: Option<
            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
        >,
    ) -> MetaResult<(HashMap<String, String>, u32)> {
        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(
            AlterIcebergTableIds { sink_id, source_id },
        ) = alter_iceberg_table_props
            .ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;

        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateTable {
            with_options,
            engine,
            ..
        } = &mut stmt
        {
            if !matches!(engine, Engine::Iceberg) {
                return Err(SinkError::Config(anyhow!(
                    "only iceberg table can be altered as sink"
                ))
                .into());
            }
            update_stmt_with_props(with_options, &props)?;
        } else {
            panic!("definition is not a create iceberg table statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_source = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(definition.clone()),
            ..Default::default()
        };
        let active_table = table::ActiveModel {
            table_id: Set(table_id),
            definition: Set(definition),
            ..Default::default()
        };
        active_sink.update(&txn).await?;
        active_source.update(&txn).await?;
        active_table.update(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;

        let (sink, sink_obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let (source, source_obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let (table, table_obj) = Table::find_by_id(table_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
        txn.commit().await?;
        let relation_infos = vec![
            PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, sink_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, source_obj.unwrap()).into(),
                )),
            },
            PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, table_obj.unwrap()).into(),
                )),
            },
        ];
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok((props.into_iter().collect(), sink_id as u32))
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

    /// Note: `FsFetch` fragments created in old versions are not included.
    /// Since this is only used for debugging, it should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id: fragment_id as u32,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
            );
        }
    };
    Ok(())
}
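
// Sketch (illustrative key): altering a retry-related property of a kafka sink passes
// the allow-on-fly check, while a key outside the allowlist is rejected here.
//
//     validate_sink_props(
//         &sink,
//         &BTreeMap::from([("properties.retry.interval".to_owned(), "1s".to_owned())]),
//     )?;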

fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}
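
// Merge-semantics sketch: the `IndexMap` is keyed by option name, so the original
// option order is kept and new values overwrite duplicates.
//
//     WITH (connector = 'kafka', topic = 't1')  +  {"topic": "t2"}
//       => WITH (connector = 'kafka', topic = 't2')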

async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id as u32
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }
    Ok(())
}

pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids;
    /// otherwise it is empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: JobId,
    pub original_sink_id: ObjectId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // the first element is the source_id or the associated table_id
        // if the source is non-shared, the source job itself yields no updated fragments;
        // job_ids.len() > 1 means the source is used by other streaming jobs, so at least one fragment should be updated
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}
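
// Usage sketch: the source-props path above calls this with `FragmentTypeFlag::Source`
// and a closure that swaps the properties on matching `Source` nodes (`new_props` is
// an assumed binding); sink properties instead go through `update_sink_fragment_props`.
//
//     update_connector_props_fragments(
//         &txn,
//         job_ids,
//         FragmentTypeFlag::Source,
//         |node, found| {
//             if let PbNodeBody::Source(node) = node
//                 && let Some(inner) = &mut node.source_inner
//             {
//                 inner.with_properties = new_props.clone();
//                 *found = true;
//             }
//         },
//         is_shared_source,
//     )
//     .await?;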