risingwave_meta/controller/streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::id::JobId;
25use risingwave_common::util::iter_util::ZipEqDebug;
26use risingwave_common::util::stream_graph_visitor::{
27    visit_stream_node_body, visit_stream_node_mut,
28};
29use risingwave_common::{bail, current_cluster_version};
30use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
31use risingwave_connector::error::ConnectorError;
32use risingwave_connector::sink::file_sink::fs::FsSink;
33use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
34use risingwave_connector::source::ConnectorProperties;
35use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::user_privilege::Action;
40use risingwave_meta_model::*;
41use risingwave_pb::catalog::{PbCreateType, PbTable};
42use risingwave_pb::meta::alter_connector_props_request::AlterIcebergTableIds;
43use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
47};
48use risingwave_pb::meta::{PbObject, PbObjectGroup};
49use risingwave_pb::plan_common::PbColumnCatalog;
50use risingwave_pb::secret::PbSecretRef;
51use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
52use risingwave_pb::stream_plan::stream_node::PbNodeBody;
53use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
54use risingwave_pb::user::PbUserInfo;
55use risingwave_sqlparser::ast::{Engine, SqlOption, Statement};
56use risingwave_sqlparser::parser::{Parser, ParserError};
57use sea_orm::ActiveValue::Set;
58use sea_orm::sea_query::{Expr, Query, SimpleExpr};
59use sea_orm::{
60    ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType,
61    ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, TransactionTrait,
62};
63use thiserror_ext::AsReport;
64
65use super::rename::IndexItemRewriter;
66use crate::barrier::{Command, SharedFragmentInfo};
67use crate::controller::ObjectModel;
68use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
69use crate::controller::fragment::FragmentTypeMaskExt;
70use crate::controller::utils::{
71    PartialObject, build_object_group_for_delete, check_if_belongs_to_iceberg_table,
72    check_relation_name_duplicate, check_sink_into_table_cycle, ensure_job_not_canceled,
73    ensure_object_id, ensure_user_id, fetch_target_fragments, get_internal_tables_by_id,
74    get_table_columns, grant_default_privileges_automatically, insert_fragment_relations,
75    list_user_info_by_ids, try_get_iceberg_table_by_downstream_sink,
76};
77use crate::error::MetaErrorInner;
78use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
79use crate::model::{
80    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamContext, StreamJobFragments,
81    StreamJobFragmentsToCreate,
82};
83use crate::stream::SplitAssignment;
84use crate::{MetaError, MetaResult};
85
86impl CatalogController {
87    pub async fn create_streaming_job_obj(
88        txn: &DatabaseTransaction,
89        obj_type: ObjectType,
90        owner_id: UserId,
91        database_id: Option<DatabaseId>,
92        schema_id: Option<SchemaId>,
93        create_type: PbCreateType,
94        ctx: StreamContext,
95        streaming_parallelism: StreamingParallelism,
96        max_parallelism: usize,
97        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
98    ) -> MetaResult<JobId> {
99        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
100        let job_id = obj.oid.as_job_id();
101        let job = streaming_job::ActiveModel {
102            job_id: Set(job_id),
103            job_status: Set(JobStatus::Initial),
104            create_type: Set(create_type.into()),
105            timezone: Set(ctx.timezone),
106            config_override: Set(Some(ctx.config_override.to_string())),
107            parallelism: Set(streaming_parallelism),
108            max_parallelism: Set(max_parallelism as _),
109            specific_resource_group: Set(specific_resource_group),
110        };
111        job.insert(txn).await?;
112
113        Ok(job_id)
114    }
115
116    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
117    /// materialized view.
118    ///
119    /// Some of the fields in the given streaming job are placeholders, which will
120    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
121    #[await_tree::instrument]
122    pub async fn create_job_catalog(
123        &self,
124        streaming_job: &mut StreamingJob,
125        ctx: &StreamContext,
126        parallelism: &Option<Parallelism>,
127        max_parallelism: usize,
128        mut dependencies: HashSet<ObjectId>,
129        specific_resource_group: Option<String>,
130    ) -> MetaResult<()> {
131        let inner = self.inner.write().await;
132        let txn = inner.db.begin().await?;
133        let create_type = streaming_job.create_type();
134
135        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
136            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
137            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
138            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
139        };
140
141        ensure_user_id(streaming_job.owner() as _, &txn).await?;
142        ensure_object_id(ObjectType::Database, streaming_job.database_id(), &txn).await?;
143        ensure_object_id(ObjectType::Schema, streaming_job.schema_id(), &txn).await?;
144        check_relation_name_duplicate(
145            &streaming_job.name(),
146            streaming_job.database_id(),
147            streaming_job.schema_id(),
148            &txn,
149        )
150        .await?;
151
152        // check if any dependency is in altering status.
153        if !dependencies.is_empty() {
154            let altering_cnt = ObjectDependency::find()
155                .join(
156                    JoinType::InnerJoin,
157                    object_dependency::Relation::Object1.def(),
158                )
159                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
160                .filter(
161                    object_dependency::Column::Oid
162                        .is_in(dependencies.clone())
163                        .and(object::Column::ObjType.eq(ObjectType::Table))
164                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
165                        .and(
166                            // It means the referring table is just dummy for altering.
167                            object::Column::Oid.not_in_subquery(
168                                Query::select()
169                                    .column(table::Column::TableId)
170                                    .from(Table)
171                                    .to_owned(),
172                            ),
173                        ),
174                )
175                .count(&txn)
176                .await?;
177            if altering_cnt != 0 {
178                return Err(MetaError::permission_denied(
179                    "some dependent relations are being altered",
180                ));
181            }
182        }
183
184        match streaming_job {
185            StreamingJob::MaterializedView(table) => {
186                let job_id = Self::create_streaming_job_obj(
187                    &txn,
188                    ObjectType::Table,
189                    table.owner as _,
190                    Some(table.database_id),
191                    Some(table.schema_id),
192                    create_type,
193                    ctx.clone(),
194                    streaming_parallelism,
195                    max_parallelism,
196                    specific_resource_group,
197                )
198                .await?;
199                table.id = job_id.as_mv_table_id();
200                let table_model: table::ActiveModel = table.clone().into();
201                Table::insert(table_model).exec(&txn).await?;
202            }
203            StreamingJob::Sink(sink) => {
204                if let Some(target_table_id) = sink.target_table
205                    && check_sink_into_table_cycle(
206                        target_table_id.into(),
207                        dependencies.iter().cloned().collect(),
208                        &txn,
209                    )
210                    .await?
211                {
212                    bail!("Creating such a sink will result in circular dependency.");
213                }
214
215                let job_id = Self::create_streaming_job_obj(
216                    &txn,
217                    ObjectType::Sink,
218                    sink.owner as _,
219                    Some(sink.database_id),
220                    Some(sink.schema_id),
221                    create_type,
222                    ctx.clone(),
223                    streaming_parallelism,
224                    max_parallelism,
225                    specific_resource_group,
226                )
227                .await?;
228                sink.id = job_id.as_sink_id();
229                let sink_model: sink::ActiveModel = sink.clone().into();
230                Sink::insert(sink_model).exec(&txn).await?;
231            }
232            StreamingJob::Table(src, table, _) => {
233                let job_id = Self::create_streaming_job_obj(
234                    &txn,
235                    ObjectType::Table,
236                    table.owner as _,
237                    Some(table.database_id),
238                    Some(table.schema_id),
239                    create_type,
240                    ctx.clone(),
241                    streaming_parallelism,
242                    max_parallelism,
243                    specific_resource_group,
244                )
245                .await?;
246                table.id = job_id.as_mv_table_id();
247                if let Some(src) = src {
248                    let src_obj = Self::create_object(
249                        &txn,
250                        ObjectType::Source,
251                        src.owner as _,
252                        Some(src.database_id),
253                        Some(src.schema_id),
254                    )
255                    .await?;
256                    src.id = src_obj.oid.as_source_id();
257                    src.optional_associated_table_id = Some(job_id.as_mv_table_id().into());
258                    table.optional_associated_source_id = Some(src_obj.oid.as_source_id().into());
259                    let source: source::ActiveModel = src.clone().into();
260                    Source::insert(source).exec(&txn).await?;
261                }
262                let table_model: table::ActiveModel = table.clone().into();
263                Table::insert(table_model).exec(&txn).await?;
264            }
265            StreamingJob::Index(index, table) => {
266                ensure_object_id(ObjectType::Table, index.primary_table_id, &txn).await?;
267                let job_id = Self::create_streaming_job_obj(
268                    &txn,
269                    ObjectType::Index,
270                    index.owner as _,
271                    Some(index.database_id),
272                    Some(index.schema_id),
273                    create_type,
274                    ctx.clone(),
275                    streaming_parallelism,
276                    max_parallelism,
277                    specific_resource_group,
278                )
279                .await?;
280                // to be compatible with old implementation.
281                index.id = job_id.as_index_id();
282                index.index_table_id = job_id.as_mv_table_id();
283                table.id = job_id.as_mv_table_id();
284
285                ObjectDependency::insert(object_dependency::ActiveModel {
286                    oid: Set(index.primary_table_id.into()),
287                    used_by: Set(table.id.into()),
288                    ..Default::default()
289                })
290                .exec(&txn)
291                .await?;
292
293                let table_model: table::ActiveModel = table.clone().into();
294                Table::insert(table_model).exec(&txn).await?;
295                let index_model: index::ActiveModel = index.clone().into();
296                Index::insert(index_model).exec(&txn).await?;
297            }
298            StreamingJob::Source(src) => {
299                let job_id = Self::create_streaming_job_obj(
300                    &txn,
301                    ObjectType::Source,
302                    src.owner as _,
303                    Some(src.database_id),
304                    Some(src.schema_id),
305                    create_type,
306                    ctx.clone(),
307                    streaming_parallelism,
308                    max_parallelism,
309                    specific_resource_group,
310                )
311                .await?;
312                src.id = job_id.as_shared_source_id();
313                let source_model: source::ActiveModel = src.clone().into();
314                Source::insert(source_model).exec(&txn).await?;
315            }
316        }
317
318        // collect dependent secrets.
319        dependencies.extend(
320            streaming_job
321                .dependent_secret_ids()?
322                .into_iter()
323                .map(|id| id.as_object_id()),
324        );
325        // collect dependent connection
326        dependencies.extend(
327            streaming_job
328                .dependent_connection_ids()?
329                .into_iter()
330                .map(|id| id.as_object_id()),
331        );
332
333        // record object dependency.
334        if !dependencies.is_empty() {
335            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336                object_dependency::ActiveModel {
337                    oid: Set(oid),
338                    used_by: Set(streaming_job.id().as_object_id()),
339                    ..Default::default()
340                }
341            }))
342            .exec(&txn)
343            .await?;
344        }
345
346        txn.commit().await?;
347
348        Ok(())
349    }
350
351    /// Create catalogs for internal tables, then notify frontend about them if the job is a
352    /// materialized view.
353    ///
354    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
355    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
356    ///
357    /// Returns a mapping from the temporary table id to the actual global table id.
358    pub async fn create_internal_table_catalog(
359        &self,
360        job: &StreamingJob,
361        mut incomplete_internal_tables: Vec<PbTable>,
362    ) -> MetaResult<HashMap<TableId, TableId>> {
363        let job_id = job.id();
364        let inner = self.inner.write().await;
365        let txn = inner.db.begin().await?;
366
367        // Ensure the job exists.
368        ensure_job_not_canceled(job_id, &txn).await?;
369
370        let mut table_id_map = HashMap::new();
371        for table in &mut incomplete_internal_tables {
372            let table_id = Self::create_object(
373                &txn,
374                ObjectType::Table,
375                table.owner as _,
376                Some(table.database_id),
377                Some(table.schema_id),
378            )
379            .await?
380            .oid
381            .as_table_id();
382            table_id_map.insert(table.id, table_id);
383            table.id = table_id;
384            table.job_id = Some(job_id);
385
386            let table_model = table::ActiveModel {
387                table_id: Set(table_id),
388                belongs_to_job_id: Set(Some(job_id)),
389                fragment_id: NotSet,
390                ..table.clone().into()
391            };
392            Table::insert(table_model).exec(&txn).await?;
393        }
394        txn.commit().await?;
395
396        Ok(table_id_map)
397    }
398
399    pub async fn prepare_stream_job_fragments(
400        &self,
401        stream_job_fragments: &StreamJobFragmentsToCreate,
402        streaming_job: &StreamingJob,
403        for_replace: bool,
404    ) -> MetaResult<()> {
405        self.prepare_streaming_job(
406            stream_job_fragments.stream_job_id(),
407            || stream_job_fragments.fragments.values(),
408            &stream_job_fragments.downstreams,
409            for_replace,
410            Some(streaming_job),
411        )
412        .await
413    }
414
415    // TODO: In this function, we also update the `Table` model in the meta store.
416    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
417    // making them the source of truth and performing a full replacement for those in the meta store?
418    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
419    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
420    )]
421    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
422        &self,
423        job_id: JobId,
424        get_fragments: impl Fn() -> I + 'a,
425        downstreams: &FragmentDownstreamRelation,
426        for_replace: bool,
427        creating_streaming_job: Option<&'a StreamingJob>,
428    ) -> MetaResult<()> {
429        let fragments = Self::prepare_fragment_models_from_fragments(job_id, get_fragments())?;
430
431        let inner = self.inner.write().await;
432
433        let need_notify = creating_streaming_job
434            .map(|job| job.should_notify_creating())
435            .unwrap_or(false);
436        let definition = creating_streaming_job.map(|job| job.definition());
437
438        let mut objects_to_notify = vec![];
439        let txn = inner.db.begin().await?;
440
441        // Ensure the job exists.
442        ensure_job_not_canceled(job_id, &txn).await?;
443
444        for fragment in fragments {
445            let fragment_id = fragment.fragment_id;
446            let state_table_ids = fragment.state_table_ids.inner_ref().clone();
447
448            let fragment = fragment.into_active_model();
449            Fragment::insert(fragment).exec(&txn).await?;
450
451            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
452            // After table fragments are created, update them for all tables.
453            if !for_replace {
454                let all_tables = StreamJobFragments::collect_tables(get_fragments());
455                for state_table_id in state_table_ids {
456                    // Table's vnode count is not always the fragment's vnode count, so we have to
457                    // look up the table from `TableFragments`.
458                    // See `ActorGraphBuilder::new`.
459                    let table = all_tables
460                        .get(&state_table_id)
461                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
462                    assert_eq!(table.id, state_table_id);
463                    assert_eq!(table.fragment_id, fragment_id);
464                    let vnode_count = table.vnode_count();
465
466                    table::ActiveModel {
467                        table_id: Set(state_table_id as _),
468                        fragment_id: Set(Some(fragment_id)),
469                        vnode_count: Set(vnode_count as _),
470                        ..Default::default()
471                    }
472                    .update(&txn)
473                    .await?;
474
475                    if need_notify {
476                        let mut table = table.clone();
477                        // In production, definition was replaced but still needed for notification.
478                        if cfg!(not(debug_assertions)) && table.id == job_id.as_raw_id() {
479                            table.definition = definition.clone().unwrap();
480                        }
481                        objects_to_notify.push(PbObject {
482                            object_info: Some(PbObjectInfo::Table(table)),
483                        });
484                    }
485                }
486            }
487        }
488
489        // Add streaming job objects to notification
490        if need_notify {
491            match creating_streaming_job.unwrap() {
492                StreamingJob::Table(Some(source), ..) => {
493                    objects_to_notify.push(PbObject {
494                        object_info: Some(PbObjectInfo::Source(source.clone())),
495                    });
496                }
497                StreamingJob::Sink(sink) => {
498                    objects_to_notify.push(PbObject {
499                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
500                    });
501                }
502                StreamingJob::Index(index, _) => {
503                    objects_to_notify.push(PbObject {
504                        object_info: Some(PbObjectInfo::Index(index.clone())),
505                    });
506                }
507                _ => {}
508            }
509        }
510
511        insert_fragment_relations(&txn, downstreams).await?;
512
513        if !for_replace {
514            // Update dml fragment id.
515            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
516                Table::update(table::ActiveModel {
517                    table_id: Set(table.id),
518                    dml_fragment_id: Set(table.dml_fragment_id),
519                    ..Default::default()
520                })
521                .exec(&txn)
522                .await?;
523            }
524        }
525
526        txn.commit().await?;
527
528        // FIXME: there's a gap between the catalog creation and notification, which may lead to
529        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
530        // need to either forward the notification or filter catalogs without any fragments in the metastore.
531        if !objects_to_notify.is_empty() {
532            self.notify_frontend(
533                Operation::Add,
534                Info::ObjectGroup(PbObjectGroup {
535                    objects: objects_to_notify,
536                }),
537            )
538            .await;
539        }
540
541        Ok(())
542    }
543
544    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
545    /// information is required to build barrier mutation.
546    pub async fn build_cancel_command(
547        &self,
548        table_fragments: &StreamJobFragments,
549    ) -> MetaResult<Command> {
550        let inner = self.inner.read().await;
551        let txn = inner.db.begin().await?;
552
553        let dropped_sink_fragment_with_target =
554            if let Some(sink_fragment) = table_fragments.sink_fragment() {
555                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
556                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
557                sink_target_fragment
558                    .get(&sink_fragment_id)
559                    .map(|target_fragments| {
560                        let target_fragment_id = *target_fragments
561                            .first()
562                            .expect("sink should have at least one downstream fragment");
563                        (sink_fragment_id, target_fragment_id)
564                    })
565            } else {
566                None
567            };
568
569        Ok(Command::DropStreamingJobs {
570            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
571            actors: table_fragments.actor_ids().collect(),
572            unregistered_state_table_ids: table_fragments.all_table_ids().collect(),
573            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
574            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
575                .into_iter()
576                .map(|(sink, target)| (target as _, vec![sink as _]))
577                .collect(),
578        })
579    }
580
581    /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode.
582    /// It returns (true, _) if the job is not found or aborted.
583    /// It returns (_, Some(`database_id`)) is the `database_id` of the `job_id` exists
584    #[await_tree::instrument]
585    pub async fn try_abort_creating_streaming_job(
586        &self,
587        mut job_id: JobId,
588        is_cancelled: bool,
589    ) -> MetaResult<(bool, Option<DatabaseId>)> {
590        let mut inner = self.inner.write().await;
591        let txn = inner.db.begin().await?;
592
593        let obj = Object::find_by_id(job_id).one(&txn).await?;
594        let Some(obj) = obj else {
595            tracing::warn!(
596                id = %job_id,
597                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
598            );
599            return Ok((true, None));
600        };
601        let database_id = obj
602            .database_id
603            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
604        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
605
606        if !is_cancelled && let Some(streaming_job) = &streaming_job {
607            assert_ne!(streaming_job.job_status, JobStatus::Created);
608            if streaming_job.create_type == CreateType::Background
609                && streaming_job.job_status == JobStatus::Creating
610            {
611                if (obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Sink)
612                    && check_if_belongs_to_iceberg_table(&txn, job_id).await?
613                {
614                    // If the job belongs to an iceberg table, we still need to clean it.
615                } else {
616                    // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
617                    tracing::warn!(
618                        id = %job_id,
619                        "streaming job is created in background and still in creating status"
620                    );
621                    return Ok((false, Some(database_id)));
622                }
623            }
624        }
625
626        let iceberg_table_id =
627            try_get_iceberg_table_by_downstream_sink(&txn, job_id.as_sink_id()).await?;
628        if let Some(iceberg_table_id) = iceberg_table_id {
629            // If the job is iceberg sink, we need to clean the iceberg table as well.
630            // Here we will drop the sink objects directly.
631            let internal_tables = get_internal_tables_by_id(job_id, &txn).await?;
632            Object::delete_many()
633                .filter(
634                    object::Column::Oid
635                        .eq(job_id)
636                        .or(object::Column::Oid.is_in(internal_tables)),
637                )
638                .exec(&txn)
639                .await?;
640            job_id = iceberg_table_id.as_job_id();
641        };
642
643        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
644
645        // Get the notification info if the job is a materialized view or created in the background.
646        let mut objs = vec![];
647        let table_obj = Table::find_by_id(job_id.as_mv_table_id()).one(&txn).await?;
648
649        let mut need_notify =
650            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
651        if !need_notify {
652            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
653            if let Some(table) = &table_obj {
654                need_notify = table.table_type == TableType::MaterializedView;
655            }
656        }
657
658        if is_cancelled {
659            let dropped_tables = Table::find()
660                .find_also_related(Object)
661                .filter(
662                    table::Column::TableId.is_in(
663                        internal_table_ids
664                            .iter()
665                            .cloned()
666                            .chain(table_obj.iter().map(|t| t.table_id as _)),
667                    ),
668                )
669                .all(&txn)
670                .await?
671                .into_iter()
672                .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
673            inner
674                .dropped_tables
675                .extend(dropped_tables.map(|t| (t.id, t)));
676        }
677
678        if need_notify {
679            let obj: Option<PartialObject> = Object::find_by_id(job_id)
680                .select_only()
681                .columns([
682                    object::Column::Oid,
683                    object::Column::ObjType,
684                    object::Column::SchemaId,
685                    object::Column::DatabaseId,
686                ])
687                .into_partial_model()
688                .one(&txn)
689                .await?;
690            let obj =
691                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
692            objs.push(obj);
693            let internal_table_objs: Vec<PartialObject> = Object::find()
694                .select_only()
695                .columns([
696                    object::Column::Oid,
697                    object::Column::ObjType,
698                    object::Column::SchemaId,
699                    object::Column::DatabaseId,
700                ])
701                .join(JoinType::InnerJoin, object::Relation::Table.def())
702                .filter(table::Column::BelongsToJobId.eq(job_id))
703                .into_partial_model()
704                .all(&txn)
705                .await?;
706            objs.extend(internal_table_objs);
707        }
708
709        // Check if the job is creating sink into table.
710        if table_obj.is_none()
711            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id.as_sink_id())
712                .select_only()
713                .column(sink::Column::TargetTable)
714                .into_tuple::<Option<TableId>>()
715                .one(&txn)
716                .await?
717        {
718            let tmp_id: Option<ObjectId> = ObjectDependency::find()
719                .select_only()
720                .column(object_dependency::Column::UsedBy)
721                .join(
722                    JoinType::InnerJoin,
723                    object_dependency::Relation::Object1.def(),
724                )
725                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
726                .filter(
727                    object_dependency::Column::Oid
728                        .eq(target_table_id)
729                        .and(object::Column::ObjType.eq(ObjectType::Table))
730                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
731                )
732                .into_tuple()
733                .one(&txn)
734                .await?;
735            if let Some(tmp_id) = tmp_id {
736                tracing::warn!(
737                    id = %tmp_id,
738                    "aborting temp streaming job for sink into table"
739                );
740
741                Object::delete_by_id(tmp_id).exec(&txn).await?;
742            }
743        }
744
745        Object::delete_by_id(job_id).exec(&txn).await?;
746        if !internal_table_ids.is_empty() {
747            Object::delete_many()
748                .filter(object::Column::Oid.is_in(internal_table_ids))
749                .exec(&txn)
750                .await?;
751        }
752        if let Some(t) = &table_obj
753            && let Some(source_id) = t.optional_associated_source_id
754        {
755            Object::delete_by_id(source_id).exec(&txn).await?;
756        }
757
758        let err = if is_cancelled {
759            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
760        } else {
761            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
762        };
763        let abort_reason = format!("streaming job aborted {}", err.as_report());
764        for tx in inner
765            .creating_table_finish_notifier
766            .get_mut(&database_id)
767            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
768            .into_iter()
769            .flatten()
770            .flatten()
771        {
772            let _ = tx.send(Err(abort_reason.clone()));
773        }
774        txn.commit().await?;
775
776        if !objs.is_empty() {
777            // We also have notified the frontend about these objects,
778            // so we need to notify the frontend to delete them here.
779            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
780                .await;
781        }
782        Ok((true, Some(database_id)))
783    }
784
785    #[await_tree::instrument]
786    pub async fn post_collect_job_fragments(
787        &self,
788        job_id: JobId,
789        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
790        new_sink_downstream: Option<FragmentDownstreamRelation>,
791        split_assignment: Option<&SplitAssignment>,
792    ) -> MetaResult<()> {
793        let inner = self.inner.write().await;
794        let txn = inner.db.begin().await?;
795
796        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
797
798        if let Some(new_downstream) = new_sink_downstream {
799            insert_fragment_relations(&txn, &new_downstream).await?;
800        }
801
802        // Mark job as CREATING.
803        streaming_job::ActiveModel {
804            job_id: Set(job_id),
805            job_status: Set(JobStatus::Creating),
806            ..Default::default()
807        }
808        .update(&txn)
809        .await?;
810
811        if let Some(split_assignment) = split_assignment {
812            let fragment_splits = split_assignment
813                .iter()
814                .map(|(fragment_id, splits)| {
815                    (
816                        *fragment_id as _,
817                        splits.values().flatten().cloned().collect_vec(),
818                    )
819                })
820                .collect();
821
822            self.update_fragment_splits(&txn, &fragment_splits).await?;
823        }
824
825        txn.commit().await?;
826
827        Ok(())
828    }
829
830    pub async fn create_job_catalog_for_replace(
831        &self,
832        streaming_job: &StreamingJob,
833        ctx: Option<&StreamContext>,
834        specified_parallelism: Option<&NonZeroUsize>,
835        expected_original_max_parallelism: Option<usize>,
836    ) -> MetaResult<JobId> {
837        let id = streaming_job.id();
838        let inner = self.inner.write().await;
839        let txn = inner.db.begin().await?;
840
841        // 1. check version.
842        streaming_job.verify_version_for_replace(&txn).await?;
843        // 2. check concurrent replace.
844        let referring_cnt = ObjectDependency::find()
845            .join(
846                JoinType::InnerJoin,
847                object_dependency::Relation::Object1.def(),
848            )
849            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
850            .filter(
851                object_dependency::Column::Oid
852                    .eq(id)
853                    .and(object::Column::ObjType.eq(ObjectType::Table))
854                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
855            )
856            .count(&txn)
857            .await?;
858        if referring_cnt != 0 {
859            return Err(MetaError::permission_denied(
860                "job is being altered or referenced by some creating jobs",
861            ));
862        }
863
864        // 3. check parallelism.
865        let (original_max_parallelism, original_timezone, original_config_override): (
866            i32,
867            Option<String>,
868            Option<String>,
869        ) = StreamingJobModel::find_by_id(id)
870            .select_only()
871            .column(streaming_job::Column::MaxParallelism)
872            .column(streaming_job::Column::Timezone)
873            .column(streaming_job::Column::ConfigOverride)
874            .into_tuple()
875            .one(&txn)
876            .await?
877            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
878
879        if let Some(max_parallelism) = expected_original_max_parallelism
880            && original_max_parallelism != max_parallelism as i32
881        {
882            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
883            // This should not happen in normal cases.
884            bail!(
885                "cannot use a different max parallelism \
886                 when replacing streaming job, \
887                 original: {}, new: {}",
888                original_max_parallelism,
889                max_parallelism
890            );
891        }
892
893        let parallelism = match specified_parallelism {
894            None => StreamingParallelism::Adaptive,
895            Some(n) => StreamingParallelism::Fixed(n.get() as _),
896        };
897
898        let ctx = StreamContext {
899            timezone: ctx
900                .map(|ctx| ctx.timezone.clone())
901                .unwrap_or(original_timezone),
902            // We don't expect replacing a job with a different config override.
903            // Thus always use the original config override.
904            config_override: original_config_override.unwrap_or_default().into(),
905        };
906
907        // 4. create streaming object for new replace table.
908        let new_obj_id = Self::create_streaming_job_obj(
909            &txn,
910            streaming_job.object_type(),
911            streaming_job.owner() as _,
912            Some(streaming_job.database_id() as _),
913            Some(streaming_job.schema_id() as _),
914            streaming_job.create_type(),
915            ctx,
916            parallelism,
917            original_max_parallelism as _,
918            None,
919        )
920        .await?;
921
922        // 5. record dependency for new replace table.
923        ObjectDependency::insert(object_dependency::ActiveModel {
924            oid: Set(id.as_object_id()),
925            used_by: Set(new_obj_id.as_object_id()),
926            ..Default::default()
927        })
928        .exec(&txn)
929        .await?;
930
931        txn.commit().await?;
932
933        Ok(new_obj_id)
934    }
935
936    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
937    pub async fn finish_streaming_job(&self, job_id: JobId) -> MetaResult<()> {
938        let mut inner = self.inner.write().await;
939        let txn = inner.db.begin().await?;
940
941        // Check if the job belongs to iceberg table.
942        if check_if_belongs_to_iceberg_table(&txn, job_id).await? {
943            tracing::info!(
944                "streaming job {} is for iceberg table, wait for manual finish operation",
945                job_id
946            );
947            return Ok(());
948        }
949
950        let (notification_op, objects, updated_user_info) =
951            Self::finish_streaming_job_inner(&txn, job_id).await?;
952
953        txn.commit().await?;
954
955        let mut version = self
956            .notify_frontend(
957                notification_op,
958                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
959            )
960            .await;
961
962        // notify users about the default privileges
963        if !updated_user_info.is_empty() {
964            version = self.notify_users_update(updated_user_info).await;
965        }
966
967        inner
968            .creating_table_finish_notifier
969            .values_mut()
970            .for_each(|creating_tables| {
971                if let Some(txs) = creating_tables.remove(&job_id) {
972                    for tx in txs {
973                        let _ = tx.send(Ok(version));
974                    }
975                }
976            });
977
978        Ok(())
979    }
980
981    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
982    pub async fn finish_streaming_job_inner(
983        txn: &DatabaseTransaction,
984        job_id: JobId,
985    ) -> MetaResult<(Operation, Vec<risingwave_pb::meta::Object>, Vec<PbUserInfo>)> {
986        let job_type = Object::find_by_id(job_id)
987            .select_only()
988            .column(object::Column::ObjType)
989            .into_tuple()
990            .one(txn)
991            .await?
992            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
993
994        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
995            .select_only()
996            .column(streaming_job::Column::CreateType)
997            .into_tuple()
998            .one(txn)
999            .await?
1000            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
1001
1002        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
1003        let res = Object::update_many()
1004            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
1005            .col_expr(
1006                object::Column::CreatedAtClusterVersion,
1007                current_cluster_version().into(),
1008            )
1009            .filter(object::Column::Oid.eq(job_id))
1010            .exec(txn)
1011            .await?;
1012        if res.rows_affected == 0 {
1013            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
1014        }
1015
1016        // mark the target stream job as `Created`.
1017        let job = streaming_job::ActiveModel {
1018            job_id: Set(job_id),
1019            job_status: Set(JobStatus::Created),
1020            ..Default::default()
1021        };
1022        job.update(txn).await?;
1023
1024        // notify frontend: job, internal tables.
1025        let internal_table_objs = Table::find()
1026            .find_also_related(Object)
1027            .filter(table::Column::BelongsToJobId.eq(job_id))
1028            .all(txn)
1029            .await?;
1030        let mut objects = internal_table_objs
1031            .iter()
1032            .map(|(table, obj)| PbObject {
1033                object_info: Some(PbObjectInfo::Table(
1034                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
1035                )),
1036            })
1037            .collect_vec();
1038        let mut notification_op = if create_type == CreateType::Background {
1039            NotificationOperation::Update
1040        } else {
1041            NotificationOperation::Add
1042        };
1043        let mut updated_user_info = vec![];
1044
1045        match job_type {
1046            ObjectType::Table => {
1047                let (table, obj) = Table::find_by_id(job_id.as_mv_table_id())
1048                    .find_also_related(Object)
1049                    .one(txn)
1050                    .await?
1051                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1052                if table.table_type == TableType::MaterializedView {
1053                    notification_op = NotificationOperation::Update;
1054                }
1055
1056                if let Some(source_id) = table.optional_associated_source_id {
1057                    let (src, obj) = Source::find_by_id(source_id)
1058                        .find_also_related(Object)
1059                        .one(txn)
1060                        .await?
1061                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1062                    objects.push(PbObject {
1063                        object_info: Some(PbObjectInfo::Source(
1064                            ObjectModel(src, obj.unwrap()).into(),
1065                        )),
1066                    });
1067                }
1068                objects.push(PbObject {
1069                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1070                });
1071            }
1072            ObjectType::Sink => {
1073                let (sink, obj) = Sink::find_by_id(job_id.as_sink_id())
1074                    .find_also_related(Object)
1075                    .one(txn)
1076                    .await?
1077                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1078                objects.push(PbObject {
1079                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1080                });
1081            }
1082            ObjectType::Index => {
1083                let (index, obj) = Index::find_by_id(job_id.as_index_id())
1084                    .find_also_related(Object)
1085                    .one(txn)
1086                    .await?
1087                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1088                {
1089                    let (table, obj) = Table::find_by_id(index.index_table_id)
1090                        .find_also_related(Object)
1091                        .one(txn)
1092                        .await?
1093                        .ok_or_else(|| {
1094                            MetaError::catalog_id_not_found("table", index.index_table_id)
1095                        })?;
1096                    objects.push(PbObject {
1097                        object_info: Some(PbObjectInfo::Table(
1098                            ObjectModel(table, obj.unwrap()).into(),
1099                        )),
1100                    });
1101                }
1102
1103                // If the index is created on a table with privileges, we should also
1104                // grant the privileges for the index and its state tables.
1105                let primary_table_privileges = UserPrivilege::find()
1106                    .filter(
1107                        user_privilege::Column::Oid
1108                            .eq(index.primary_table_id)
1109                            .and(user_privilege::Column::Action.eq(Action::Select)),
1110                    )
1111                    .all(txn)
1112                    .await?;
1113                if !primary_table_privileges.is_empty() {
1114                    let index_state_table_ids: Vec<TableId> = Table::find()
1115                        .select_only()
1116                        .column(table::Column::TableId)
1117                        .filter(
1118                            table::Column::BelongsToJobId
1119                                .eq(job_id)
1120                                .or(table::Column::TableId.eq(index.index_table_id)),
1121                        )
1122                        .into_tuple()
1123                        .all(txn)
1124                        .await?;
1125                    let mut new_privileges = vec![];
1126                    for privilege in &primary_table_privileges {
1127                        for state_table_id in &index_state_table_ids {
1128                            new_privileges.push(user_privilege::ActiveModel {
1129                                id: Default::default(),
1130                                oid: Set(state_table_id.as_object_id()),
1131                                user_id: Set(privilege.user_id),
1132                                action: Set(Action::Select),
1133                                dependent_id: Set(privilege.dependent_id),
1134                                granted_by: Set(privilege.granted_by),
1135                                with_grant_option: Set(privilege.with_grant_option),
1136                            });
1137                        }
1138                    }
1139                    UserPrivilege::insert_many(new_privileges).exec(txn).await?;
1140
1141                    updated_user_info = list_user_info_by_ids(
1142                        primary_table_privileges.into_iter().map(|p| p.user_id),
1143                        txn,
1144                    )
1145                    .await?;
1146                }
1147
1148                objects.push(PbObject {
1149                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1150                });
1151            }
1152            ObjectType::Source => {
1153                let (source, obj) = Source::find_by_id(job_id.as_shared_source_id())
1154                    .find_also_related(Object)
1155                    .one(txn)
1156                    .await?
1157                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1158                objects.push(PbObject {
1159                    object_info: Some(PbObjectInfo::Source(
1160                        ObjectModel(source, obj.unwrap()).into(),
1161                    )),
1162                });
1163            }
1164            _ => unreachable!("invalid job type: {:?}", job_type),
1165        }
1166
1167        if job_type != ObjectType::Index {
1168            updated_user_info = grant_default_privileges_automatically(txn, job_id).await?;
1169        }
1170
1171        Ok((notification_op, objects, updated_user_info))
1172    }
1173
1174    pub async fn finish_replace_streaming_job(
1175        &self,
1176        tmp_id: JobId,
1177        streaming_job: StreamingJob,
1178        replace_upstream: FragmentReplaceUpstream,
1179        sink_into_table_context: SinkIntoTableContext,
1180        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1181        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1182    ) -> MetaResult<NotificationVersion> {
1183        let inner = self.inner.write().await;
1184        let txn = inner.db.begin().await?;
1185
1186        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1187            tmp_id,
1188            replace_upstream,
1189            sink_into_table_context,
1190            &txn,
1191            streaming_job,
1192            drop_table_connector_ctx,
1193            auto_refresh_schema_sinks,
1194        )
1195        .await?;
1196
1197        txn.commit().await?;
1198
1199        let mut version = self
1200            .notify_frontend(
1201                NotificationOperation::Update,
1202                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1203            )
1204            .await;
1205
1206        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1207            self.notify_users_update(user_infos).await;
1208            version = self
1209                .notify_frontend(
1210                    NotificationOperation::Delete,
1211                    build_object_group_for_delete(to_drop_objects),
1212                )
1213                .await;
1214        }
1215
1216        Ok(version)
1217    }
1218
1219    pub async fn finish_replace_streaming_job_inner(
1220        tmp_id: JobId,
1221        replace_upstream: FragmentReplaceUpstream,
1222        SinkIntoTableContext {
1223            updated_sink_catalogs,
1224        }: SinkIntoTableContext,
1225        txn: &DatabaseTransaction,
1226        streaming_job: StreamingJob,
1227        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1228        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1229    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1230        let original_job_id = streaming_job.id();
1231        let job_type = streaming_job.job_type();
1232
1233        let mut index_item_rewriter = None;
1234
1235        // Update catalog
1236        match streaming_job {
1237            StreamingJob::Table(_source, table, _table_job_type) => {
1238                // The source catalog should remain unchanged
1239
1240                let original_column_catalogs =
1241                    get_table_columns(txn, original_job_id.as_mv_table_id()).await?;
1242
1243                index_item_rewriter = Some({
1244                    let original_columns = original_column_catalogs
1245                        .to_protobuf()
1246                        .into_iter()
1247                        .map(|c| c.column_desc.unwrap())
1248                        .collect_vec();
1249                    let new_columns = table
1250                        .columns
1251                        .iter()
1252                        .map(|c| c.column_desc.clone().unwrap())
1253                        .collect_vec();
1254
1255                    IndexItemRewriter {
1256                        original_columns,
1257                        new_columns,
1258                    }
1259                });
1260
1261                // For sinks created in earlier versions, we need to set the original_target_columns.
1262                for sink_id in updated_sink_catalogs {
1263                    sink::ActiveModel {
1264                        sink_id: Set(sink_id as _),
1265                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1266                        ..Default::default()
1267                    }
1268                    .update(txn)
1269                    .await?;
1270                }
1271                // Update the table catalog with the new one. (column catalog is also updated here)
1272                let mut table = table::ActiveModel::from(table);
1273                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1274                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1275                {
1276                    // drop table connector, the rest logic is in `drop_table_associated_source`
1277                    table.optional_associated_source_id = Set(None);
1278                }
1279
1280                table.update(txn).await?;
1281            }
1282            StreamingJob::Source(source) => {
1283                // Update the source catalog with the new one.
1284                let source = source::ActiveModel::from(source);
1285                source.update(txn).await?;
1286            }
1287            StreamingJob::MaterializedView(table) => {
1288                // Update the table catalog with the new one.
1289                let table = table::ActiveModel::from(table);
1290                table.update(txn).await?;
1291            }
1292            _ => unreachable!(
1293                "invalid streaming job type: {:?}",
1294                streaming_job.job_type_str()
1295            ),
1296        }
1297
1298        async fn finish_fragments(
1299            txn: &DatabaseTransaction,
1300            tmp_id: JobId,
1301            original_job_id: JobId,
1302            replace_upstream: FragmentReplaceUpstream,
1303        ) -> MetaResult<()> {
1304            // 0. update internal tables
1305            // Fields including `fragment_id` were placeholder values before.
1306            // After table fragments are created, update them for all internal tables.
1307            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1308                .select_only()
1309                .columns([
1310                    fragment::Column::FragmentId,
1311                    fragment::Column::StateTableIds,
1312                ])
1313                .filter(fragment::Column::JobId.eq(tmp_id))
1314                .into_tuple()
1315                .all(txn)
1316                .await?;
1317            for (fragment_id, state_table_ids) in fragment_info {
1318                for state_table_id in state_table_ids.into_inner() {
1319                    let state_table_id = TableId::new(state_table_id as _);
1320                    table::ActiveModel {
1321                        table_id: Set(state_table_id),
1322                        fragment_id: Set(Some(fragment_id)),
1323                        // No need to update `vnode_count` because it must remain the same.
1324                        ..Default::default()
1325                    }
1326                    .update(txn)
1327                    .await?;
1328                }
1329            }
1330
1331            // 1. replace old fragments/actors with new ones.
1332            Fragment::delete_many()
1333                .filter(fragment::Column::JobId.eq(original_job_id))
1334                .exec(txn)
1335                .await?;
1336            Fragment::update_many()
1337                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1338                .filter(fragment::Column::JobId.eq(tmp_id))
1339                .exec(txn)
1340                .await?;
1341
1342            // 2. update merges.
1343            // update downstream fragment's Merge node, and upstream_fragment_id
1344            for (fragment_id, fragment_replace_map) in replace_upstream {
1345                let (fragment_id, mut stream_node) =
1346                    Fragment::find_by_id(fragment_id as FragmentId)
1347                        .select_only()
1348                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1349                        .into_tuple::<(FragmentId, StreamNode)>()
1350                        .one(txn)
1351                        .await?
1352                        .map(|(id, node)| (id, node.to_protobuf()))
1353                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1354
1355                visit_stream_node_mut(&mut stream_node, |body| {
1356                    if let PbNodeBody::Merge(m) = body
1357                        && let Some(new_fragment_id) =
1358                            fragment_replace_map.get(&m.upstream_fragment_id)
1359                    {
1360                        m.upstream_fragment_id = *new_fragment_id;
1361                    }
1362                });
1363                fragment::ActiveModel {
1364                    fragment_id: Set(fragment_id),
1365                    stream_node: Set(StreamNode::from(&stream_node)),
1366                    ..Default::default()
1367                }
1368                .update(txn)
1369                .await?;
1370            }
1371
1372            // 3. remove dummy object.
1373            Object::delete_by_id(tmp_id).exec(txn).await?;
1374
1375            Ok(())
1376        }
1377
1378        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1379
1380        // 4. update catalogs and notify.
1381        let mut objects = vec![];
1382        match job_type {
1383            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1384                let (table, table_obj) = Table::find_by_id(original_job_id.as_mv_table_id())
1385                    .find_also_related(Object)
1386                    .one(txn)
1387                    .await?
1388                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1389                objects.push(PbObject {
1390                    object_info: Some(PbObjectInfo::Table(
1391                        ObjectModel(table, table_obj.unwrap()).into(),
1392                    )),
1393                })
1394            }
1395            StreamingJobType::Source => {
1396                let (source, source_obj) =
1397                    Source::find_by_id(original_job_id.as_shared_source_id())
1398                        .find_also_related(Object)
1399                        .one(txn)
1400                        .await?
1401                        .ok_or_else(|| {
1402                            MetaError::catalog_id_not_found("object", original_job_id)
1403                        })?;
1404                objects.push(PbObject {
1405                    object_info: Some(PbObjectInfo::Source(
1406                        ObjectModel(source, source_obj.unwrap()).into(),
1407                    )),
1408                })
1409            }
1410            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1411        }
1412
1413        if let Some(expr_rewriter) = index_item_rewriter {
1414            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1415                .select_only()
1416                .columns([index::Column::IndexId, index::Column::IndexItems])
1417                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1418                .into_tuple()
1419                .all(txn)
1420                .await?;
1421            for (index_id, nodes) in index_items {
1422                let mut pb_nodes = nodes.to_protobuf();
1423                pb_nodes
1424                    .iter_mut()
1425                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1426                let index = index::ActiveModel {
1427                    index_id: Set(index_id),
1428                    index_items: Set(pb_nodes.into()),
1429                    ..Default::default()
1430                }
1431                .update(txn)
1432                .await?;
1433                let index_obj = index
1434                    .find_related(Object)
1435                    .one(txn)
1436                    .await?
1437                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1438                objects.push(PbObject {
1439                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1440                });
1441            }
1442        }
1443
1444        if let Some(sinks) = auto_refresh_schema_sinks {
1445            for finish_sink_context in sinks {
1446                finish_fragments(
1447                    txn,
1448                    finish_sink_context.tmp_sink_id.as_job_id(),
1449                    finish_sink_context.original_sink_id.as_job_id(),
1450                    Default::default(),
1451                )
1452                .await?;
1453                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1454                    .find_also_related(Object)
1455                    .one(txn)
1456                    .await?
1457                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1458                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1459                Sink::update(sink::ActiveModel {
1460                    sink_id: Set(finish_sink_context.original_sink_id),
1461                    columns: Set(columns.clone()),
1462                    ..Default::default()
1463                })
1464                .exec(txn)
1465                .await?;
1466                sink.columns = columns;
1467                objects.push(PbObject {
1468                    object_info: Some(PbObjectInfo::Sink(
1469                        ObjectModel(sink, sink_obj.unwrap()).into(),
1470                    )),
1471                });
1472                if let Some((log_store_table_id, new_log_store_table_columns)) =
1473                    finish_sink_context.new_log_store_table
1474                {
1475                    let new_log_store_table_columns: ColumnCatalogArray =
1476                        new_log_store_table_columns.into();
1477                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1478                        .find_also_related(Object)
1479                        .one(txn)
1480                        .await?
1481                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1482                    Table::update(table::ActiveModel {
1483                        table_id: Set(log_store_table_id),
1484                        columns: Set(new_log_store_table_columns.clone()),
1485                        ..Default::default()
1486                    })
1487                    .exec(txn)
1488                    .await?;
1489                    table.columns = new_log_store_table_columns;
1490                    objects.push(PbObject {
1491                        object_info: Some(PbObjectInfo::Table(
1492                            ObjectModel(table, table_obj.unwrap()).into(),
1493                        )),
1494                    });
1495                }
1496            }
1497        }
1498
1499        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1500        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1501            notification_objs =
1502                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1503        }
1504
1505        Ok((objects, notification_objs))
1506    }
1507
1508    /// Abort the replacing streaming job by deleting the temporary job object.
1509    pub async fn try_abort_replacing_streaming_job(
1510        &self,
1511        tmp_job_id: JobId,
1512        tmp_sink_ids: Option<Vec<ObjectId>>,
1513    ) -> MetaResult<()> {
1514        let inner = self.inner.write().await;
1515        let txn = inner.db.begin().await?;
1516        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1517        if let Some(tmp_sink_ids) = tmp_sink_ids {
1518            for tmp_sink_id in tmp_sink_ids {
1519                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1520            }
1521        }
1522        txn.commit().await?;
1523        Ok(())
1524    }
1525
    /// Sets `rate_limit` for source `source_id`, both on its catalog row and on every stream
    /// node that reads from it, then notifies the frontend of the updated source catalog.
    ///
    /// The streaming jobs whose fragments are scanned are:
    /// - the associated table's job, if the source belongs to a table with connector;
    /// - otherwise the shared source's own job, if the source is shared;
    /// - otherwise every object recorded as using the source in `object_dependency`.
    ///
    /// In those fragments, `Source` nodes (only in fragments flagged `Source`) and, for new fs
    /// connectors, `StreamFsFetch` nodes whose `source_id` matches get the new limit. Only
    /// fragments that actually contained a matching node are written back.
    ///
    /// Returns the actors of the updated fragments so the caller can apply the change.
    ///
    /// # Errors
    /// - a database / `catalog_id_not_found` error if the source does not exist;
    /// - `invalid_parameter` if no streaming job uses the source.
    ///
    /// # Panics
    /// If none of the scanned fragments contains a matching source or fetch node.
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        // Persist the new limit on the source catalog row first.
        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                // NOTE(review): `v as i32` wraps for values above `i32::MAX`; confirm callers
                // bound the rate limit.
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        // Re-read the (now updated) source together with its object row; used both for the
        // job lookup below and for the frontend notification at the end.
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        // Choose the streaming jobs whose fragments may contain this source's nodes.
        let streaming_job_ids: Vec<JobId> =
            if let Some(table_id) = source.optional_associated_table_id {
                // Table with connector: the source node lives inside the table's job.
                vec![table_id.as_job_id()]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                // Shared source: it runs as its own streaming job.
                vec![source_id.as_share_source_job_id()]
            } else {
                // Non-shared source: each dependent job holds its own copy of the source node.
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        // Mutate matching nodes in place and drop fragments that had none.
        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // in older versions, there's no fragment type flag for `FsFetch` node,
                // so we just scan all fragments for StreamFsFetch node if using fs connector
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        // Backfill the missing flag; it is only persisted if this fragment is
                        // kept (i.e. `found` ends up true).
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        // NOTE(review): this panics the meta node if the catalog and fragments disagree;
        // returning an error might be preferable.
        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();

        // Write back both the (possibly extended) type mask and the mutated stream node.
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        txn.commit().await?;

        let fragment_actors = self.get_fragment_actors_from_running_info(fragment_ids.into_iter());

        // `source` is the row re-read after the rate-limit update, so it carries the new value.
        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }
1665
1666    fn get_fragment_actors_from_running_info(
1667        &self,
1668        fragment_ids: impl Iterator<Item = FragmentId>,
1669    ) -> HashMap<FragmentId, Vec<ActorId>> {
1670        let mut fragment_actors: HashMap<FragmentId, Vec<ActorId>> = HashMap::new();
1671
1672        let info = self.env.shared_actor_infos().read_guard();
1673
1674        for fragment_id in fragment_ids {
1675            let SharedFragmentInfo { actors, .. } = info.get_fragment(fragment_id).unwrap();
1676            fragment_actors
1677                .entry(fragment_id as _)
1678                .or_default()
1679                .extend(actors.keys().copied());
1680        }
1681
1682        fragment_actors
1683    }
1684
1685    // edit the content of fragments in given `table_id`
1686    // return the actor_ids to be applied
1687    pub async fn mutate_fragments_by_job_id(
1688        &self,
1689        job_id: JobId,
1690        // returns true if the mutation is applied
1691        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1692        // error message when no relevant fragments is found
1693        err_msg: &'static str,
1694    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1695        let inner = self.inner.read().await;
1696        let txn = inner.db.begin().await?;
1697
1698        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1699            .select_only()
1700            .columns([
1701                fragment::Column::FragmentId,
1702                fragment::Column::FragmentTypeMask,
1703                fragment::Column::StreamNode,
1704            ])
1705            .filter(fragment::Column::JobId.eq(job_id))
1706            .into_tuple()
1707            .all(&txn)
1708            .await?;
1709        let mut fragments = fragments
1710            .into_iter()
1711            .map(|(id, mask, stream_node)| {
1712                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1713            })
1714            .collect_vec();
1715
1716        let fragments = fragments
1717            .iter_mut()
1718            .map(|(_, fragment_type_mask, stream_node)| {
1719                fragments_mutation_fn(*fragment_type_mask, stream_node)
1720            })
1721            .collect::<MetaResult<Vec<bool>>>()?
1722            .into_iter()
1723            .zip_eq_debug(std::mem::take(&mut fragments))
1724            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1725            .collect::<Vec<_>>();
1726
1727        if fragments.is_empty() {
1728            return Err(MetaError::invalid_parameter(format!(
1729                "job id {job_id}: {}",
1730                err_msg
1731            )));
1732        }
1733
1734        let fragment_ids: HashSet<FragmentId> = fragments.iter().map(|(id, _, _)| *id).collect();
1735        for (id, _, stream_node) in fragments {
1736            fragment::ActiveModel {
1737                fragment_id: Set(id),
1738                stream_node: Set(StreamNode::from(&stream_node)),
1739                ..Default::default()
1740            }
1741            .update(&txn)
1742            .await?;
1743        }
1744
1745        txn.commit().await?;
1746
1747        let fragment_actors =
1748            self.get_fragment_actors_from_running_info(fragment_ids.iter().copied());
1749
1750        Ok(fragment_actors)
1751    }
1752
1753    async fn mutate_fragment_by_fragment_id(
1754        &self,
1755        fragment_id: FragmentId,
1756        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1757        err_msg: &'static str,
1758    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1759        let inner = self.inner.read().await;
1760        let txn = inner.db.begin().await?;
1761
1762        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1763            Fragment::find_by_id(fragment_id)
1764                .select_only()
1765                .columns([
1766                    fragment::Column::FragmentTypeMask,
1767                    fragment::Column::StreamNode,
1768                ])
1769                .into_tuple()
1770                .one(&txn)
1771                .await?
1772                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1773        let mut pb_stream_node = stream_node.to_protobuf();
1774        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1775
1776        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1777            return Err(MetaError::invalid_parameter(format!(
1778                "fragment id {fragment_id}: {}",
1779                err_msg
1780            )));
1781        }
1782
1783        fragment::ActiveModel {
1784            fragment_id: Set(fragment_id),
1785            stream_node: Set(stream_node),
1786            ..Default::default()
1787        }
1788        .update(&txn)
1789        .await?;
1790
1791        let fragment_actors =
1792            self.get_fragment_actors_from_running_info(std::iter::once(fragment_id));
1793
1794        txn.commit().await?;
1795
1796        Ok(fragment_actors)
1797    }
1798
1799    // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
1800    // return the actor_ids to be applied
1801    pub async fn update_backfill_rate_limit_by_job_id(
1802        &self,
1803        job_id: JobId,
1804        rate_limit: Option<u32>,
1805    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1806        let update_backfill_rate_limit =
1807            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1808                let mut found = false;
1809                if fragment_type_mask
1810                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1811                {
1812                    visit_stream_node_mut(stream_node, |node| match node {
1813                        PbNodeBody::StreamCdcScan(node) => {
1814                            node.rate_limit = rate_limit;
1815                            found = true;
1816                        }
1817                        PbNodeBody::StreamScan(node) => {
1818                            node.rate_limit = rate_limit;
1819                            found = true;
1820                        }
1821                        PbNodeBody::SourceBackfill(node) => {
1822                            node.rate_limit = rate_limit;
1823                            found = true;
1824                        }
1825                        PbNodeBody::Sink(node) => {
1826                            node.rate_limit = rate_limit;
1827                            found = true;
1828                        }
1829                        _ => {}
1830                    });
1831                }
1832                Ok(found)
1833            };
1834
1835        self.mutate_fragments_by_job_id(
1836            job_id,
1837            update_backfill_rate_limit,
1838            "stream scan node or source node not found",
1839        )
1840        .await
1841    }
1842
1843    // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments
1844    // return the actor_ids to be applied
1845    pub async fn update_sink_rate_limit_by_job_id(
1846        &self,
1847        sink_id: SinkId,
1848        rate_limit: Option<u32>,
1849    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1850        let update_sink_rate_limit =
1851            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1852                let mut found = Ok(false);
1853                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1854                    visit_stream_node_mut(stream_node, |node| {
1855                        if let PbNodeBody::Sink(node) = node {
1856                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1857                                found = Err(MetaError::invalid_parameter(
1858                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1859                                ));
1860                                return;
1861                            }
1862                            node.rate_limit = rate_limit;
1863                            found = Ok(true);
1864                        }
1865                    });
1866                }
1867                found
1868            };
1869
1870        self.mutate_fragments_by_job_id(
1871            sink_id.as_job_id(),
1872            update_sink_rate_limit,
1873            "sink node not found",
1874        )
1875        .await
1876    }
1877
1878    pub async fn update_dml_rate_limit_by_job_id(
1879        &self,
1880        job_id: JobId,
1881        rate_limit: Option<u32>,
1882    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1883        let update_dml_rate_limit =
1884            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1885                let mut found = false;
1886                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1887                    visit_stream_node_mut(stream_node, |node| {
1888                        if let PbNodeBody::Dml(node) = node {
1889                            node.rate_limit = rate_limit;
1890                            found = true;
1891                        }
1892                    });
1893                }
1894                Ok(found)
1895            };
1896
1897        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1898            .await
1899    }
1900
    /// Alters the connector properties (plaintext options and secret refs) of source `source_id`.
    ///
    /// Within one transaction this:
    /// 1. checks the altered keys with `check_source_allow_alter_on_fly_fields` and merges them
    ///    into the current options via `WithOptionsSecResolved::handle_update`;
    /// 2. validates the merged options with `ConnectorProperties::extract`;
    /// 3. rewrites the stored `CREATE SOURCE` / `CREATE TABLE` definition with the merged options
    ///    (secrets rendered as `SECRET <name>`, never their values);
    /// 4. updates secret dependencies, the source row and, for a table with connector, the
    ///    associated table's definition;
    /// 5. patches `Source` nodes in the relevant fragments (for a non-shared source this includes
    ///    the jobs that depend on it).
    ///
    /// After commit, the refreshed source (and table) catalogs are sent to the frontend.
    ///
    /// Returns the merged options.
    ///
    /// # Errors
    /// If the source or a referenced secret is missing, an altered key is not allowed, the merged
    /// properties fail validation, or the stored definition cannot be parsed.
    ///
    /// # Panics
    /// If the source has no connector, its definition is neither `CREATE SOURCE` nor
    /// `CREATE TABLE`, or a `CREATE TABLE` definition has no associated table id.
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();
        let is_shared_source = source.is_shared();

        let mut dep_source_job_ids: Vec<JobId> = Vec::new();
        if !is_shared_source {
            // mv using non-shared source holds a copy of source in their fragments
            dep_source_job_ids = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .filter(object_dependency::Column::Oid.eq(source_id))
                .into_tuple()
                .all(&txn)
                .await?;
        }

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        // Start from the current options and apply the alteration; `handle_update` also reports
        // which secret dependencies have to be added and removed.
        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check if the alter-ed props are valid for each Connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // can be source_id or table_id
        // if updating an associated source, the preferred_id is the table_id
        // otherwise, it is the source_id
        let mut preferred_id = source_id.as_object_id();
        // Re-render the stored definition with the merged options. For a `CREATE TABLE`
        // definition this also records the associated table id.
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) = Secret::find_by_id(v.secret_id).one(txn).await? {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    // Table with connector: secret dependencies and the table definition are
                    // keyed by the table rather than the source.
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap().as_object_id();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id.into()),
                        used_by: Set(preferred_id),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(object_dependency::Column::UsedBy.eq(preferred_id)),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        // Persist the rewritten definition and the merged options. `secret_ref` is cleared
        // (`None`) when no secret options remain.
        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table statement accordingly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        // The source's own job (the table's job for a table with connector, otherwise the share
        // source job id), followed by any dependent jobs collected above.
        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
            // if updating table with connector, the fragment_id is table id
            associate_table_id.as_job_id()
        } else {
            source_id.as_share_source_job_id()
        }]
        .into_iter()
        .chain(dep_source_job_ids.into_iter())
        .collect_vec();

        // update fragments
        update_connector_props_fragments(
            &txn,
            to_check_job_ids,
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
            is_shared_source,
        )
        .await?;

        // Reload the updated catalog rows inside the transaction for the frontend notification.
        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }
2143
2144    pub async fn update_sink_props_by_sink_id(
2145        &self,
2146        sink_id: SinkId,
2147        props: BTreeMap<String, String>,
2148    ) -> MetaResult<HashMap<String, String>> {
2149        let inner = self.inner.read().await;
2150        let txn = inner.db.begin().await?;
2151
2152        let (sink, _obj) = Sink::find_by_id(sink_id)
2153            .find_also_related(Object)
2154            .one(&txn)
2155            .await?
2156            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2157        validate_sink_props(&sink, &props)?;
2158        let definition = sink.definition.clone();
2159        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2160            .map_err(|e| SinkError::Config(anyhow!(e)))?
2161            .try_into()
2162            .unwrap();
2163        if let Statement::CreateSink { stmt } = &mut stmt {
2164            update_stmt_with_props(&mut stmt.with_properties.0, &props)?;
2165        } else {
2166            panic!("definition is not a create sink statement")
2167        }
2168        let mut new_config = sink.properties.clone().into_inner();
2169        new_config.extend(props.clone());
2170
2171        let definition = stmt.to_string();
2172        let active_sink = sink::ActiveModel {
2173            sink_id: Set(sink_id),
2174            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2175            definition: Set(definition),
2176            ..Default::default()
2177        };
2178        active_sink.update(&txn).await?;
2179
2180        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2181        let (sink, obj) = Sink::find_by_id(sink_id)
2182            .find_also_related(Object)
2183            .one(&txn)
2184            .await?
2185            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2186        txn.commit().await?;
2187        let relation_infos = vec![PbObject {
2188            object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
2189        }];
2190
2191        let _version = self
2192            .notify_frontend(
2193                NotificationOperation::Update,
2194                NotificationInfo::ObjectGroup(PbObjectGroup {
2195                    objects: relation_infos,
2196                }),
2197            )
2198            .await;
2199
2200        Ok(props.into_iter().collect())
2201    }
2202
2203    pub async fn update_iceberg_table_props_by_table_id(
2204        &self,
2205        table_id: TableId,
2206        props: BTreeMap<String, String>,
2207        alter_iceberg_table_props: Option<
2208            risingwave_pb::meta::alter_connector_props_request::PbExtraOptions,
2209        >,
2210    ) -> MetaResult<(HashMap<String, String>, SinkId)> {
2211        let risingwave_pb::meta::alter_connector_props_request::PbExtraOptions::AlterIcebergTableIds(AlterIcebergTableIds { sink_id, source_id }) = alter_iceberg_table_props.
2212            ok_or_else(|| MetaError::invalid_parameter("alter_iceberg_table_props is required"))?;
2213        let inner = self.inner.read().await;
2214        let txn = inner.db.begin().await?;
2215
2216        let (sink, _obj) = Sink::find_by_id(sink_id)
2217            .find_also_related(Object)
2218            .one(&txn)
2219            .await?
2220            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2221        validate_sink_props(&sink, &props)?;
2222
2223        let definition = sink.definition.clone();
2224        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2225            .map_err(|e| SinkError::Config(anyhow!(e)))?
2226            .try_into()
2227            .unwrap();
2228        if let Statement::CreateTable {
2229            with_options,
2230            engine,
2231            ..
2232        } = &mut stmt
2233        {
2234            if !matches!(engine, Engine::Iceberg) {
2235                return Err(SinkError::Config(anyhow!(
2236                    "only iceberg table can be altered as sink"
2237                ))
2238                .into());
2239            }
2240            update_stmt_with_props(with_options, &props)?;
2241        } else {
2242            panic!("definition is not a create iceberg table statement")
2243        }
2244        let mut new_config = sink.properties.clone().into_inner();
2245        new_config.extend(props.clone());
2246
2247        let definition = stmt.to_string();
2248        let active_sink = sink::ActiveModel {
2249            sink_id: Set(sink_id),
2250            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2251            definition: Set(definition.clone()),
2252            ..Default::default()
2253        };
2254        let active_source = source::ActiveModel {
2255            source_id: Set(source_id),
2256            definition: Set(definition.clone()),
2257            ..Default::default()
2258        };
2259        let active_table = table::ActiveModel {
2260            table_id: Set(table_id),
2261            definition: Set(definition),
2262            ..Default::default()
2263        };
2264        active_sink.update(&txn).await?;
2265        active_source.update(&txn).await?;
2266        active_table.update(&txn).await?;
2267
2268        update_sink_fragment_props(&txn, sink_id, new_config).await?;
2269
2270        let (sink, sink_obj) = Sink::find_by_id(sink_id)
2271            .find_also_related(Object)
2272            .one(&txn)
2273            .await?
2274            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2275        let (source, source_obj) = Source::find_by_id(source_id)
2276            .find_also_related(Object)
2277            .one(&txn)
2278            .await?
2279            .ok_or_else(|| {
2280                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2281            })?;
2282        let (table, table_obj) = Table::find_by_id(table_id)
2283            .find_also_related(Object)
2284            .one(&txn)
2285            .await?
2286            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), table_id))?;
2287        txn.commit().await?;
2288        let relation_infos = vec![
2289            PbObject {
2290                object_info: Some(PbObjectInfo::Sink(
2291                    ObjectModel(sink, sink_obj.unwrap()).into(),
2292                )),
2293            },
2294            PbObject {
2295                object_info: Some(PbObjectInfo::Source(
2296                    ObjectModel(source, source_obj.unwrap()).into(),
2297                )),
2298            },
2299            PbObject {
2300                object_info: Some(PbObjectInfo::Table(
2301                    ObjectModel(table, table_obj.unwrap()).into(),
2302                )),
2303            },
2304        ];
2305        let _version = self
2306            .notify_frontend(
2307                NotificationOperation::Update,
2308                NotificationInfo::ObjectGroup(PbObjectGroup {
2309                    objects: relation_infos,
2310                }),
2311            )
2312            .await;
2313
2314        Ok((props.into_iter().collect(), sink_id))
2315    }
2316
2317    pub async fn update_fragment_rate_limit_by_fragment_id(
2318        &self,
2319        fragment_id: FragmentId,
2320        rate_limit: Option<u32>,
2321    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2322        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2323                                 stream_node: &mut PbStreamNode| {
2324            let mut found = false;
2325            if fragment_type_mask.contains_any(
2326                FragmentTypeFlag::dml_rate_limit_fragments()
2327                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2328                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2329                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2330            ) {
2331                visit_stream_node_mut(stream_node, |node| {
2332                    if let PbNodeBody::Dml(node) = node {
2333                        node.rate_limit = rate_limit;
2334                        found = true;
2335                    }
2336                    if let PbNodeBody::Sink(node) = node {
2337                        node.rate_limit = rate_limit;
2338                        found = true;
2339                    }
2340                    if let PbNodeBody::StreamCdcScan(node) = node {
2341                        node.rate_limit = rate_limit;
2342                        found = true;
2343                    }
2344                    if let PbNodeBody::StreamScan(node) = node {
2345                        node.rate_limit = rate_limit;
2346                        found = true;
2347                    }
2348                    if let PbNodeBody::SourceBackfill(node) = node {
2349                        node.rate_limit = rate_limit;
2350                        found = true;
2351                    }
2352                });
2353            }
2354            found
2355        };
2356        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2357            .await
2358    }
2359
2360    /// Note: `FsFetch` created in old versions are not included.
2361    /// Since this is only used for debugging, it should be fine.
2362    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2363        let inner = self.inner.read().await;
2364        let txn = inner.db.begin().await?;
2365
2366        let fragments: Vec<(FragmentId, JobId, i32, StreamNode)> = Fragment::find()
2367            .select_only()
2368            .columns([
2369                fragment::Column::FragmentId,
2370                fragment::Column::JobId,
2371                fragment::Column::FragmentTypeMask,
2372                fragment::Column::StreamNode,
2373            ])
2374            .filter(FragmentTypeMask::intersects_any(
2375                FragmentTypeFlag::rate_limit_fragments(),
2376            ))
2377            .into_tuple()
2378            .all(&txn)
2379            .await?;
2380
2381        let mut rate_limits = Vec::new();
2382        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2383            let stream_node = stream_node.to_protobuf();
2384            visit_stream_node_body(&stream_node, |node| {
2385                let mut rate_limit = None;
2386                let mut node_name = None;
2387
2388                match node {
2389                    // source rate limit
2390                    PbNodeBody::Source(node) => {
2391                        if let Some(node_inner) = &node.source_inner {
2392                            rate_limit = node_inner.rate_limit;
2393                            node_name = Some("SOURCE");
2394                        }
2395                    }
2396                    PbNodeBody::StreamFsFetch(node) => {
2397                        if let Some(node_inner) = &node.node_inner {
2398                            rate_limit = node_inner.rate_limit;
2399                            node_name = Some("FS_FETCH");
2400                        }
2401                    }
2402                    // backfill rate limit
2403                    PbNodeBody::SourceBackfill(node) => {
2404                        rate_limit = node.rate_limit;
2405                        node_name = Some("SOURCE_BACKFILL");
2406                    }
2407                    PbNodeBody::StreamScan(node) => {
2408                        rate_limit = node.rate_limit;
2409                        node_name = Some("STREAM_SCAN");
2410                    }
2411                    PbNodeBody::StreamCdcScan(node) => {
2412                        rate_limit = node.rate_limit;
2413                        node_name = Some("STREAM_CDC_SCAN");
2414                    }
2415                    PbNodeBody::Sink(node) => {
2416                        rate_limit = node.rate_limit;
2417                        node_name = Some("SINK");
2418                    }
2419                    _ => {}
2420                }
2421
2422                if let Some(rate_limit) = rate_limit {
2423                    rate_limits.push(RateLimitInfo {
2424                        fragment_id,
2425                        job_id,
2426                        fragment_type_mask: fragment_type_mask as u32,
2427                        rate_limit,
2428                        node_name: node_name.unwrap().to_owned(),
2429                    });
2430                }
2431            });
2432        }
2433
2434        Ok(rate_limits)
2435    }
2436}
2437
2438fn validate_sink_props(sink: &sink::Model, props: &BTreeMap<String, String>) -> MetaResult<()> {
2439    // Validate that props can be altered
2440    match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2441        Some(connector) => {
2442            let connector_type = connector.to_lowercase();
2443            let field_names: Vec<String> = props.keys().cloned().collect();
2444            check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2445                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2446
2447            match_sink_name_str!(
2448                connector_type.as_str(),
2449                SinkType,
2450                {
2451                    let mut new_props = sink.properties.0.clone();
2452                    new_props.extend(props.clone());
2453                    SinkType::validate_alter_config(&new_props)
2454                },
2455                |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2456            )?
2457        }
2458        None => {
2459            return Err(
2460                SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2461            );
2462        }
2463    };
2464    Ok(())
2465}
2466
2467fn update_stmt_with_props(
2468    with_properties: &mut Vec<SqlOption>,
2469    props: &BTreeMap<String, String>,
2470) -> MetaResult<()> {
2471    let mut new_sql_options = with_properties
2472        .iter()
2473        .map(|sql_option| (&sql_option.name, sql_option))
2474        .collect::<IndexMap<_, _>>();
2475    let add_sql_options = props
2476        .iter()
2477        .map(|(k, v)| SqlOption::try_from((k, v)))
2478        .collect::<Result<Vec<SqlOption>, ParserError>>()
2479        .map_err(|e| SinkError::Config(anyhow!(e)))?;
2480    new_sql_options.extend(
2481        add_sql_options
2482            .iter()
2483            .map(|sql_option| (&sql_option.name, sql_option)),
2484    );
2485    *with_properties = new_sql_options.into_values().cloned().collect();
2486    Ok(())
2487}
2488
2489async fn update_sink_fragment_props(
2490    txn: &DatabaseTransaction,
2491    sink_id: SinkId,
2492    props: BTreeMap<String, String>,
2493) -> MetaResult<()> {
2494    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2495        .select_only()
2496        .columns([
2497            fragment::Column::FragmentId,
2498            fragment::Column::FragmentTypeMask,
2499            fragment::Column::StreamNode,
2500        ])
2501        .filter(fragment::Column::JobId.eq(sink_id))
2502        .into_tuple()
2503        .all(txn)
2504        .await?;
2505    let fragments = fragments
2506        .into_iter()
2507        .filter(|(_, fragment_type_mask, _)| {
2508            *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
2509        })
2510        .filter_map(|(id, _, stream_node)| {
2511            let mut stream_node = stream_node.to_protobuf();
2512            let mut found = false;
2513            visit_stream_node_mut(&mut stream_node, |node| {
2514                if let PbNodeBody::Sink(node) = node
2515                    && let Some(sink_desc) = &mut node.sink_desc
2516                    && sink_desc.id == sink_id
2517                {
2518                    sink_desc.properties.extend(props.clone());
2519                    found = true;
2520                }
2521            });
2522            if found { Some((id, stream_node)) } else { None }
2523        })
2524        .collect_vec();
2525    assert!(
2526        !fragments.is_empty(),
2527        "sink id should be used by at least one fragment"
2528    );
2529    for (id, stream_node) in fragments {
2530        fragment::ActiveModel {
2531            fragment_id: Set(id),
2532            stream_node: Set(StreamNode::from(&stream_node)),
2533            ..Default::default()
2534        }
2535        .update(txn)
2536        .await?;
2537    }
2538    Ok(())
2539}
2540
/// Context about sinks that write into a table (inferred from the type name; confirm against
/// callers).
pub struct SinkIntoTableContext {
    /// For `ALTER TABLE` (e.g., adding a column), this is the list of existing sink ids;
    /// otherwise it is empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}
2546
/// Inputs for finishing an auto-refresh-schema change of a sink.
///
/// NOTE(review): semantics are inferred from the field names only; confirm against the caller.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Id of the temporary sink; presumably the one created with the refreshed schema.
    pub tmp_sink_id: SinkId,
    /// Id of the original sink; presumably the one being replaced.
    pub original_sink_id: SinkId,
    /// Column catalogs for the sink; presumably the refreshed column set.
    pub columns: Vec<PbColumnCatalog>,
    /// Optional log-store table id with its column catalogs; presumably `None` when the sink
    /// has no log-store table to update.
    pub new_log_store_table: Option<(TableId, Vec<PbColumnCatalog>)>,
}
2553
2554async fn update_connector_props_fragments<F>(
2555    txn: &DatabaseTransaction,
2556    job_ids: Vec<JobId>,
2557    expect_flag: FragmentTypeFlag,
2558    mut alter_stream_node_fn: F,
2559    is_shared_source: bool,
2560) -> MetaResult<()>
2561where
2562    F: FnMut(&mut PbNodeBody, &mut bool),
2563{
2564    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2565        .select_only()
2566        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2567        .filter(
2568            fragment::Column::JobId
2569                .is_in(job_ids.clone())
2570                .and(FragmentTypeMask::intersects(expect_flag)),
2571        )
2572        .into_tuple()
2573        .all(txn)
2574        .await?;
2575    let fragments = fragments
2576        .into_iter()
2577        .filter_map(|(id, stream_node)| {
2578            let mut stream_node = stream_node.to_protobuf();
2579            let mut found = false;
2580            visit_stream_node_mut(&mut stream_node, |node| {
2581                alter_stream_node_fn(node, &mut found);
2582            });
2583            if found { Some((id, stream_node)) } else { None }
2584        })
2585        .collect_vec();
2586    if is_shared_source || job_ids.len() > 1 {
2587        // the first element is the source_id or associated table_id
2588        // if the source is non-shared, there is no updated fragments
2589        // job_ids.len() > 1 means the source is used by other streaming jobs, so there should be at least one fragment updated
2590        assert!(
2591            !fragments.is_empty(),
2592            "job ids {:?} (type: {:?}) should be used by at least one fragment",
2593            job_ids,
2594            expect_flag
2595        );
2596    }
2597
2598    for (id, stream_node) in fragments {
2599        fragment::ActiveModel {
2600            fragment_id: Set(id),
2601            stream_node: Set(StreamNode::from(&stream_node)),
2602            ..Default::default()
2603        }
2604        .update(txn)
2605        .await?;
2606    }
2607
2608    Ok(())
2609}