risingwave_meta/controller/streaming_job.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask, ICEBERG_SINK_PREFIX};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::JobId;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, pb_connection_type_to_connection_type};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::refresh_job::RefreshState;
use risingwave_meta_model::streaming_job::BackfillOrders;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::{PbConnection, PbCreateType, PbTable};
use risingwave_pb::ddl_service::streaming_job_resource_type;
use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::plan_common::source_refresh_mode::{
    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
};
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
    NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::Command;
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::fragment::FragmentTypeMaskExt;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate,
};
use crate::stream::SplitAssignment;
use crate::{MetaError, MetaResult};

/// A planned fragment update for dependent sources when a connection's properties change.
///
/// We keep it as a named struct (instead of a tuple) for readability and to avoid clippy
/// `type_complexity` warnings.
#[derive(Debug)]
struct DependentSourceFragmentUpdate {
    job_ids: Vec<JobId>,
    with_properties: BTreeMap<String, String>,
    secret_refs: BTreeMap<String, PbSecretRef>,
    is_shared_source: bool,
}

impl CatalogController {
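    /// Create the `object` row and the corresponding `streaming_job` row (in `Initial`
    /// status) for a new streaming job within the given transaction, and return the
    /// allocated job id.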
    #[allow(clippy::too_many_arguments)]
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: Option<StreamingParallelism>,
    ) -> MetaResult<JobId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job_id = obj.oid.as_job_id();
        let specific_resource_group = resource_type.resource_group();
        let is_serverless_backfill = matches!(
            &resource_type,
            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
        );
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone),
            config_override: Set(Some(ctx.config_override.to_string())),
            adaptive_parallelism_strategy: Set(ctx
                .adaptive_parallelism_strategy
                .as_ref()
                .map(ToString::to_string)),
            parallelism: Set(streaming_parallelism),
            backfill_parallelism: Set(backfill_parallelism),
            backfill_orders: Set(None),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
            is_serverless_backfill: Set(is_serverless_backfill),
        };
        StreamingJobModel::insert(job).exec(txn).await?;

        Ok(job_id)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the
    /// job is a materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
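    ///
    /// A minimal usage sketch (hypothetical bindings, `ignore`d for doctests):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(
    ///         &mut streaming_job,
    ///         &ctx,
    ///         &None, // fall back to the configured default parallelism
    ///         vnode_count,
    ///         dependencies,
    ///         streaming_job_resource_type::ResourceType::Regular(true),
    ///         &None, // no dedicated backfill parallelism
    ///     )
    ///     .await?;
    /// ```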
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        backfill_parallelism: &Option<Parallelism>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };
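        // For example, with no parallelism attached to the plan, `DefaultParallelism::Full`
        // yields adaptive scheduling while `DefaultParallelism::Default(n)` pins the job to
        // a fixed parallelism of `n`; an explicitly attached parallelism always wins and is
        // pinned as fixed.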
        let backfill_parallelism = backfill_parallelism
            .as_ref()
            .map(|p| StreamingParallelism::Fixed(p.parallelism as _));

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id(),
            streaming_job.schema_id(),
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
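        // A sketch of the roughly equivalent SQL (names per the meta model):
        //
        //   SELECT COUNT(*) FROM object_dependency
        //   JOIN object ON object.oid = object_dependency.oid
        //   JOIN streaming_job ON streaming_job.job_id = object.oid
        //   WHERE object_dependency.oid IN (<dependencies>)
        //     AND object.obj_type = 'TABLE'
        //     AND streaming_job.job_status != 'CREATED'
        //     AND object.oid NOT IN (SELECT table_id FROM table);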
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id.into(),
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in a circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id),
                    Some(sink.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                sink.id = job_id.as_sink_id();
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id),
                    Some(table.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                table.id = job_id.as_mv_table_id();
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id),
                        Some(src.schema_id),
                    )
                    .await?;
                    src.id = src_obj.oid.as_source_id();
                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;

                if table.refreshable {
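                    // Derive the periodic trigger interval from the associated source's
                    // refresh mode: only `FullReload` carries a `refresh_interval_sec`,
                    // while `Streaming` mode (or the absence of a source) leaves it unset.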
                    let trigger_interval_secs = src
                        .as_ref()
                        .and_then(|source_catalog| source_catalog.refresh_mode)
                        .and_then(
                            |source_refresh_mode| match source_refresh_mode.refresh_mode {
                                Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
                                    refresh_interval_sec,
                                })) => refresh_interval_sec,
                                Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})) => None,
                                None => None,
                            },
                        );

                    RefreshJob::insert(refresh_job::ActiveModel {
                        table_id: Set(table.id),
                        last_trigger_time: Set(None),
                        trigger_interval_secs: Set(trigger_interval_secs),
                        current_status: Set(RefreshState::Idle),
                        last_success_time: Set(None),
                    })
                    .exec(&txn)
                    .await?;
                }
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id),
                    Some(index.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                // to be compatible with the old implementation.
                index.id = job_id.as_index_id();
                index.index_table_id = job_id.as_mv_table_id();
                table.id = job_id.as_mv_table_id();

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id.into()),
                    used_by: Set(table.id.into()),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id),
                    Some(src.schema_id),
                    create_type,
                    ctx.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    resource_type,
                    backfill_parallelism.clone(),
                )
                .await?;
                src.id = job_id.as_shared_source_id();
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|id| id.as_object_id()),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id().as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the
    /// job is a materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders,
    /// which will be updated later in `prepare_streaming_job` and notified again in
    /// `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
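    ///
    /// A minimal usage sketch (hypothetical bindings, `ignore`d for doctests):
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// // Rewrite the temporary ids in the stream graph with the global ones.
    /// let global_id = table_id_map[&tmp_table_id];
    /// ```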
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<TableId, TableId>> {
        let job_id = job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id),
                Some(table.schema_id),
            )
            .await?
            .oid
            .as_table_id();
            table_id_map.insert(table.id, table_id);
            table.id = table_id;
            table.job_id = Some(job_id);

            let table_model = table::ActiveModel {
                table_id: Set(table_id),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

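    /// Convenience wrapper around [`Self::prepare_streaming_job`] for a freshly built set
    /// of job fragments. A minimal usage sketch (hypothetical bindings, `ignore`d for
    /// doctests):
    ///
    /// ```ignore
    /// controller
    ///     .prepare_stream_job_fragments(&fragments_to_create, &streaming_job, false, None)
    ///     .await?;
    /// ```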
    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id(),
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.downstreams,
            for_replace,
            Some(streaming_job),
            backfill_orders,
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and
    /// for replacing jobs.
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
    )]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: JobId,
        get_fragments: impl Fn() -> I + 'a,
        downstreams: &FragmentDownstreamRelation,
        for_replace: bool,
        creating_streaming_job: Option<&'a StreamingJob>,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;

        let inner = self.inner.write().await;

        let need_notify = creating_streaming_job
            .map(|job| job.should_notify_creating())
            .unwrap_or(false);
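        // Jobs whose catalogs should already be visible to the frontend while still being
        // created (see `should_notify_creating`) get an early notification below.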
        let definition = creating_streaming_job.map(|job| job.definition());

        let mut objects_to_notify = vec![];
        let txn = inner.db.begin().await?;

        // Ensure the job exists.
        ensure_job_not_canceled(job_id, &txn).await?;

        if let Some(backfill_orders) = backfill_orders {
            let job = streaming_job::ActiveModel {
                job_id: Set(job_id),
                backfill_orders: Set(Some(backfill_orders)),
                ..Default::default()
            };
            StreamingJobModel::update(job).exec(&txn).await?;
        }

        let state_table_ids = fragments
            .iter()
            .flat_map(|fragment| fragment.state_table_ids.inner_ref().clone())
            .collect_vec();

        if !fragments.is_empty() {
            let fragment_models = fragments
                .into_iter()
                .map(|fragment| fragment.into_active_model())
                .collect_vec();
            Fragment::insert_many(fragment_models).exec(&txn).await?;
        }

        // Fields including `fragment_id` and `vnode_count` were placeholder values before.
        // After table fragments are created, update them for all tables.
        if !for_replace {
            let all_tables = StreamJobFragments::collect_tables(get_fragments());
            for state_table_id in state_table_ids {
                // Table's vnode count is not always the fragment's vnode count, so we have to
                // look up the table from `TableFragments`.
                // See `ActorGraphBuilder::new`.
                let table = all_tables
                    .get(&state_table_id)
                    .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                assert_eq!(table.id, state_table_id);
                let vnode_count = table.vnode_count();

                Table::update(table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(table.fragment_id)),
                    vnode_count: Set(vnode_count as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                if need_notify {
                    let mut table = table.clone();
                    // In production builds the definition was replaced earlier, but it is
                    // still needed for the notification, so restore it here.
                    if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
                        table.definition = definition.clone().unwrap();
                    }
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(table)),
                    });
                }
            }
        }

        // Add streaming job objects to the notification.
        if need_notify {
            match creating_streaming_job.unwrap() {
                StreamingJob::Table(Some(source), ..) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(source.clone())),
                    });
                }
                StreamingJob::Sink(sink) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
                    });
                }
                StreamingJob::Index(index, _) => {
                    objects_to_notify.push(PbObject {
                        object_info: Some(PbObjectInfo::Index(index.clone())),
                    });
                }
                _ => {}
            }
        }

        insert_fragment_relations(&txn, downstreams).await?;

        if !for_replace {
            // Update dml fragment id.
            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id),
                    dml_fragment_id: Set(table.dml_fragment_id),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        // FIXME: there's a gap between the catalog creation and notification, which may lead to
        // the frontend receiving duplicate notifications if it restarts right in this gap. We
        // need to either forward the notification or filter catalogs without any fragments in the metastore.
        if !objects_to_notify.is_empty() {
            self.notify_frontend(
                Operation::Add,
                Info::ObjectGroup(PbObjectGroup {
                    objects: objects_to_notify,
                }),
            )
            .await;
        }

        Ok(())
    }

    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be
    /// dropped, additional information is required to build the barrier mutation.
    pub async fn build_cancel_command(
        &self,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<Command> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let dropped_sink_fragment_with_target =
            if let Some(sink_fragment) = table_fragments.sink_fragment() {
                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
                sink_target_fragment
                    .get(&sink_fragment_id)
                    .map(|target_fragments| {
                        let target_fragment_id = *target_fragments
                            .first()
                            .expect("sink should have at least one downstream fragment");
                        (sink_fragment_id, target_fragment_id)
                    })
            } else {
                None
            };

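        // `dropped_sink_fragment_by_targets` is keyed by the *target* (downstream) fragment
        // id, with the dropped sink fragments as values.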
        Ok(Command::DropStreamingJobs {
            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
            actors: table_fragments.actor_ids().collect(),
            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
                .into_iter()
                .map(|(sink, target)| (target as _, vec![sink as _]))
                .collect(),
        })
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in its initial
    /// status or was created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
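    ///
    /// A minimal usage sketch (`ignore`d for doctests):
    ///
    /// ```ignore
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, /* is_cancelled */ true)
    ///     .await?;
    /// ```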
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        mut job_id: JobId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = %job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
                {
                    // If the job belongs to an iceberg table, we still need to clean it.
                } else {
                    // If the job is created in the background and still in creating status,
                    // we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = %job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        // Record original job info before any potential job id rewrite (e.g. iceberg sink).
        let original_job_id = job_id;
        let original_obj_type = obj.obj_type;

        let iceberg_table_id =
            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
        if let Some(iceberg_table_id) = iceberg_table_id {
            // If the job is an iceberg sink, we need to clean the iceberg table as well.
            // Here we will drop the sink objects directly.
            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
            Object::delete_many()
                .filter(
                    object::Column::Oid
                        .eq(job_id)
                        .or(object::Column::Oid.is_in(internal_tables)),
                )
                .exec(&txn)
                .await?;
            job_id = iceberg_table_id.as_job_id();
        };

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;

        let mut need_notify =
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
        if !need_notify {
            if let Some(table) = &table_obj {
                need_notify = table.table_type == TableType::MaterializedView;
            } else if original_obj_type == ObjectType::Sink {
                need_notify = true;
            }
        }

        if is_cancelled {
            let dropped_tables = Table::find()
                .find_also_related(Object)
                .filter(
                    table::Column::TableId.is_in(
                        internal_table_ids
                            .iter()
                            .cloned()
                            .chain(table_obj.iter().map(|t| t.table_id as _)),
                    ),
                )
                .all(&txn)
                .await?
                .into_iter()
                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
            inner
                .dropped_tables
                .extend(dropped_tables.map(|t| (t.id, t)));
        }

        if need_notify {
            // Special handling for iceberg sinks: the `job_id` may have been rewritten to the table id.
            // Ensure we still notify the frontend to delete the original sink object.
            if original_obj_type == ObjectType::Sink && original_job_id != job_id {
                let orig_obj: Option<PartialObject> = Object::find_by_id(original_job_id)
                    .select_only()
                    .columns([
                        object::Column::Oid,
                        object::Column::ObjType,
                        object::Column::SchemaId,
                        object::Column::DatabaseId,
                    ])
                    .into_partial_model()
                    .one(&txn)
                    .await?;
                if let Some(orig_obj) = orig_obj {
                    objs.push(orig_obj);
                }
            }

            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating a sink into a table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = %tmp_id,
                    "aborting temp streaming job for sink into table"
                );

                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: JobId,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        new_sink_downstream: Option<FragmentDownstreamRelation>,
        split_assignment: Option<&SplitAssignment>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        if let Some(new_downstream) = new_sink_downstream {
            insert_fragment_relations(&txn, &new_downstream).await?;
        }

        // Mark job as CREATING.
        StreamingJobModel::update(streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        if let Some(split_assignment) = split_assignment {
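            // Flatten the per-actor split assignment into a single split list per fragment.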
            let fragment_splits = split_assignment
                .iter()
                .map(|(fragment_id, splits)| {
                    (
                        *fragment_id as _,
                        splits.values().flatten().cloned().collect_vec(),
                    )
                })
                .collect();

            self.update_fragment_splits(&txn, &fragment_splits).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<JobId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (
            original_max_parallelism,
            original_timezone,
            original_config_override,
            original_adaptive_strategy,
        ): (i32, Option<String>, Option<String>, Option<String>) =
            StreamingJobModel::find_by_id(id)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .column(streaming_job::Column::ConfigOverride)
                .column(streaming_job::Column::AdaptiveParallelismStrategy)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        let ctx = StreamContext {
            timezone: ctx
                .map(|ctx| ctx.timezone.clone())
                .unwrap_or(original_timezone),
            // We don't expect replacing a job with a different config override.
            // Thus always use the original config override.
            config_override: original_config_override.unwrap_or_default().into(),
            adaptive_parallelism_strategy: original_adaptive_strategy.as_deref().map(|s| {
                parse_strategy(s).expect("strategy should be validated before persisting")
            }),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            original_max_parallelism as _,
            streaming_job_resource_type::ResourceType::Regular(true),
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id.as_object_id()),
            used_by: Set(new_obj_id.as_object_id()),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Check if the job belongs to an iceberg table.
        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
            tracing::info!(
                "streaming job {} is for iceberg table, wait for manual finish operation",
                job_id
            );
            return Ok(());
        }

        let (notification_op, objects, updated_user_info) =
            Self::finish_streaming_job_inner(&txn, job_id).await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the default privileges
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    /// Transactional inner part of [`Self::finish_streaming_job`]: marks job-related objects as
    /// `Created` and builds the notification payload for the frontend.
    pub async fn finish_streaming_job_inner(
        txn: &DatabaseTransaction,
        job_id: JobId,
    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        let streaming_job = Some(job.update(txn).await?);

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap(), streaming_job.clone()).into(),
                )),
            })
            .collect_vec();
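        // Background-created jobs have already been announced to the frontend while creating,
        // so finishing them emits an `Update`; otherwise this is the first announcement and
        // uses `Add`.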
1154        let mut notification_op = if create_type == CreateType::Background {
1155            NotificationOperation::Update
1156        } else {
1157            NotificationOperation::Add
1158        };
1159        let mut updated_user_info = vec![];
1160        let mut need_grant_default_privileges = true;
1161
1162        match job_type {
1163            ObjectType::Table => {
1164                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1165                    .find_also_related(Object)
1166                    .one(txn)
1167                    .await?
1168                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1169                if table.table_type == TableType::MaterializedView {
1170                    notification_op = NotificationOperation::Update;
1171                }
1172
1173                if let Some(source_id) = table.optional_associated_source_id {
1174                    let (src, obj) = Source::find_by_id(source_id)
1175                        .find_also_related(Object)
1176                        .one(txn)
1177                        .await?
1178                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1179                    objects.push(PbObject {
1180                        object_info: Some(PbObjectInfo::Source(
1181                            ObjectModel(src, obj.unwrap(), None).into(),
1182                        )),
1183                    });
1184                }
1185                objects.push(PbObject {
1186                    object_info: Some(PbObjectInfo::Table(
1187                        ObjectModel(table, obj.unwrap(), streaming_job).into(),
1188                    )),
1189                });
1190            }
1191            ObjectType::Sink => {
1192                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1193                    .find_also_related(Object)
1194                    .one(txn)
1195                    .await?
1196                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1197                if sink.name.starts_with(ICEBERG_SINK_PREFIX) {
1198                    need_grant_default_privileges = false;
1199                }
1200                // If sinks were pre-notified during CREATING, we should use Update at finish
1201                // to avoid duplicate Add notifications (align with MV behavior).
1202                notification_op = NotificationOperation::Update;
1203                objects.push(PbObject {
1204                    object_info: Some(PbObjectInfo::Sink(
1205                        ObjectModel(sink, obj.unwrap(), streaming_job).into(),
1206                    )),
1207                });
            }
            ObjectType::Index => {
                need_grant_default_privileges = false;
                let (index, obj) = Index::find_by_id(job_id.as_index_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap(), streaming_job.clone()).into(),
                        )),
                    });
                }

                // If SELECT privileges are granted on the primary table, also grant them
                // on the index table and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(state_table_id.as_object_id()),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, obj.unwrap(), streaming_job).into(),
                    )),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap(), None).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if need_grant_default_privileges {
            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
        }

        Ok((notification_op, objects, updated_user_info))
    }

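    /// Finish replacing a streaming job: persist the new fragments and catalogs in one
    /// transaction, then notify the frontend of the updated objects (and of the deleted
    /// ones, if a table connector was dropped as part of the replacement).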
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: JobId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

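    /// Transactional core of [`Self::finish_replace_streaming_job`]: updates the job's
    /// catalog entry, swaps the temporary fragments in for the original ones, rewrites
    /// downstream `Merge` nodes, and returns the objects to notify the frontend about.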
    pub async fn finish_replace_streaming_job_inner(
        tmp_id: JobId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id();
        let job_type = streaming_job.job_type();

        let mut index_item_rewriter = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_column_catalogs =
                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    Sink::update(sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop the table's connector; the rest of the logic is in `drop_table_associated_source`
                    table.optional_associated_source_id = Set(None);
                }

                Table::update(table).exec(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                Source::update(source).exec(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                Table::update(table).exec(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

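        // Helper that promotes the fragments built under the temporary job: it points the
        // internal state tables at their final fragments, deletes the old fragments, moves
        // the new ones onto the original job id, rewires downstream `Merge` nodes, and
        // finally removes the temporary job object.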
        async fn finish_fragments(
            txn: &DatabaseTransaction,
            tmp_id: JobId,
            original_job_id: JobId,
            replace_upstream: FragmentReplaceUpstream,
        ) -> MetaResult<()> {
            // 0. update internal tables
            // Fields including `fragment_id` were placeholder values before.
            // After table fragments are created, update them for all internal tables.
            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
                .select_only()
                .columns([
                    fragment::Column::FragmentId,
                    fragment::Column::StateTableIds,
                ])
                .filter(fragment::Column::JobId.eq(tmp_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (fragment_id, state_table_ids) in fragment_info {
                for state_table_id in state_table_ids.into_inner() {
                    let state_table_id = TableId::new(state_table_id as _);
                    Table::update(table::ActiveModel {
                        table_id: Set(state_table_id),
                        fragment_id: Set(Some(fragment_id)),
                        // No need to update `vnode_count` because it must remain the same.
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                }
            }

            // 1. replace old fragments/actors with new ones.
            Fragment::delete_many()
                .filter(fragment::Column::JobId.eq(original_job_id))
                .exec(txn)
                .await?;
            Fragment::update_many()
                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
                .filter(fragment::Column::JobId.eq(tmp_id))
                .exec(txn)
                .await?;

            // 2. update merges.
            // Update each downstream fragment's Merge node with its new upstream_fragment_id.
            for (fragment_id, fragment_replace_map) in replace_upstream {
                let (fragment_id, mut stream_node) =
                    Fragment::find_by_id(fragment_id as FragmentId)
                        .select_only()
                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                        .into_tuple::<(FragmentId, StreamNode)>()
                        .one(txn)
                        .await?
                        .map(|(id, node)| (id, node.to_protobuf()))
                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

                visit_stream_node_mut(&mut stream_node, |body| {
                    if let PbNodeBody::Merge(m) = body
                        && let Some(new_fragment_id) =
                            fragment_replace_map.get(&m.upstream_fragment_id)
                    {
                        m.upstream_fragment_id = *new_fragment_id;
                    }
                });
                Fragment::update(fragment::ActiveModel {
                    fragment_id: Set(fragment_id),
                    stream_node: Set(StreamNode::from(&stream_node)),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
            }

            // 3. remove dummy object.
            Object::delete_by_id(tmp_id).exec(txn).await?;

            Ok(())
        }

        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                    .one(txn)
                    .await?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap(), streaming_job).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) =
                    Source::find_by_id(original_job_id.as_shared_source_id())
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("object", original_job_id)
                        })?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap(), None).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

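        // If the table's columns changed, rewrite the expressions of all indexes built on
        // it so that column references point at the new column layout.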
        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let (index_obj, streaming_job) = Object::find_by_id(index.index_id)
                    .find_also_related(streaming_job::Entity)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(
                        ObjectModel(index, index_obj, streaming_job).into(),
                    )),
                });
            }
        }

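        // For sinks whose schema is auto-refreshed together with the table, finish their
        // temporary fragments as well and persist the new sink columns (and the columns
        // of the kv log store table, if one exists).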
        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id.as_job_id(),
                    finish_sink_context.original_sink_id.as_job_id(),
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id)
                    })?;
                let sink_streaming_job =
                    streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                        .one(txn)
                        .await?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job.clone()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", log_store_table_id)
                        })?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap(), sink_streaming_job.clone())
                                .into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: JobId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    // Edit the `rate_limit` of the `Source` node in the given `source_id`'s fragments.
    // Returns the ids of the affected streaming jobs and fragments.
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<(HashSet<JobId>, HashSet<FragmentId>)> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            Source::update(active_source).exec(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id.as_share_source_job_id()]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, job_id, mask, stream_node)| {
                (
                    id,
                    job_id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, _, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // In older versions, there was no fragment type flag for the `FsFetch` node,
                // so we scan all fragments for a `StreamFsFetch` node when an fs connector is used.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

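        // `retain_mut` above keeps only the fragments whose stream node was actually
        // rewritten, so at least one fragment must remain here.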
        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );

        let (fragment_ids, job_ids) = fragments
            .iter()
            .map(|(fragment_id, job_id, _, _)| (fragment_id, job_id))
            .unzip();

        for (fragment_id, _, fragment_type_mask, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap(), None).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok((job_ids, fragment_ids))
    }

    // Edit the stream nodes of the fragments belonging to the given `job_id`.
    // Returns the ids of the mutated fragments.
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: JobId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments are found
        err_msg: &'static str,
    ) -> MetaResult<HashSet<FragmentId>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
        for (id, _, stream_node) in fragments {
            Fragment::update(fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            })
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(fragment_ids)
    }

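    /// Single-fragment variant of [`Self::mutate_fragments_by_job_id`]: applies
    /// `fragment_mutation_fn` to the given fragment's stream node and persists it,
    /// or fails with `err_msg` if the closure reports no match.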
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<()> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node (not the original one fetched above).
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

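    /// Persist new backfill orders for the given streaming job, rejecting the update if
    /// the job has already been canceled.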
    pub async fn update_backfill_orders_by_job_id(
        &self,
        job_id: JobId,
        backfill_orders: Option<BackfillOrders>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        ensure_job_not_canceled(job_id, &txn).await?;

        streaming_job::ActiveModel {
            job_id: Set(job_id),
            backfill_orders: Set(backfill_orders),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;

        Ok(())
    }

    // Edit the `rate_limit` of the backfill (formerly `Chain`) nodes in the given `job_id`'s fragments.
    // Returns the ids of the mutated fragments.
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    // Edit the `rate_limit` of the `Sink` node in the given `sink_id`'s fragments.
    // Returns the ids of the mutated fragments.
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        sink_id: SinkId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            sink_id.as_job_id(),
            update_sink_rate_limit,
            "sink node not found",
        )
        .await
    }

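    // Edit the `rate_limit` of the `Dml` node in the given `job_id`'s fragments.
    // Returns the ids of the mutated fragments.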
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: JobId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashSet<FragmentId>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

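    /// Alter the WITH properties and secret refs of a source: validates that the changed
    /// fields may be altered on the fly, rewrites the stored `CREATE SOURCE`/`CREATE TABLE`
    /// definition, updates secret dependencies, and pushes the new properties into all
    /// affected `Source` fragments before notifying the frontend.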
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // Materialized views on a non-shared source hold a copy of the source in their fragments.
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check if the altered props are valid for the connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // `preferred_id` can be a source id or a table id:
        // when updating a source associated with a table, it is the table id;
        // otherwise, it is the source id.
        let mut preferred_id = source_id.as_object_id();
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        Source::update(active_source_model).exec(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table's statement accordingly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            Table::update(active_table_model).exec(&txn).await?;
        }

        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // when updating a table with a connector, the job id is the table id
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap(), None).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            let streaming_job = streaming_job::Entity::find_by_id(table.job_id())
                .one(&txn)
                .await?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

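    /// Alter the WITH properties of a sink: validates the new properties, rewrites the
    /// stored `CREATE SINK` definition, updates the sink catalog and its fragments, and
    /// notifies the frontend. Returns the applied property overrides.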
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        validate_sink_props(&sink, &props)?;
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
        } else {
            panic!("definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props.clone());

        let definition = stmt.to_string();
        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(definition),
            ..Default::default()
        };
        Sink::update(active_sink).exec(&txn).await?;

        update_sink_fragment_props(&txn, sink_id, new_config).await?;
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
        let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
            .one(&txn)
            .await?;
        txn.commit().await?;
        let relation_infos = vec![PbObject {
            object_info: Some(PbObjectInfo::Sink(
                ObjectModel(sink, obj.unwrap(), streaming_job).into(),
            )),
        }];

        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: relation_infos,
                }),
            )
            .await;

        Ok(props.into_iter().collect())
    }

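    /// Alter the WITH properties of an iceberg-engine table. Such a table is backed by a
    /// hidden sink/source pair, so the rewritten definition is persisted on the sink, the
    /// source, and the table, and the sink fragments are updated with the new properties.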
2351    pub async fn update_iceberg_table_props_by_table_id(
2352        &self,
2353        table_id: TableId,
2354        props: BTreeMap<String, String>,
2355        alter_iceberg_table_props: Option<
2356            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2357        >,
2358    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2359        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2360            ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2361        let inner = self.inner.read().await;
2362        let txn = inner.db.begin().await?;
2363
2364        let (sink, _obj) = Sink::find_by_id(sink_id)
2365            .find_also_related(Object)
2366            .one(&txn)
2367            .await?
2368            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2369        validate_sink_props(&sink, &props)?;
2370
2371        let definition = sink.definition.clone();
2372        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2373            .map_err(|e| SinkError::Config(anyhow!(e)))?
2374            .try_into()
2375            .unwrap();
2376        if let Statement::CreateTable {
2377            with_options,
2378            engine,
2379            ..
2380        } = &mut stmt
2381        {
2382            if !matches!(engine, Engine::Iceberg) {
2383                return Err(SinkError::Config(anyhow!(
2384                    "only iceberg table can be altered as sink"
2385                ))
2386                .into());
2387            }
2388            update_stmt_with_props(with_options, &props)?;
2389        } else {
2390            panic!("definition is not a create iceberg table statement")
2391        }
2392        let mut new_config = sink.properties.clone().into_inner();
2393        new_config.extend(props.clone());
2394
2395        let definition = stmt.to_string();
2396        let active_sink = sink::ActiveModel {
2397            sink_id: Set(sink_id),
2398            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2399            definition: Set(definition.clone()),
2400            ..Default::default()
2401        };
2402        let active_source = source::ActiveModel {
2403            source_id: Set(source_id),
2404            definition: Set(definition.clone()),
2405            ..Default::default()
2406        };
2407        let active_table = table::ActiveModel {
2408            table_id: Set(table_id),
2409            definition: Set(definition),
2410            ..Default::default()
2411        };
2412        Sink::update(active_sink).exec(&txn).await?;
2413        Source::update(active_source).exec(&txn).await?;
2414        Table::update(active_table).exec(&txn).await?;
2415
2416        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2417
2418        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2419            .find_also_related(Object)
2420            .one(&txn)
2421            .await?
2422            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2423        let sink_streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
2424            .one(&txn)
2425            .await?;
2426        let (source, source_obj) = Source::find_by_id(source_id)
2427            .find_also_related(Object)
2428            .one(&txn)
2429            .await?
2430            .ok_or_else(|| {
2431                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2432            })?;
2433        let (table, table_obj) = Table::find_by_id(table_id)
2434            .find_also_related(Object)
2435            .one(&txn)
2436            .await?
2437            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2438        let table_streaming_job = streaming_job::Entity::find_by_id(table.job_id())
2439            .one(&txn)
2440            .await?;
2441        txn.commit().await?;
2442        let relation_infos = vec![
2443            PbObject {
2444                object_info: Some(PbObjectInfo::Sink(
2445                    ObjectModel(sink, sink_obj.unwrap(), sink_streaming_job).into(),
2446                )),
2447            },
2448            PbObject {
2449                object_info: Some(PbObjectInfo::Source(
2450                    ObjectModel(source, source_obj.unwrap(), None).into(),
2451                )),
2452            },
2453            PbObject {
2454                object_info: Some(PbObjectInfo::Table(
2455                    ObjectModel(table, table_obj.unwrap(), table_streaming_job).into(),
2456                )),
2457            },
2458        ];
2459        let _version = self
2460            .notify_frontend(
2461                NotificationOperation::Update,
2462                NotificationInfo::ObjectGroup(PbObjectGroup {
2463                    objects: relation_infos,
2464                }),
2465            )
2466            .await;
2467
2468        Ok((props.into_iter().collect(), sink_id))
2469    }
2470
2471    /// Update connection properties and all dependent sources/sinks in a single transaction
2472    pub async fn update_connection_and_dependent_objects_props(
2473        &self,
2474        connection_id: ConnectionId,
2475        alter_props: BTreeMap<String, String>,
2476        alter_secret_refs: BTreeMap<String, PbSecretRef>,
2477    ) -> MetaResult<(
2478        WithOptionsSecResolved,                   // Connection's new properties
2479        Vec<(SourceId, HashMap<String, String>)>, // Source ID and their complete properties
2480        Vec<(SinkId, HashMap<String, String>)>,   // Sink ID and their complete properties
2481    )> {
2482        let inner = self.inner.read().await;
2483        let txn = inner.db.begin().await?;
2484
2485        // Find all dependent sources and sinks first
2486        let dependent_sources: Vec<SourceId> = Source::find()
2487            .select_only()
2488            .column(source::Column::SourceId)
2489            .filter(source::Column::ConnectionId.eq(connection_id))
2490            .into_tuple()
2491            .all(&txn)
2492            .await?;
2493
2494        let dependent_sinks: Vec<SinkId> = Sink::find()
2495            .select_only()
2496            .column(sink::Column::SinkId)
2497            .filter(sink::Column::ConnectionId.eq(connection_id))
2498            .into_tuple()
2499            .all(&txn)
2500            .await?;
2501
2502        let (connection_catalog, _obj) = Connection::find_by_id(connection_id)
2503            .find_also_related(Object)
2504            .one(&txn)
2505            .await?
2506            .ok_or_else(|| {
2507                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
2508            })?;
2509
2510        // Validate that props can be altered
2511        let prop_keys: Vec<String> = alter_props
2512            .keys()
2513            .chain(alter_secret_refs.keys())
2514            .cloned()
2515            .collect();
2516
2517        // Map the connection type enum to the string name expected by the validation function
2518        let connection_type_str = pb_connection_type_to_connection_type(
2519            &connection_catalog.params.to_protobuf().connection_type(),
2520        )
2521        .ok_or_else(|| MetaError::invalid_parameter("Unspecified connection type"))?;
2522
2523        risingwave_connector::allow_alter_on_fly_fields::check_connection_allow_alter_on_fly_fields(
2524            connection_type_str, &prop_keys,
2525        )?;
2526
2527        let connection_pb = connection_catalog.params.to_protobuf();
2528        let mut connection_options_with_secret = WithOptionsSecResolved::new(
2529            connection_pb.properties.into_iter().collect(),
2530            connection_pb.secret_refs.into_iter().collect(),
2531        );
2532
        let (to_add_secret_dep, to_remove_secret_dep) = connection_options_with_secret
            .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

        tracing::debug!(
            "applying new properties to connection and dependents: connection_id={}, sources={:?}, sinks={:?}",
            connection_id,
            dependent_sources,
            dependent_sinks
        );

        // Validate connection
        {
            let conn_params_pb = risingwave_pb::catalog::ConnectionParams {
                connection_type: connection_pb.connection_type,
                properties: connection_options_with_secret
                    .as_plaintext()
                    .clone()
                    .into_iter()
                    .collect(),
                secret_refs: connection_options_with_secret
                    .as_secret()
                    .clone()
                    .into_iter()
                    .collect(),
            };
            let connection = PbConnection {
                id: connection_id as _,
                info: Some(risingwave_pb::catalog::connection::Info::ConnectionParams(
                    conn_params_pb,
                )),
                ..Default::default()
            };
            validate_connection(&connection).await?;
        }

        // Update connection secret dependencies
        if !to_add_secret_dep.is_empty() {
            ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                object_dependency::ActiveModel {
                    oid: Set(secret_id.into()),
                    used_by: Set(connection_id.as_object_id()),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }
        if !to_remove_secret_dep.is_empty() {
            let _ = ObjectDependency::delete_many()
                .filter(
                    object_dependency::Column::Oid
                        .is_in(to_remove_secret_dep)
                        .and(object_dependency::Column::UsedBy.eq(connection_id.as_object_id())),
                )
                .exec(&txn)
                .await?;
        }

        // Update the connection with new properties
        let updated_connection_params = risingwave_pb::catalog::ConnectionParams {
            connection_type: connection_pb.connection_type,
            properties: connection_options_with_secret
                .as_plaintext()
                .clone()
                .into_iter()
                .collect(),
            secret_refs: connection_options_with_secret
                .as_secret()
                .clone()
                .into_iter()
                .collect(),
        };
        let active_connection_model = connection::ActiveModel {
            connection_id: Set(connection_id),
            params: Set(ConnectionParams::from(&updated_connection_params)),
            ..Default::default()
        };
        Connection::update(active_connection_model)
            .exec(&txn)
            .await?;

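        // Source and sink fragments embed a resolved copy of the connector properties in
        // their stream nodes, so updating the catalog rows alone is not enough: each
        // dependent job's fragments must be rewritten as well.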
        // Batch update dependent sources and collect their complete properties
        let mut updated_sources_with_props: Vec<(SourceId, HashMap<String, String>)> = Vec::new();

        if !dependent_sources.is_empty() {
            // Batch fetch all dependent sources
            let sources_with_objs = Source::find()
                .find_also_related(Object)
                .filter(source::Column::SourceId.is_in(dependent_sources.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut source_updates = Vec::new();
            let mut fragment_updates: Vec<DependentSourceFragmentUpdate> = Vec::new();

            for (source, _obj) in sources_with_objs {
                let source_id = source.source_id;

                let mut source_options_with_secret = WithOptionsSecResolved::new(
                    source.with_properties.0.clone(),
                    source
                        .secret_ref
                        .clone()
                        .map(|secret_ref| secret_ref.to_protobuf())
                        .unwrap_or_default(),
                );
                let (_source_to_add_secret_dep, _source_to_remove_secret_dep) =
                    source_options_with_secret
                        .handle_update(alter_props.clone(), alter_secret_refs.clone())?;

                // Validate the updated source properties
                let _ = ConnectorProperties::extract(source_options_with_secret.clone(), true)?;

                // Prepare source update
                let active_source = source::ActiveModel {
                    source_id: Set(source_id),
                    with_properties: Set(Property(
                        source_options_with_secret.as_plaintext().clone(),
                    )),
                    secret_ref: Set((!source_options_with_secret.as_secret().is_empty()).then(
                        || {
                            risingwave_meta_model::SecretRef::from(
                                source_options_with_secret.as_secret().clone(),
                            )
                        },
                    )),
                    ..Default::default()
                };
                source_updates.push(active_source);

                // Prepare fragment update:
                // - If the source is a table-associated source, update fragments for the table job.
                // - Otherwise update the shared source job.
                // - For non-shared sources, also update any dependent streaming jobs that embed a copy.
                let is_shared_source = source.is_shared();
                let mut dep_source_job_ids: Vec<JobId> = Vec::new();
                if !is_shared_source {
                    dep_source_job_ids = ObjectDependency::find()
                        .select_only()
                        .column(object_dependency::Column::UsedBy)
                        .filter(object_dependency::Column::Oid.eq(source_id))
                        .into_tuple()
                        .all(&txn)
                        .await?;
                }

                let base_job_id =
                    if let Some(associate_table_id) = source.optional_associated_table_id {
                        associate_table_id.as_job_id()
                    } else {
                        source_id.as_share_source_job_id()
                    };
                let job_ids = std::iter::once(base_job_id)
                    .chain(dep_source_job_ids)
                    .collect_vec();

                fragment_updates.push(DependentSourceFragmentUpdate {
                    job_ids,
                    with_properties: source_options_with_secret.as_plaintext().clone(),
                    secret_refs: source_options_with_secret.as_secret().clone(),
                    is_shared_source,
                });

                // Collect the complete properties for runtime broadcast
                let complete_source_props = LocalSecretManager::global()
                    .fill_secrets(
                        source_options_with_secret.as_plaintext().clone(),
                        source_options_with_secret.as_secret().clone(),
                    )
                    .map_err(MetaError::from)?
                    .into_iter()
                    .collect::<HashMap<String, String>>();
                updated_sources_with_props.push((source_id, complete_source_props));
            }

            for source_update in source_updates {
                Source::update(source_update).exec(&txn).await?;
            }

            // Batch execute fragment updates
            for DependentSourceFragmentUpdate {
                job_ids,
                with_properties,
                secret_refs,
                is_shared_source,
            } in fragment_updates
            {
                update_connector_props_fragments(
                    &txn,
                    job_ids,
                    FragmentTypeFlag::Source,
                    |node, found| {
                        if let PbNodeBody::Source(node) = node
                            && let Some(source_inner) = &mut node.source_inner
                        {
                            source_inner.with_properties = with_properties.clone();
                            source_inner.secret_refs = secret_refs.clone();
                            *found = true;
                        }
                    },
                    is_shared_source,
                )
                .await?;
            }
        }

        // Batch update dependent sinks and collect their complete properties
        let mut updated_sinks_with_props: Vec<(SinkId, HashMap<String, String>)> = Vec::new();

        if !dependent_sinks.is_empty() {
            // Batch fetch all dependent sinks
            let sinks_with_objs = Sink::find()
                .find_also_related(Object)
                .filter(sink::Column::SinkId.is_in(dependent_sinks.iter().cloned()))
                .all(&txn)
                .await?;

            // Prepare batch updates
            let mut sink_updates = Vec::new();
            let mut sink_fragment_updates = Vec::new();

            for (sink, _obj) in sinks_with_objs {
                let sink_id = sink.sink_id;

                // Validate that sink props can be altered
                match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
                    Some(connector) => {
                        let connector_type = connector.to_lowercase();
                        check_sink_allow_alter_on_fly_fields(&connector_type, &prop_keys)
                            .map_err(|e| SinkError::Config(anyhow!(e)))?;

                        match_sink_name_str!(
                            connector_type.as_str(),
                            SinkType,
                            {
                                let mut new_sink_props = sink.properties.0.clone();
                                new_sink_props.extend(alter_props.clone());
                                SinkType::validate_alter_config(&new_sink_props)
                            },
                            |sink: &str| Err(SinkError::Config(anyhow!(
                                "unsupported sink type {}",
                                sink
                            )))
                        )?
                    }
                    None => {
                        return Err(SinkError::Config(anyhow!(
                            "connector not specified when altering sink"
                        ))
                        .into());
                    }
                };

                let mut new_sink_props = sink.properties.0.clone();
                new_sink_props.extend(alter_props.clone());

                // Prepare sink update
                let active_sink = sink::ActiveModel {
                    sink_id: Set(sink_id),
                    properties: Set(risingwave_meta_model::Property(new_sink_props.clone())),
                    ..Default::default()
                };
                sink_updates.push(active_sink);

                // Prepare fragment updates for this sink
                sink_fragment_updates.push((sink_id, new_sink_props.clone()));

                // Collect the complete properties for runtime broadcast
                let complete_sink_props: HashMap<String, String> =
                    new_sink_props.into_iter().collect();
                updated_sinks_with_props.push((sink_id, complete_sink_props));
            }

            // Batch execute sink updates
            for sink_update in sink_updates {
                Sink::update(sink_update).exec(&txn).await?;
            }

            // Batch execute sink fragment updates using the reusable function
            for (sink_id, new_sink_props) in sink_fragment_updates {
                update_connector_props_fragments(
                    &txn,
                    vec![sink_id.as_job_id()],
                    FragmentTypeFlag::Sink,
                    |node, found| {
                        if let PbNodeBody::Sink(node) = node
                            && let Some(sink_desc) = &mut node.sink_desc
                            && sink_desc.id == sink_id.as_raw_id()
                        {
                            sink_desc.properties = new_sink_props.clone();
                            *found = true;
                        }
                    },
                    true,
                )
                .await?;
            }
        }

        // Collect all updated objects for frontend notification
        let mut updated_objects = Vec::new();

        // Add connection
        let (connection, obj) = Connection::find_by_id(connection_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Connection.as_str(), connection_id)
            })?;
        updated_objects.push(PbObject {
            object_info: Some(PbObjectInfo::Connection(
                ObjectModel(connection, obj.unwrap(), None).into(),
            )),
        });

        // Add sources
        for source_id in &dependent_sources {
            let (source, obj) = Source::find_by_id(*source_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Source.as_str(), *source_id)
                })?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Source(
                    ObjectModel(source, obj.unwrap(), None).into(),
                )),
            });
        }

        // Add sinks
        for sink_id in &dependent_sinks {
            let (sink, obj) = Sink::find_by_id(*sink_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| {
                    MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), *sink_id)
                })?;
            let streaming_job = streaming_job::Entity::find_by_id(sink.sink_id.as_job_id())
                .one(&txn)
                .await?;
            updated_objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(
                    ObjectModel(sink, obj.unwrap(), streaming_job).into(),
                )),
            });
        }

        // Commit the transaction
        txn.commit().await?;

        // Notify frontend about all updated objects
        if !updated_objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: updated_objects,
                }),
            )
            .await;
        }

        Ok((
            connection_options_with_secret,
            updated_sources_with_props,
            updated_sinks_with_props,
        ))
    }

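    /// Sets the rate limit on every rate-limit-capable node body (DML, sink, CDC scan,
    /// stream scan and source backfill) within the given fragment's stream node,
    /// erroring with "fragment not found" if the fragment does not exist.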
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<()> {
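        // Visit the fragment's stream node and apply the new rate limit to every node
        // body that supports one; `found` reports whether any node was actually updated.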
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

    /// Note: `FsFetch` nodes created in old versions are not included.
    /// Since this is only used for debugging, it should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(FragmentTypeMask::intersects_any(
                FragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id,
                        job_id,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

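/// Validates that `props` can be applied to an existing sink without rebuilding it:
/// every key must be allowed to change on the fly for the sink's connector, and the
/// merged property set must pass the connector's `validate_alter_config` check.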
fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
    // Validate that props can be altered
    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
        Some(connector) => {
            let connector_type = connector.to_lowercase();
            let field_names: Vec<String> = props.keys().cloned().collect();
            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                .map_err(|e| SinkError::Config(anyhow!(e)))?;

            match_sink_name_str!(
                connector_type.as_str(),
                SinkType,
                {
                    let mut new_props = sink.properties.0.clone();
                    new_props.extend(props.clone());
                    SinkType::validate_alter_config(&new_props)
                },
                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
            )?
        }
        None => {
            return Err(
                SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
            );
        }
    };
    Ok(())
}

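/// Merges `props` into the `WITH` options of a parsed statement, overwriting options
/// that share a name and appending new ones while preserving the original order.
///
/// A minimal sketch of the merge semantics (option names and values are illustrative):
///
/// ```ignore
/// // WITH (retry = '3', timeout = '5s')  +  { "retry": "5", "topic": "t" }
/// //   => WITH (retry = '5', timeout = '5s', topic = 't')
/// ```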
fn update_stmt_with_props(
    with_properties: &mut Vec<SqlOption>,
    props: &BTreeMap<String, String>,
) -> MetaResult<()> {
    let mut new_sql_options = with_properties
        .iter()
        .map(|sql_option| (&sql_option.name, sql_option))
        .collect::<IndexMap<_, _>>();
    let add_sql_options = props
        .iter()
        .map(|(k, v)| SqlOption::try_from((k, v)))
        .collect::<Result<Vec<SqlOption>, ParserError>>()
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
    new_sql_options.extend(
        add_sql_options
            .iter()
            .map(|sql_option| (&sql_option.name, sql_option)),
    );
    *with_properties = new_sql_options.into_values().cloned().collect();
    Ok(())
}

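/// Extends the `SinkDesc` properties embedded in the stream nodes of all sink fragments
/// belonging to `sink_id` with `props`, persisting only the fragments that were actually
/// modified. Panics if no fragment embeds the sink, since every sink is expected to be
/// used by at least one fragment.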
async fn update_sink_fragment_props(
    txn: &DatabaseTransaction,
    sink_id: SinkId,
    props: BTreeMap<String, String>,
) -> MetaResult<()> {
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(sink_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| {
            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
        })
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                if let PbNodeBody::Sink(node) = node
                    && let Some(sink_desc) = &mut node.sink_desc
                    && sink_desc.id == sink_id
                {
                    sink_desc.properties.extend(props.clone());
                    found = true;
                }
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "sink id should be used by at least one fragment"
    );
    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }
    Ok(())
}

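/// Context for table DDLs that must account for sinks created with
/// `CREATE SINK ... INTO <table>`.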
pub struct SinkIntoTableContext {
    /// For alter table (e.g., add column), this is the list of existing sink ids;
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

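/// Context for finishing an automatic sink schema refresh: the temporary sink created
/// with the new schema replaces the original sink, optionally along with a new log
/// store table.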
pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: SinkId,
    pub original_sink_id: SinkId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}

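/// Applies `alter_stream_node_fn` to the persisted stream nodes of all fragments of
/// `job_ids` whose fragment type mask intersects `expect_flag`, writing back only the
/// fragments the closure marked as modified via its `found` out-parameter.
///
/// A usage sketch mirroring the source-property update above (`job_id` and `new_props`
/// are placeholders):
///
/// ```ignore
/// update_connector_props_fragments(
///     &txn,
///     vec![job_id],
///     FragmentTypeFlag::Source,
///     |node, found| {
///         if let PbNodeBody::Source(node) = node
///             && let Some(inner) = &mut node.source_inner
///         {
///             inner.with_properties = new_props.clone();
///             *found = true;
///         }
///     },
///     true, // shared source: at least one fragment must be updated
/// )
/// .await?;
/// ```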
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_ids: Vec<JobId>,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
    is_shared_source: bool,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
        .select_only()
        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
        .filter(
            fragment::Column::JobId
                .is_in(job_ids.clone())
                .and(FragmentTypeMask::intersects(expect_flag)),
        )
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter_map(|(id, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    if is_shared_source || job_ids.len() > 1 {
        // The first element of `job_ids` is the source's own job id (or its associated
        // table's job id). A non-shared source job has no fragments of its own, so
        // nothing may have been updated; but if the source is shared, or if
        // `job_ids.len() > 1` (the source is embedded in other streaming jobs), at
        // least one fragment must have been updated.
        assert!(
            !fragments.is_empty(),
            "job ids {:?} (type: {:?}) should be used by at least one fragment",
            job_ids,
            expect_flag
        );
    }

    for (id, stream_node) in fragments {
        Fragment::update(fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        })
        .exec(txn)
        .await?;
    }

    Ok(())
}