risingwave_meta/controller/streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::config::DefaultParallelism;
22use risingwave_common::hash::VnodeCountCompat;
23use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
24use risingwave_common::{bail, current_cluster_version};
25use risingwave_connector::sink::file_sink::fs::FsSink;
26use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
27use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
28use risingwave_meta_model::actor::ActorStatus;
29use risingwave_meta_model::object::ObjectType;
30use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
31use risingwave_meta_model::table::TableType;
32use risingwave_meta_model::*;
33use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
34use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
35use risingwave_pb::catalog::{PbCreateType, PbTable};
36use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
37use risingwave_pb::meta::object::PbObjectInfo;
38use risingwave_pb::meta::subscribe_response::{
39    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
40};
41use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
42use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
43use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
44use risingwave_pb::stream_plan::stream_node::PbNodeBody;
45use risingwave_pb::stream_plan::{FragmentTypeFlag, PbFragmentTypeFlag, PbStreamNode};
46use risingwave_pb::user::PbUserInfo;
47use risingwave_sqlparser::ast::{SqlOption, Statement};
48use risingwave_sqlparser::parser::{Parser, ParserError};
49use sea_orm::ActiveValue::Set;
50use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
51use sea_orm::{
52    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
53    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
54    RelationTrait, TransactionTrait,
55};
56use thiserror_ext::AsReport;
57
58use super::rename::IndexItemRewriter;
59use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
60use crate::controller::ObjectModel;
61use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
62use crate::controller::utils::{
63    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
64    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
65    get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
66    rebuild_fragment_mapping_from_actors,
67};
68use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
69use crate::model::{
70    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
71    StreamJobFragmentsToCreate, TableParallelism,
72};
73use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
74use crate::{MetaError, MetaResult};
75
76impl CatalogController {
77    pub async fn create_streaming_job_obj(
78        txn: &DatabaseTransaction,
79        obj_type: ObjectType,
80        owner_id: UserId,
81        database_id: Option<DatabaseId>,
82        schema_id: Option<SchemaId>,
83        create_type: PbCreateType,
84        ctx: &StreamContext,
85        streaming_parallelism: StreamingParallelism,
86        max_parallelism: usize,
87        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
88    ) -> MetaResult<ObjectId> {
89        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
90        let job = streaming_job::ActiveModel {
91            job_id: Set(obj.oid),
92            job_status: Set(JobStatus::Initial),
93            create_type: Set(create_type.into()),
94            timezone: Set(ctx.timezone.clone()),
95            parallelism: Set(streaming_parallelism),
96            max_parallelism: Set(max_parallelism as _),
97            specific_resource_group: Set(specific_resource_group),
98        };
99        job.insert(txn).await?;
100
101        Ok(obj.oid)
102    }
103
104    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
105    /// materialized view.
106    ///
107    /// Some of the fields in the given streaming job are placeholders, which will
108    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
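    ///
    /// A minimal calling sketch (the caller-side variable names here are illustrative only):
    ///
    /// ```ignore
    /// mgr.create_job_catalog(
    ///     &mut streaming_job,
    ///     &stream_ctx,
    ///     &parallelism,      // `None` falls back to the cluster-wide default
    ///     max_parallelism,
    ///     HashSet::new(),    // extra dependencies, if any
    ///     None,              // no specific resource group
    /// )
    /// .await?;
    /// ```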
109    #[await_tree::instrument]
110    pub async fn create_job_catalog(
111        &self,
112        streaming_job: &mut StreamingJob,
113        ctx: &StreamContext,
114        parallelism: &Option<Parallelism>,
115        max_parallelism: usize,
116        mut dependencies: HashSet<ObjectId>,
117        specific_resource_group: Option<String>,
118    ) -> MetaResult<()> {
119        let inner = self.inner.write().await;
120        let txn = inner.db.begin().await?;
121        let create_type = streaming_job.create_type();
122
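        // Resolve the job's parallelism: an explicitly specified parallelism always takes
        // precedence; otherwise fall back to the cluster-wide default (adaptive for `Full`,
        // or the configured fixed value).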
123        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
124            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
125            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
126            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
127        };
128
129        ensure_user_id(streaming_job.owner() as _, &txn).await?;
130        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
131        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
132        check_relation_name_duplicate(
133            &streaming_job.name(),
134            streaming_job.database_id() as _,
135            streaming_job.schema_id() as _,
136            &txn,
137        )
138        .await?;
139
140        // TODO(rc): pass all dependencies uniformly, deprecate `dependent_relations` and `dependent_secret_ids`.
141        dependencies.extend(
142            streaming_job
143                .dependent_relations()
144                .into_iter()
145                .map(|id| id as ObjectId),
146        );
147
148        // check if any dependency is in altering status.
149        if !dependencies.is_empty() {
150            let altering_cnt = ObjectDependency::find()
151                .join(
152                    JoinType::InnerJoin,
153                    object_dependency::Relation::Object1.def(),
154                )
155                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
156                .filter(
157                    object_dependency::Column::Oid
158                        .is_in(dependencies.clone())
159                        .and(object::Column::ObjType.eq(ObjectType::Table))
160                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
161                        .and(
162                            // This means the referring table is just a dummy one created for altering.
163                            object::Column::Oid.not_in_subquery(
164                                Query::select()
165                                    .column(table::Column::TableId)
166                                    .from(Table)
167                                    .to_owned(),
168                            ),
169                        ),
170                )
171                .count(&txn)
172                .await?;
173            if altering_cnt != 0 {
174                return Err(MetaError::permission_denied(
175                    "some dependent relations are being altered",
176                ));
177            }
178        }
179
180        match streaming_job {
181            StreamingJob::MaterializedView(table) => {
182                let job_id = Self::create_streaming_job_obj(
183                    &txn,
184                    ObjectType::Table,
185                    table.owner as _,
186                    Some(table.database_id as _),
187                    Some(table.schema_id as _),
188                    create_type,
189                    ctx,
190                    streaming_parallelism,
191                    max_parallelism,
192                    specific_resource_group,
193                )
194                .await?;
195                table.id = job_id as _;
196                let table_model: table::ActiveModel = table.clone().into();
197                Table::insert(table_model).exec(&txn).await?;
198            }
199            StreamingJob::Sink(sink, _) => {
200                if let Some(target_table_id) = sink.target_table {
201                    if check_sink_into_table_cycle(
202                        target_table_id as ObjectId,
203                        dependencies.iter().cloned().collect(),
204                        &txn,
205                    )
206                    .await?
207                    {
208                        bail!("Creating such a sink will result in a circular dependency.");
209                    }
210                }
211
212                let job_id = Self::create_streaming_job_obj(
213                    &txn,
214                    ObjectType::Sink,
215                    sink.owner as _,
216                    Some(sink.database_id as _),
217                    Some(sink.schema_id as _),
218                    create_type,
219                    ctx,
220                    streaming_parallelism,
221                    max_parallelism,
222                    specific_resource_group,
223                )
224                .await?;
225                sink.id = job_id as _;
226                let sink_model: sink::ActiveModel = sink.clone().into();
227                Sink::insert(sink_model).exec(&txn).await?;
228            }
229            StreamingJob::Table(src, table, _) => {
230                let job_id = Self::create_streaming_job_obj(
231                    &txn,
232                    ObjectType::Table,
233                    table.owner as _,
234                    Some(table.database_id as _),
235                    Some(table.schema_id as _),
236                    create_type,
237                    ctx,
238                    streaming_parallelism,
239                    max_parallelism,
240                    specific_resource_group,
241                )
242                .await?;
243                table.id = job_id as _;
244                if let Some(src) = src {
245                    let src_obj = Self::create_object(
246                        &txn,
247                        ObjectType::Source,
248                        src.owner as _,
249                        Some(src.database_id as _),
250                        Some(src.schema_id as _),
251                    )
252                    .await?;
253                    src.id = src_obj.oid as _;
254                    src.optional_associated_table_id =
255                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
256                    table.optional_associated_source_id = Some(
257                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
258                    );
259                    let source: source::ActiveModel = src.clone().into();
260                    Source::insert(source).exec(&txn).await?;
261                }
262                let table_model: table::ActiveModel = table.clone().into();
263                Table::insert(table_model).exec(&txn).await?;
264            }
265            StreamingJob::Index(index, table) => {
266                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
267                let job_id = Self::create_streaming_job_obj(
268                    &txn,
269                    ObjectType::Index,
270                    index.owner as _,
271                    Some(index.database_id as _),
272                    Some(index.schema_id as _),
273                    create_type,
274                    ctx,
275                    streaming_parallelism,
276                    max_parallelism,
277                    specific_resource_group,
278                )
279                .await?;
280                // to be compatible with old implementation.
281                index.id = job_id as _;
282                index.index_table_id = job_id as _;
283                table.id = job_id as _;
284
285                ObjectDependency::insert(object_dependency::ActiveModel {
286                    oid: Set(index.primary_table_id as _),
287                    used_by: Set(table.id as _),
288                    ..Default::default()
289                })
290                .exec(&txn)
291                .await?;
292
293                let table_model: table::ActiveModel = table.clone().into();
294                Table::insert(table_model).exec(&txn).await?;
295                let index_model: index::ActiveModel = index.clone().into();
296                Index::insert(index_model).exec(&txn).await?;
297            }
298            StreamingJob::Source(src) => {
299                let job_id = Self::create_streaming_job_obj(
300                    &txn,
301                    ObjectType::Source,
302                    src.owner as _,
303                    Some(src.database_id as _),
304                    Some(src.schema_id as _),
305                    create_type,
306                    ctx,
307                    streaming_parallelism,
308                    max_parallelism,
309                    specific_resource_group,
310                )
311                .await?;
312                src.id = job_id as _;
313                let source_model: source::ActiveModel = src.clone().into();
314                Source::insert(source_model).exec(&txn).await?;
315            }
316        }
317
318        // collect dependent secrets.
319        dependencies.extend(
320            streaming_job
321                .dependent_secret_ids()?
322                .into_iter()
323                .map(|secret_id| secret_id as ObjectId),
324        );
325        // collect dependent connections.
326        dependencies.extend(
327            streaming_job
328                .dependent_connection_ids()?
329                .into_iter()
330                .map(|conn_id| conn_id as ObjectId),
331        );
332
333        // record object dependency.
334        if !dependencies.is_empty() {
335            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336                object_dependency::ActiveModel {
337                    oid: Set(oid),
338                    used_by: Set(streaming_job.id() as _),
339                    ..Default::default()
340                }
341            }))
342            .exec(&txn)
343            .await?;
344        }
345
346        txn.commit().await?;
347
348        Ok(())
349    }
350
351    /// Create catalogs for internal tables, then notify frontend about them if the job is a
352    /// materialized view.
353    ///
354    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
355    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
356    ///
357    /// Returns a mapping from the temporary table id to the actual global table id.
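    ///
    /// A minimal usage sketch (the caller-side names are illustrative only):
    ///
    /// ```ignore
    /// let table_id_map = mgr
    ///     .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
    ///     .await?;
    /// // `table_id_map` maps each placeholder id used in the fragment graph to the
    /// // globally allocated table id; the caller rewrites the graph accordingly.
    /// ```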
358    pub async fn create_internal_table_catalog(
359        &self,
360        job: &StreamingJob,
361        mut incomplete_internal_tables: Vec<PbTable>,
362    ) -> MetaResult<HashMap<u32, u32>> {
363        let job_id = job.id() as ObjectId;
364        let inner = self.inner.write().await;
365        let txn = inner.db.begin().await?;
366        let mut table_id_map = HashMap::new();
367        for table in &mut incomplete_internal_tables {
368            let table_id = Self::create_object(
369                &txn,
370                ObjectType::Table,
371                table.owner as _,
372                Some(table.database_id as _),
373                Some(table.schema_id as _),
374            )
375            .await?
376            .oid;
377            table_id_map.insert(table.id, table_id as u32);
378            table.id = table_id as _;
379            table.job_id = Some(job_id as _);
380
381            let table_model = table::ActiveModel {
382                table_id: Set(table_id as _),
383                belongs_to_job_id: Set(Some(job_id)),
384                fragment_id: NotSet,
385                ..table.clone().into()
386            };
387            Table::insert(table_model).exec(&txn).await?;
388        }
389        txn.commit().await?;
390
391        Ok(table_id_map)
392    }
393
394    // TODO: In this function, we also update the `Table` model in the meta store.
395    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
396    // making them the source of truth and performing a full replacement for those in the meta store?
397    /// Insert fragments and actors into the meta store. Used both for creating new jobs and replacing jobs.
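    ///
    /// A minimal calling sketch (illustrative only; the replace path passes `true` instead):
    ///
    /// ```ignore
    /// mgr.prepare_streaming_job(&stream_job_fragments, &streaming_job, false)
    ///     .await?;
    /// ```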
398    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
399    pub async fn prepare_streaming_job(
400        &self,
401        stream_job_fragments: &StreamJobFragmentsToCreate,
402        streaming_job: &StreamingJob,
403        for_replace: bool,
404    ) -> MetaResult<()> {
405        let is_materialized_view = streaming_job.is_materialized_view();
406        let fragment_actors =
407            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
408        let mut all_tables = stream_job_fragments.all_tables();
409        let inner = self.inner.write().await;
410
411        let mut objects = vec![];
412        let txn = inner.db.begin().await?;
413
414        // Add fragments.
415        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
416        for fragment in fragments {
417            let fragment_id = fragment.fragment_id;
418            let state_table_ids = fragment.state_table_ids.inner_ref().clone();
419
420            let fragment = fragment.into_active_model();
421            Fragment::insert(fragment).exec(&txn).await?;
422
423            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
424            // After table fragments are created, update them for all tables.
425            if !for_replace {
426                for state_table_id in state_table_ids {
427                    // Table's vnode count is not always the fragment's vnode count, so we have to
428                    // look up the table from `TableFragments`.
429                    // See `ActorGraphBuilder::new`.
430                    let table = all_tables
431                        .get_mut(&(state_table_id as u32))
432                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
433                    assert_eq!(table.id, state_table_id as u32);
434                    assert_eq!(table.fragment_id, fragment_id as u32);
435                    table.job_id = Some(streaming_job.id());
436                    let vnode_count = table.vnode_count();
437
438                    table::ActiveModel {
439                        table_id: Set(state_table_id as _),
440                        fragment_id: Set(Some(fragment_id)),
441                        vnode_count: Set(vnode_count as _),
442                        ..Default::default()
443                    }
444                    .update(&txn)
445                    .await?;
446
447                    if is_materialized_view {
448                        objects.push(PbObject {
449                            object_info: Some(PbObjectInfo::Table(table.clone())),
450                        });
451                    }
452                }
453            }
454        }
455
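        // Record the fragment-level downstream relations introduced by this job.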
456        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;
457
458        // Add actors and actor dispatchers.
459        for actors in actors {
460            for actor in actors {
461                let actor = actor.into_active_model();
462                Actor::insert(actor).exec(&txn).await?;
463            }
464        }
465
466        if !for_replace {
467            // Update dml fragment id.
468            if let StreamingJob::Table(_, table, ..) = streaming_job {
469                Table::update(table::ActiveModel {
470                    table_id: Set(table.id as _),
471                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
472                    ..Default::default()
473                })
474                .exec(&txn)
475                .await?;
476            }
477        }
478
479        txn.commit().await?;
480
481        if !objects.is_empty() {
482            assert!(is_materialized_view);
483            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
484                .await;
485        }
486
487        Ok(())
488    }
489
490    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or created in `FOREGROUND` mode.
491    /// It returns (true, _) if the job is not found or has been aborted.
492    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
493    #[await_tree::instrument]
494    pub async fn try_abort_creating_streaming_job(
495        &self,
496        job_id: ObjectId,
497        is_cancelled: bool,
498    ) -> MetaResult<(bool, Option<DatabaseId>)> {
499        let mut inner = self.inner.write().await;
500        let txn = inner.db.begin().await?;
501
502        let obj = Object::find_by_id(job_id).one(&txn).await?;
503        let Some(obj) = obj else {
504            tracing::warn!(
505                id = job_id,
506                "streaming job not found when aborting creating, might be cleaned by recovery"
507            );
508            return Ok((true, None));
509        };
510        let database_id = obj
511            .database_id
512            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
513
514        if !is_cancelled {
515            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
516            if let Some(streaming_job) = streaming_job {
517                assert_ne!(streaming_job.job_status, JobStatus::Created);
518                if streaming_job.create_type == CreateType::Background
519                    && streaming_job.job_status == JobStatus::Creating
520                {
521                    // If the job is created in the background and is still in the creating status, we should not abort it; let recovery handle it.
522                    tracing::warn!(
523                        id = job_id,
524                        "streaming job is created in background and still in creating status"
525                    );
526                    return Ok((false, Some(database_id)));
527                }
528            }
529        }
530
531        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
532
533        // Get the notification info if the job is a materialized view.
534        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
535        let mut objs = vec![];
536        if let Some(table) = &table_obj
537            && table.table_type == TableType::MaterializedView
538        {
539            let obj: Option<PartialObject> = Object::find_by_id(job_id)
540                .select_only()
541                .columns([
542                    object::Column::Oid,
543                    object::Column::ObjType,
544                    object::Column::SchemaId,
545                    object::Column::DatabaseId,
546                ])
547                .into_partial_model()
548                .one(&txn)
549                .await?;
550            let obj =
551                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
552            objs.push(obj);
553            let internal_table_objs: Vec<PartialObject> = Object::find()
554                .select_only()
555                .columns([
556                    object::Column::Oid,
557                    object::Column::ObjType,
558                    object::Column::SchemaId,
559                    object::Column::DatabaseId,
560                ])
561                .join(JoinType::InnerJoin, object::Relation::Table.def())
562                .filter(table::Column::BelongsToJobId.eq(job_id))
563                .into_partial_model()
564                .all(&txn)
565                .await?;
566            objs.extend(internal_table_objs);
567        }
568
569        // Check whether the job being aborted is a sink created into an existing table; if so, also delete the temporary job replacing the target table.
570        if table_obj.is_none()
571            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
572                .select_only()
573                .column(sink::Column::TargetTable)
574                .into_tuple::<Option<TableId>>()
575                .one(&txn)
576                .await?
577        {
578            let tmp_id: Option<ObjectId> = ObjectDependency::find()
579                .select_only()
580                .column(object_dependency::Column::UsedBy)
581                .join(
582                    JoinType::InnerJoin,
583                    object_dependency::Relation::Object1.def(),
584                )
585                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
586                .filter(
587                    object_dependency::Column::Oid
588                        .eq(target_table_id)
589                        .and(object::Column::ObjType.eq(ObjectType::Table))
590                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
591                )
592                .into_tuple()
593                .one(&txn)
594                .await?;
595            if let Some(tmp_id) = tmp_id {
596                tracing::warn!(
597                    id = tmp_id,
598                    "aborting temp streaming job for sink into table"
599                );
600                Object::delete_by_id(tmp_id).exec(&txn).await?;
601            }
602        }
603
604        Object::delete_by_id(job_id).exec(&txn).await?;
605        if !internal_table_ids.is_empty() {
606            Object::delete_many()
607                .filter(object::Column::Oid.is_in(internal_table_ids))
608                .exec(&txn)
609                .await?;
610        }
611        if let Some(t) = &table_obj
612            && let Some(source_id) = t.optional_associated_source_id
613        {
614            Object::delete_by_id(source_id).exec(&txn).await?;
615        }
616
617        let err = if is_cancelled {
618            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
619        } else {
620            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
621        };
622        let abort_reason = format!("streaming job aborted {}", err.as_report());
623        for tx in inner
624            .creating_table_finish_notifier
625            .get_mut(&database_id)
626            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
627            .into_iter()
628            .flatten()
629            .flatten()
630        {
631            let _ = tx.send(Err(abort_reason.clone()));
632        }
633        txn.commit().await?;
634
635        if !objs.is_empty() {
636            // We have already notified the frontend about these objects,
637            // so we need to notify it to delete them here.
638            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
639                .await;
640        }
641        Ok((true, Some(database_id)))
642    }
643
644    #[await_tree::instrument]
645    pub async fn post_collect_job_fragments(
646        &self,
647        job_id: ObjectId,
648        actor_ids: Vec<crate::model::ActorId>,
649        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
650        split_assignment: &SplitAssignment,
651    ) -> MetaResult<()> {
652        self.post_collect_job_fragments_inner(
653            job_id,
654            actor_ids,
655            upstream_fragment_new_downstreams,
656            split_assignment,
657            false,
658        )
659        .await
660    }
661
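    /// Shared implementation of `post_collect_job_fragments`: mark the collected actors as
    /// running, persist their initial split assignments, record the new fragment relations,
    /// and move the job to the `Creating` status. When `is_mv` is true, the new fragment
    /// mappings are additionally broadcast to the frontend.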
662    pub async fn post_collect_job_fragments_inner(
663        &self,
664        job_id: ObjectId,
665        actor_ids: Vec<crate::model::ActorId>,
666        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
667        split_assignment: &SplitAssignment,
668        is_mv: bool,
669    ) -> MetaResult<()> {
670        let inner = self.inner.write().await;
671        let txn = inner.db.begin().await?;
672
673        Actor::update_many()
674            .col_expr(
675                actor::Column::Status,
676                SimpleExpr::from(ActorStatus::Running.into_value()),
677            )
678            .filter(
679                actor::Column::ActorId
680                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
681            )
682            .exec(&txn)
683            .await?;
684
685        for splits in split_assignment.values() {
686            for (actor_id, splits) in splits {
687                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
688                let connector_splits = &PbConnectorSplits { splits };
689                actor::ActiveModel {
690                    actor_id: Set(*actor_id as _),
691                    splits: Set(Some(connector_splits.into())),
692                    ..Default::default()
693                }
694                .update(&txn)
695                .await?;
696            }
697        }
698
699        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
700
701        // Mark job as CREATING.
702        streaming_job::ActiveModel {
703            job_id: Set(job_id),
704            job_status: Set(JobStatus::Creating),
705            ..Default::default()
706        }
707        .update(&txn)
708        .await?;
709
710        let fragment_mapping = if is_mv {
711            get_fragment_mappings(&txn, job_id as _).await?
712        } else {
713            vec![]
714        };
715
716        txn.commit().await?;
717        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
718            .await;
719
720        Ok(())
721    }
722
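    /// Create a temporary (dummy) streaming job object used to replace an existing job
    /// (e.g. `ALTER TABLE`). It verifies the job version, ensures the job is not being
    /// altered or referenced by other creating jobs, checks that the max parallelism is
    /// unchanged, and records a dependency from the original job to the new temporary
    /// object. Returns the id of the temporary object.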
723    pub async fn create_job_catalog_for_replace(
724        &self,
725        streaming_job: &StreamingJob,
726        ctx: &StreamContext,
727        specified_parallelism: &Option<NonZeroUsize>,
728        max_parallelism: usize,
729    ) -> MetaResult<ObjectId> {
730        let id = streaming_job.id();
731        let inner = self.inner.write().await;
732        let txn = inner.db.begin().await?;
733
734        // 1. check version.
735        streaming_job.verify_version_for_replace(&txn).await?;
736        // 2. check concurrent replace.
737        let referring_cnt = ObjectDependency::find()
738            .join(
739                JoinType::InnerJoin,
740                object_dependency::Relation::Object1.def(),
741            )
742            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
743            .filter(
744                object_dependency::Column::Oid
745                    .eq(id as ObjectId)
746                    .and(object::Column::ObjType.eq(ObjectType::Table))
747                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
748            )
749            .count(&txn)
750            .await?;
751        if referring_cnt != 0 {
752            return Err(MetaError::permission_denied(
753                "job is being altered or referenced by some creating jobs",
754            ));
755        }
756
757        // 3. check parallelism.
758        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
759            .select_only()
760            .column(streaming_job::Column::MaxParallelism)
761            .into_tuple()
762            .one(&txn)
763            .await?
764            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
765
766        if original_max_parallelism != max_parallelism as i32 {
767            // We have already overridden the max parallelism in `StreamFragmentGraph` before entering this function.
768            // This should not happen in normal cases.
769            bail!(
770                "cannot use a different max parallelism \
771                 when replacing streaming job, \
772                 original: {}, new: {}",
773                original_max_parallelism,
774                max_parallelism
775            );
776        }
777
778        let parallelism = match specified_parallelism {
779            None => StreamingParallelism::Adaptive,
780            Some(n) => StreamingParallelism::Fixed(n.get() as _),
781        };
782
783        // 4. create streaming object for new replace table.
784        let new_obj_id = Self::create_streaming_job_obj(
785            &txn,
786            streaming_job.object_type(),
787            streaming_job.owner() as _,
788            Some(streaming_job.database_id() as _),
789            Some(streaming_job.schema_id() as _),
790            streaming_job.create_type(),
791            ctx,
792            parallelism,
793            max_parallelism,
794            None,
795        )
796        .await?;
797
798        // 5. record dependency for new replace table.
799        ObjectDependency::insert(object_dependency::ActiveModel {
800            oid: Set(id as _),
801            used_by: Set(new_obj_id as _),
802            ..Default::default()
803        })
804        .exec(&txn)
805        .await?;
806
807        txn.commit().await?;
808
809        Ok(new_obj_id)
810    }
811
812    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
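    /// If `replace_stream_job_info` is given (e.g. for sink-into-table), the replacement of the
    /// target table is finished within the same transaction.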
813    pub async fn finish_streaming_job(
814        &self,
815        job_id: ObjectId,
816        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
817    ) -> MetaResult<()> {
818        let mut inner = self.inner.write().await;
819        let txn = inner.db.begin().await?;
820
821        let job_type = Object::find_by_id(job_id)
822            .select_only()
823            .column(object::Column::ObjType)
824            .into_tuple()
825            .one(&txn)
826            .await?
827            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
828
829        // update `created_at` to now() and `created_at_cluster_version` to the current cluster version.
830        let res = Object::update_many()
831            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
832            .col_expr(
833                object::Column::CreatedAtClusterVersion,
834                current_cluster_version().into(),
835            )
836            .filter(object::Column::Oid.eq(job_id))
837            .exec(&txn)
838            .await?;
839        if res.rows_affected == 0 {
840            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
841        }
842
843        // mark the target stream job as `Created`.
844        let job = streaming_job::ActiveModel {
845            job_id: Set(job_id),
846            job_status: Set(JobStatus::Created),
847            ..Default::default()
848        };
849        job.update(&txn).await?;
850
851        // notify frontend: job, internal tables.
852        let internal_table_objs = Table::find()
853            .find_also_related(Object)
854            .filter(table::Column::BelongsToJobId.eq(job_id))
855            .all(&txn)
856            .await?;
857        let mut objects = internal_table_objs
858            .iter()
859            .map(|(table, obj)| PbObject {
860                object_info: Some(PbObjectInfo::Table(
861                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
862                )),
863            })
864            .collect_vec();
865        let mut notification_op = NotificationOperation::Add;
866
867        match job_type {
868            ObjectType::Table => {
869                let (table, obj) = Table::find_by_id(job_id)
870                    .find_also_related(Object)
871                    .one(&txn)
872                    .await?
873                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
874                if table.table_type == TableType::MaterializedView {
875                    notification_op = NotificationOperation::Update;
876                }
877
878                if let Some(source_id) = table.optional_associated_source_id {
879                    let (src, obj) = Source::find_by_id(source_id)
880                        .find_also_related(Object)
881                        .one(&txn)
882                        .await?
883                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
884                    objects.push(PbObject {
885                        object_info: Some(PbObjectInfo::Source(
886                            ObjectModel(src, obj.unwrap()).into(),
887                        )),
888                    });
889                }
890                objects.push(PbObject {
891                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
892                });
893            }
894            ObjectType::Sink => {
895                let (sink, obj) = Sink::find_by_id(job_id)
896                    .find_also_related(Object)
897                    .one(&txn)
898                    .await?
899                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
900                objects.push(PbObject {
901                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
902                });
903            }
904            ObjectType::Index => {
905                let (index, obj) = Index::find_by_id(job_id)
906                    .find_also_related(Object)
907                    .one(&txn)
908                    .await?
909                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
910                {
911                    let (table, obj) = Table::find_by_id(index.index_table_id)
912                        .find_also_related(Object)
913                        .one(&txn)
914                        .await?
915                        .ok_or_else(|| {
916                            MetaError::catalog_id_not_found("table", index.index_table_id)
917                        })?;
918                    objects.push(PbObject {
919                        object_info: Some(PbObjectInfo::Table(
920                            ObjectModel(table, obj.unwrap()).into(),
921                        )),
922                    });
923                }
924                objects.push(PbObject {
925                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
926                });
927            }
928            ObjectType::Source => {
929                let (source, obj) = Source::find_by_id(job_id)
930                    .find_also_related(Object)
931                    .one(&txn)
932                    .await?
933                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
934                objects.push(PbObject {
935                    object_info: Some(PbObjectInfo::Source(
936                        ObjectModel(source, obj.unwrap()).into(),
937                    )),
938                });
939            }
940            _ => unreachable!("invalid job type: {:?}", job_type),
941        }
942
943        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
944
945        let replace_table_mapping_update = match replace_stream_job_info {
946            Some(ReplaceStreamJobPlan {
947                streaming_job,
948                replace_upstream,
949                tmp_id,
950                ..
951            }) => {
952                let incoming_sink_id = job_id;
953
954                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
955                    tmp_id as ObjectId,
956                    replace_upstream,
957                    SinkIntoTableContext {
958                        creating_sink_id: Some(incoming_sink_id as _),
959                        dropping_sink_id: None,
960                        updated_sink_catalogs: vec![],
961                    },
962                    &txn,
963                    streaming_job,
964                    None, // will not drop table connector when creating a streaming job
965                )
966                .await?;
967
968                Some((relations, fragment_mapping))
969            }
970            None => None,
971        };
972
973        txn.commit().await?;
974
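        // The transaction has committed: broadcast the new fragment mappings and the finished
        // catalogs to the frontend, then wake up any callers waiting for this job to finish.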
975        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
976            .await;
977
978        let mut version = self
979            .notify_frontend(
980                notification_op,
981                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
982            )
983            .await;
984
985        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
986            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
987                .await;
988            version = self
989                .notify_frontend(
990                    NotificationOperation::Update,
991                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
992                )
993                .await;
994        }
995        inner
996            .creating_table_finish_notifier
997            .values_mut()
998            .for_each(|creating_tables| {
999                if let Some(txs) = creating_tables.remove(&job_id) {
1000                    for tx in txs {
1001                        let _ = tx.send(Ok(version));
1002                    }
1003                }
1004            });
1005
1006        Ok(())
1007    }
1008
1009    pub async fn finish_replace_streaming_job(
1010        &self,
1011        tmp_id: ObjectId,
1012        streaming_job: StreamingJob,
1013        replace_upstream: FragmentReplaceUpstream,
1014        sink_into_table_context: SinkIntoTableContext,
1015        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1016    ) -> MetaResult<NotificationVersion> {
1017        let inner = self.inner.write().await;
1018        let txn = inner.db.begin().await?;
1019
1020        let (objects, fragment_mapping, delete_notification_objs) =
1021            Self::finish_replace_streaming_job_inner(
1022                tmp_id,
1023                replace_upstream,
1024                sink_into_table_context,
1025                &txn,
1026                streaming_job,
1027                drop_table_connector_ctx,
1028            )
1029            .await?;
1030
1031        txn.commit().await?;
1032
1033        // FIXME: Do not notify the frontend for now, because frontend nodes might still refer to the old table
1034        // catalog and need to access the old fragment. Let frontend nodes delete the old fragment
1035        // when they receive the table catalog change.
1036        // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
1037        //     .await;
1038        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1039            .await;
1040        let mut version = self
1041            .notify_frontend(
1042                NotificationOperation::Update,
1043                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1044            )
1045            .await;
1046
1047        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1048            self.notify_users_update(user_infos).await;
1049            version = self
1050                .notify_frontend(
1051                    NotificationOperation::Delete,
1052                    build_object_group_for_delete(to_drop_objects),
1053                )
1054                .await;
1055        }
1056
1057        Ok(version)
1058    }
1059
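    /// Core logic shared by `finish_streaming_job` (for sink-into-table) and
    /// `finish_replace_streaming_job`. Returns the catalog objects to notify as updated, the
    /// new fragment worker-slot mappings, and, when a table connector is dropped, the user
    /// infos and objects that must additionally be notified as deleted.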
1060    pub async fn finish_replace_streaming_job_inner(
1061        tmp_id: ObjectId,
1062        replace_upstream: FragmentReplaceUpstream,
1063        SinkIntoTableContext {
1064            creating_sink_id,
1065            dropping_sink_id,
1066            updated_sink_catalogs,
1067        }: SinkIntoTableContext,
1068        txn: &DatabaseTransaction,
1069        streaming_job: StreamingJob,
1070        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1071    ) -> MetaResult<(
1072        Vec<PbObject>,
1073        Vec<PbFragmentWorkerSlotMapping>,
1074        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1075    )> {
1076        let original_job_id = streaming_job.id() as ObjectId;
1077        let job_type = streaming_job.job_type();
1078
1079        let mut index_item_rewriter = None;
1080
1081        // Update catalog
1082        match streaming_job {
1083            StreamingJob::Table(_source, table, _table_job_type) => {
1084                // The source catalog should remain unchanged
1085
1086                let original_column_catalogs = Table::find_by_id(original_job_id)
1087                    .select_only()
1088                    .columns([table::Column::Columns])
1089                    .into_tuple::<ColumnCatalogArray>()
1090                    .one(txn)
1091                    .await?
1092                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1093
1094                index_item_rewriter = Some({
1095                    let original_columns = original_column_catalogs
1096                        .to_protobuf()
1097                        .into_iter()
1098                        .map(|c| c.column_desc.unwrap())
1099                        .collect_vec();
1100                    let new_columns = table
1101                        .columns
1102                        .iter()
1103                        .map(|c| c.column_desc.clone().unwrap())
1104                        .collect_vec();
1105
1106                    IndexItemRewriter {
1107                        original_columns,
1108                        new_columns,
1109                    }
1110                });
1111
1112                // For sinks created in earlier versions, we need to set the original_target_columns.
1113                for sink_id in updated_sink_catalogs {
1114                    sink::ActiveModel {
1115                        sink_id: Set(sink_id as _),
1116                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1117                        ..Default::default()
1118                    }
1119                    .update(txn)
1120                    .await?;
1121                }
1122                // Update the table catalog with the new one. (column catalog is also updated here)
1123                let mut table = table::ActiveModel::from(table);
1124                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
1125                if let Some(sink_id) = creating_sink_id {
1126                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
1127                    incoming_sinks.push(sink_id as _);
1128                }
1129                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1130                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1131                {
1132                    // drop the table's connector; the rest of the logic is in `drop_table_associated_source`
1133                    table.optional_associated_source_id = Set(None);
1134                }
1135
1136                if let Some(sink_id) = dropping_sink_id {
1137                    let drained = incoming_sinks
1138                        .extract_if(.., |id| *id == sink_id)
1139                        .collect_vec();
1140                    debug_assert_eq!(drained, vec![sink_id]);
1141                }
1142
1143                table.incoming_sinks = Set(incoming_sinks.into());
1144                table.update(txn).await?;
1145            }
1146            StreamingJob::Source(source) => {
1147                // Update the source catalog with the new one.
1148                let source = source::ActiveModel::from(source);
1149                source.update(txn).await?;
1150            }
1151            _ => unreachable!(
1152                "invalid streaming job type: {:?}",
1153                streaming_job.job_type_str()
1154            ),
1155        }
1156
1157        // 0. update internal tables
1158        // Fields including `fragment_id` were placeholder values before.
1159        // After table fragments are created, update them for all internal tables.
1160        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1161            .select_only()
1162            .columns([
1163                fragment::Column::FragmentId,
1164                fragment::Column::StateTableIds,
1165            ])
1166            .filter(fragment::Column::JobId.eq(tmp_id))
1167            .into_tuple()
1168            .all(txn)
1169            .await?;
1170        for (fragment_id, state_table_ids) in fragment_info {
1171            for state_table_id in state_table_ids.into_inner() {
1172                table::ActiveModel {
1173                    table_id: Set(state_table_id as _),
1174                    fragment_id: Set(Some(fragment_id)),
1175                    // No need to update `vnode_count` because it must remain the same.
1176                    ..Default::default()
1177                }
1178                .update(txn)
1179                .await?;
1180            }
1181        }
1182
1183        // 1. replace old fragments/actors with new ones.
1184        Fragment::delete_many()
1185            .filter(fragment::Column::JobId.eq(original_job_id))
1186            .exec(txn)
1187            .await?;
1188        Fragment::update_many()
1189            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1190            .filter(fragment::Column::JobId.eq(tmp_id))
1191            .exec(txn)
1192            .await?;
1193
1194        // 2. update merges.
1195        // update the downstream fragments' Merge nodes, rewriting their `upstream_fragment_id`s
1196        for (fragment_id, fragment_replace_map) in replace_upstream {
1197            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
1198                .select_only()
1199                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1200                .into_tuple::<(FragmentId, StreamNode)>()
1201                .one(txn)
1202                .await?
1203                .map(|(id, node)| (id, node.to_protobuf()))
1204                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1205
1206            visit_stream_node_mut(&mut stream_node, |body| {
1207                if let PbNodeBody::Merge(m) = body
1208                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
1209                {
1210                    m.upstream_fragment_id = *new_fragment_id;
1211                }
1212            });
1213            fragment::ActiveModel {
1214                fragment_id: Set(fragment_id),
1215                stream_node: Set(StreamNode::from(&stream_node)),
1216                ..Default::default()
1217            }
1218            .update(txn)
1219            .await?;
1220        }
1221
1222        // 3. remove dummy object.
1223        Object::delete_by_id(tmp_id).exec(txn).await?;
1224
1225        // 4. update catalogs and notify.
1226        let mut objects = vec![];
1227        match job_type {
1228            StreamingJobType::Table(_) => {
1229                let (table, table_obj) = Table::find_by_id(original_job_id)
1230                    .find_also_related(Object)
1231                    .one(txn)
1232                    .await?
1233                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1234                objects.push(PbObject {
1235                    object_info: Some(PbObjectInfo::Table(
1236                        ObjectModel(table, table_obj.unwrap()).into(),
1237                    )),
1238                })
1239            }
1240            StreamingJobType::Source => {
1241                let (source, source_obj) = Source::find_by_id(original_job_id)
1242                    .find_also_related(Object)
1243                    .one(txn)
1244                    .await?
1245                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1246                objects.push(PbObject {
1247                    object_info: Some(PbObjectInfo::Source(
1248                        ObjectModel(source, source_obj.unwrap()).into(),
1249                    )),
1250                })
1251            }
1252            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1253        }
1254
1255        if let Some(expr_rewriter) = index_item_rewriter {
1256            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1257                .select_only()
1258                .columns([index::Column::IndexId, index::Column::IndexItems])
1259                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1260                .into_tuple()
1261                .all(txn)
1262                .await?;
1263            for (index_id, nodes) in index_items {
1264                let mut pb_nodes = nodes.to_protobuf();
1265                pb_nodes
1266                    .iter_mut()
1267                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1268                let index = index::ActiveModel {
1269                    index_id: Set(index_id),
1270                    index_items: Set(pb_nodes.into()),
1271                    ..Default::default()
1272                }
1273                .update(txn)
1274                .await?;
1275                let index_obj = index
1276                    .find_related(Object)
1277                    .one(txn)
1278                    .await?
1279                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1280                objects.push(PbObject {
1281                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1282                });
1283            }
1284        }
1285
1286        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;
1287
1288        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1289        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1290            notification_objs =
1291                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1292        }
1293
1294        Ok((objects, fragment_mapping, notification_objs))
1295    }
1296
1297    /// Abort the replacing streaming job by deleting the temporary job object.
1298    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
1299        let inner = self.inner.write().await;
1300        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
1301        Ok(())
1302    }
1303
1304    // edit the `rate_limit` of the `Source` node in the given `source_id`'s fragments,
1305    // and return the actor ids that the new rate limit should be applied to
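    //
    // A minimal calling sketch (the caller-side handling is illustrative only):
    //
    //     let fragment_actors = mgr
    //         .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //         .await?;
    //     // The returned `fragment_id -> actor_ids` map is then used to apply the new
    //     // rate limit to the running actors, e.g. via a throttle barrier command.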
1306    pub async fn update_source_rate_limit_by_source_id(
1307        &self,
1308        source_id: SourceId,
1309        rate_limit: Option<u32>,
1310    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1311        let inner = self.inner.read().await;
1312        let txn = inner.db.begin().await?;
1313
1314        {
1315            let active_source = source::ActiveModel {
1316                source_id: Set(source_id),
1317                rate_limit: Set(rate_limit.map(|v| v as i32)),
1318                ..Default::default()
1319            };
1320            active_source.update(&txn).await?;
1321        }
1322
1323        let (source, obj) = Source::find_by_id(source_id)
1324            .find_also_related(Object)
1325            .one(&txn)
1326            .await?
1327            .ok_or_else(|| {
1328                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1329            })?;
1330
1331        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
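        // Determine which streaming jobs embed this source's executors:
        // - a table with an associated source runs the source inside the table job;
        // - a shared source runs its own streaming job;
        // - otherwise, every job that directly depends on the source has its own copy.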
1332        let streaming_job_ids: Vec<ObjectId> =
1333            if let Some(table_id) = source.optional_associated_table_id {
1334                vec![table_id]
1335            } else if let Some(source_info) = &source.source_info
1336                && source_info.to_protobuf().is_shared()
1337            {
1338                vec![source_id]
1339            } else {
1340                ObjectDependency::find()
1341                    .select_only()
1342                    .column(object_dependency::Column::UsedBy)
1343                    .filter(object_dependency::Column::Oid.eq(source_id))
1344                    .into_tuple()
1345                    .all(&txn)
1346                    .await?
1347            };
1348
1349        if streaming_job_ids.is_empty() {
1350            return Err(MetaError::invalid_parameter(format!(
1351                "source id {source_id} not used by any streaming job"
1352            )));
1353        }
1354
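        // Load the fragments of those jobs and rewrite the rate limit in their `Source`
        // (and, for fs connectors, `StreamFsFetch`) nodes.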
1355        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1356            .select_only()
1357            .columns([
1358                fragment::Column::FragmentId,
1359                fragment::Column::FragmentTypeMask,
1360                fragment::Column::StreamNode,
1361            ])
1362            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1363            .into_tuple()
1364            .all(&txn)
1365            .await?;
1366        let mut fragments = fragments
1367            .into_iter()
1368            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1369            .collect_vec();
1370
1371        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1372            let mut found = false;
1373            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
1374                visit_stream_node_mut(stream_node, |node| {
1375                    if let PbNodeBody::Source(node) = node {
1376                        if let Some(node_inner) = &mut node.source_inner
1377                            && node_inner.source_id == source_id as u32
1378                        {
1379                            node_inner.rate_limit = rate_limit;
1380                            found = true;
1381                        }
1382                    }
1383                });
1384            }
1385            if is_fs_source {
1386                // in older versions, there's no fragment type flag for `FsFetch` node,
1387                // so we just scan all fragments for StreamFsFetch node if using fs connector
1388                visit_stream_node_mut(stream_node, |node| {
1389                    if let PbNodeBody::StreamFsFetch(node) = node {
1390                        *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
1391                        if let Some(node_inner) = &mut node.node_inner
1392                            && node_inner.source_id == source_id as u32
1393                        {
1394                            node_inner.rate_limit = rate_limit;
1395                            found = true;
1396                        }
1397                    }
1398                });
1399            }
1400            found
1401        });
1402
1403        assert!(
1404            !fragments.is_empty(),
1405            "source id should be used by at least one fragment"
1406        );
1407        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1408        for (id, fragment_type_mask, stream_node) in fragments {
1409            fragment::ActiveModel {
1410                fragment_id: Set(id),
1411                fragment_type_mask: Set(fragment_type_mask),
1412                stream_node: Set(StreamNode::from(&stream_node)),
1413                ..Default::default()
1414            }
1415            .update(&txn)
1416            .await?;
1417        }
1418        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1419
1420        txn.commit().await?;
1421
1422        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1423        let _version = self
1424            .notify_frontend(
1425                NotificationOperation::Update,
1426                NotificationInfo::ObjectGroup(PbObjectGroup {
1427                    objects: vec![PbObject {
1428                        object_info: Some(relation_info),
1429                    }],
1430                }),
1431            )
1432            .await;
1433
1434        Ok(fragment_actors)
1435    }
1436
1437    // Edit the stream nodes of the fragments of the given `job_id` via `fragments_mutation_fn`.
1438    // Returns the actor ids to be applied.
1439    async fn mutate_fragments_by_job_id(
1440        &self,
1441        job_id: ObjectId,
1442        // returns true if the mutation is applied
1443        mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1444        // error message when no relevant fragments are found
1445        err_msg: &'static str,
1446    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1447        let inner = self.inner.read().await;
1448        let txn = inner.db.begin().await?;
1449
1450        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1451            .select_only()
1452            .columns([
1453                fragment::Column::FragmentId,
1454                fragment::Column::FragmentTypeMask,
1455                fragment::Column::StreamNode,
1456            ])
1457            .filter(fragment::Column::JobId.eq(job_id))
1458            .into_tuple()
1459            .all(&txn)
1460            .await?;
1461        let mut fragments = fragments
1462            .into_iter()
1463            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
1464            .collect_vec();
1465
1466        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1467            fragments_mutation_fn(fragment_type_mask, stream_node)
1468        });
1469        if fragments.is_empty() {
1470            return Err(MetaError::invalid_parameter(format!(
1471                "job id {job_id}: {}",
1472                err_msg
1473            )));
1474        }
1475
1476        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1477        for (id, _, stream_node) in fragments {
1478            fragment::ActiveModel {
1479                fragment_id: Set(id),
1480                stream_node: Set(StreamNode::from(&stream_node)),
1481                ..Default::default()
1482            }
1483            .update(&txn)
1484            .await?;
1485        }
1486        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1487
1488        txn.commit().await?;
1489
1490        Ok(fragment_actors)
1491    }
1492
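    // Edit the stream node of the given `fragment_id` via `fragment_mutation_fn`.
    // Returns the fragment's actor ids to be applied.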
1493    async fn mutate_fragment_by_fragment_id(
1494        &self,
1495        fragment_id: FragmentId,
1496        mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
1497        err_msg: &'static str,
1498    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1499        let inner = self.inner.read().await;
1500        let txn = inner.db.begin().await?;
1501
1502        let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
1503            Fragment::find_by_id(fragment_id)
1504                .select_only()
1505                .columns([
1506                    fragment::Column::FragmentTypeMask,
1507                    fragment::Column::StreamNode,
1508                ])
1509                .into_tuple()
1510                .one(&txn)
1511                .await?
1512                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1513        let mut pb_stream_node = stream_node.to_protobuf();
1514
1515        if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
1516            return Err(MetaError::invalid_parameter(format!(
1517                "fragment id {fragment_id}: {}",
1518                err_msg
1519            )));
1520        }
1521
1522        fragment::ActiveModel {
1523            fragment_id: Set(fragment_id),
1524            stream_node: Set(StreamNode::from(&pb_stream_node)),
1525            ..Default::default()
1526        }
1527        .update(&txn)
1528        .await?;
1529
1530        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1531
1532        txn.commit().await?;
1533
1534        Ok(fragment_actors)
1535    }
1536
1537    // Edit the `rate_limit` of the backfill nodes (`StreamScan`, `StreamCdcScan`, `SourceBackfill`,
1538    // `Sink`) in the given `job_id`'s fragments. Returns the actor ids to be applied.
1539    pub async fn update_backfill_rate_limit_by_job_id(
1540        &self,
1541        job_id: ObjectId,
1542        rate_limit: Option<u32>,
1543    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1544        let update_backfill_rate_limit =
1545            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1546                let mut found = false;
1547                if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
1548                    visit_stream_node_mut(stream_node, |node| match node {
1549                        PbNodeBody::StreamCdcScan(node) => {
1550                            node.rate_limit = rate_limit;
1551                            found = true;
1552                        }
1553                        PbNodeBody::StreamScan(node) => {
1554                            node.rate_limit = rate_limit;
1555                            found = true;
1556                        }
1557                        PbNodeBody::SourceBackfill(node) => {
1558                            node.rate_limit = rate_limit;
1559                            found = true;
1560                        }
1561                        PbNodeBody::Sink(node) => {
1562                            node.rate_limit = rate_limit;
1563                            found = true;
1564                        }
1565                        _ => {}
1566                    });
1567                }
1568                found
1569            };
1570
1571        self.mutate_fragments_by_job_id(
1572            job_id,
1573            update_backfill_rate_limit,
1574            "stream scan node or source node not found",
1575        )
1576        .await
1577    }
1578
1579    // Edit the `rate_limit` of the `Sink` node in the given `job_id`'s fragments.
1580    // Returns the actor ids to be applied.
1581    pub async fn update_sink_rate_limit_by_job_id(
1582        &self,
1583        job_id: ObjectId,
1584        rate_limit: Option<u32>,
1585    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1586        let update_sink_rate_limit =
1587            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1588                let mut found = false;
1589                if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
1590                    visit_stream_node_mut(stream_node, |node| {
1591                        if let PbNodeBody::Sink(node) = node {
1592                            node.rate_limit = rate_limit;
1593                            found = true;
1594                        }
1595                    });
1596                }
1597                found
1598            };
1599
1600        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1601            .await
1602    }
1603
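    // Edit the `rate_limit` of the `Dml` node in the given `job_id`'s fragments.
    // Returns the actor ids to be applied.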
1604    pub async fn update_dml_rate_limit_by_job_id(
1605        &self,
1606        job_id: ObjectId,
1607        rate_limit: Option<u32>,
1608    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1609        let update_dml_rate_limit =
1610            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1611                let mut found = false;
1612                if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
1613                    visit_stream_node_mut(stream_node, |node| {
1614                        if let PbNodeBody::Dml(node) = node {
1615                            node.rate_limit = rate_limit;
1616                            found = true;
1617                        }
1618                    });
1619                }
1620                found
1621            };
1622
1623        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1624            .await
1625    }
1626
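    // Alter the properties of an existing sink: validate that every changed key is in the
    // connector's alter-config allow-list, rewrite the stored `CREATE SINK` definition,
    // persist the merged properties on the sink catalog and on the `sink_desc` of all sink
    // fragments, and notify the frontend. Returns the merged property map.
    //
    // A minimal usage sketch (hypothetical caller and property key; the key must be in the
    // connector's allow-list):
    //
    //     let props = BTreeMap::from([("some_alterable_key".to_owned(), "new_value".to_owned())]);
    //     let merged = controller.update_sink_props_by_sink_id(sink_id, props).await?;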
1627    pub async fn update_sink_props_by_sink_id(
1628        &self,
1629        sink_id: SinkId,
1630        props: BTreeMap<String, String>,
1631    ) -> MetaResult<HashMap<String, String>> {
1632        let inner = self.inner.read().await;
1633        let txn = inner.db.begin().await?;
1634
1635        let (sink, _obj) = Sink::find_by_id(sink_id)
1636            .find_also_related(Object)
1637            .one(&txn)
1638            .await?
1639            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1640
1641        // Validate that props can be altered
1642        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
1643            Some(connector) => {
1644                let connector_type = connector.to_lowercase();
1645                match_sink_name_str!(
1646                    connector_type.as_str(),
1647                    SinkType,
1648                    {
1649                        for (k, v) in &props {
1650                            if !SinkType::SINK_ALTER_CONFIG_LIST.contains(&k.as_str()) {
1651                                return Err(SinkError::Config(anyhow!(
1652                                    "unsupported alter config: {}={}",
1653                                    k,
1654                                    v
1655                                ))
1656                                .into());
1657                            }
1658                        }
1659                        let mut new_props = sink.properties.0.clone();
1660                        new_props.extend(props.clone());
1661                        SinkType::validate_alter_config(&new_props)
1662                    },
1663                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
1664                )?
1665            }
1666            None => {
1667                return Err(
1668                    SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
1669                );
1670            }
1671        };
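        // Keep the persisted `CREATE SINK` definition in sync: re-parse it and merge the new
        // options into its WITH clause.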
1672        let definition = sink.definition.clone();
1673        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1674            .map_err(|e| SinkError::Config(anyhow!(e)))?
1675            .try_into()
1676            .unwrap();
1677        if let Statement::CreateSink { stmt } = &mut stmt {
1678            let mut new_sql_options = stmt
1679                .with_properties
1680                .0
1681                .iter()
1682                .map(|sql_option| (&sql_option.name, sql_option))
1683                .collect::<IndexMap<_, _>>();
1684            let add_sql_options = props
1685                .iter()
1686                .map(|(k, v)| SqlOption::try_from((k, v)))
1687                .collect::<Result<Vec<SqlOption>, ParserError>>()
1688                .map_err(|e| SinkError::Config(anyhow!(e)))?;
1689            new_sql_options.extend(
1690                add_sql_options
1691                    .iter()
1692                    .map(|sql_option| (&sql_option.name, sql_option)),
1693            );
1694            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
1695        } else {
1696            panic!("sink definition is not a create sink statement")
1697        }
1698        let mut new_config = sink.properties.clone().into_inner();
1699        new_config.extend(props);
1700
1701        let active_sink = sink::ActiveModel {
1702            sink_id: Set(sink_id),
1703            properties: Set(risingwave_meta_model::Property(new_config.clone())),
1704            definition: Set(stmt.to_string()),
1705            ..Default::default()
1706        };
1707        active_sink.update(&txn).await?;
1708
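        // Push the merged properties down into the `sink_desc` of every sink fragment of this job.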
1709        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1710            .select_only()
1711            .columns([
1712                fragment::Column::FragmentId,
1713                fragment::Column::FragmentTypeMask,
1714                fragment::Column::StreamNode,
1715            ])
1716            .filter(fragment::Column::JobId.eq(sink_id))
1717            .into_tuple()
1718            .all(&txn)
1719            .await?;
1720        let fragments = fragments
1721            .into_iter()
1722            .filter(|(_, fragment_type_mask, _)| {
1723                *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
1724            })
1725            .filter_map(|(id, _, stream_node)| {
1726                let mut stream_node = stream_node.to_protobuf();
1727                let mut found = false;
1728                visit_stream_node_mut(&mut stream_node, |node| {
1729                    if let PbNodeBody::Sink(node) = node
1730                        && let Some(sink_desc) = &mut node.sink_desc
1731                        && sink_desc.id == sink_id as u32
1732                    {
1733                        sink_desc.properties = new_config.clone();
1734                        found = true;
1735                    }
1736                });
1737                if found { Some((id, stream_node)) } else { None }
1738            })
1739            .collect_vec();
1740        assert!(
1741            !fragments.is_empty(),
1742            "sink id should be used by at least one fragment"
1743        );
1744        for (id, stream_node) in fragments {
1745            fragment::ActiveModel {
1746                fragment_id: Set(id),
1747                stream_node: Set(StreamNode::from(&stream_node)),
1748                ..Default::default()
1749            }
1750            .update(&txn)
1751            .await?;
1752        }
1753
1754        let (sink, obj) = Sink::find_by_id(sink_id)
1755            .find_also_related(Object)
1756            .one(&txn)
1757            .await?
1758            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1759
1760        txn.commit().await?;
1761
1762        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
1763        let _version = self
1764            .notify_frontend(
1765                NotificationOperation::Update,
1766                NotificationInfo::ObjectGroup(PbObjectGroup {
1767                    objects: vec![PbObject {
1768                        object_info: Some(relation_info),
1769                    }],
1770                }),
1771            )
1772            .await;
1773
1774        Ok(new_config.into_iter().collect())
1775    }
1776
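    // Edit the `rate_limit` of the throttleable node (`Dml`, `Sink`, `StreamScan`,
    // `StreamCdcScan`, or `SourceBackfill`) in the given fragment.
    // Returns the fragment's actor ids to be applied.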
1777    pub async fn update_fragment_rate_limit_by_fragment_id(
1778        &self,
1779        fragment_id: FragmentId,
1780        rate_limit: Option<u32>,
1781    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1782        let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
1783            let mut found = false;
1784            if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
1785                || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
1786                || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
1787                || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
1788            {
1789                visit_stream_node_mut(stream_node, |node| {
1790                    if let PbNodeBody::Dml(node) = node {
1791                        node.rate_limit = rate_limit;
1792                        found = true;
1793                    }
1794                    if let PbNodeBody::Sink(node) = node {
1795                        node.rate_limit = rate_limit;
1796                        found = true;
1797                    }
1798                    if let PbNodeBody::StreamCdcScan(node) = node {
1799                        node.rate_limit = rate_limit;
1800                        found = true;
1801                    }
1802                    if let PbNodeBody::StreamScan(node) = node {
1803                        node.rate_limit = rate_limit;
1804                        found = true;
1805                    }
1806                    if let PbNodeBody::SourceBackfill(node) = node {
1807                        node.rate_limit = rate_limit;
1808                        found = true;
1809                    }
1810                });
1811            }
1812            found
1813        };
1814        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
1815            .await
1816    }
1817
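    /// Persist the effects of a reschedule into the catalog: drop removed actors, insert newly
    /// created ones, update the vnode bitmaps and connector splits of surviving actors, rebuild
    /// the fragment worker-slot mappings, record the new parallelism / resource group of each
    /// job, and finally notify the frontend about the updated mappings.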
1818    pub async fn post_apply_reschedules(
1819        &self,
1820        reschedules: HashMap<FragmentId, Reschedule>,
1821        post_updates: &JobReschedulePostUpdates,
1822    ) -> MetaResult<()> {
1823        let new_created_actors: HashSet<_> = reschedules
1824            .values()
1825            .flat_map(|reschedule| {
1826                reschedule
1827                    .added_actors
1828                    .values()
1829                    .flatten()
1830                    .map(|actor_id| *actor_id as ActorId)
1831            })
1832            .collect();
1833
1834        let inner = self.inner.write().await;
1835
1836        let txn = inner.db.begin().await?;
1837
1838        let mut fragment_mapping_to_notify = vec![];
1839
1840        for (
1841            fragment_id,
1842            Reschedule {
1843                removed_actors,
1844                vnode_bitmap_updates,
1845                actor_splits,
1846                newly_created_actors,
1847                ..
1848            },
1849        ) in reschedules
1850        {
1851            // drop removed actors
1852            Actor::delete_many()
1853                .filter(
1854                    actor::Column::ActorId
1855                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
1856                )
1857                .exec(&txn)
1858                .await?;
1859
1860            // add new actors
1861            for (
1862                (
1863                    StreamActor {
1864                        actor_id,
1865                        fragment_id,
1866                        vnode_bitmap,
1867                        expr_context,
1868                        ..
1869                    },
1870                    _,
1871                ),
1872                worker_id,
1873            ) in newly_created_actors.into_values()
1874            {
1875                let splits = actor_splits
1876                    .get(&actor_id)
1877                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
1878
1879                Actor::insert(actor::ActiveModel {
1880                    actor_id: Set(actor_id as _),
1881                    fragment_id: Set(fragment_id as _),
1882                    status: Set(ActorStatus::Running),
1883                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
1884                    worker_id: Set(worker_id),
1885                    upstream_actor_ids: Set(Default::default()),
1886                    vnode_bitmap: Set(vnode_bitmap
1887                        .as_ref()
1888                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
1889                    expr_context: Set(expr_context.as_ref().unwrap().into()),
1890                })
1891                .exec(&txn)
1892                .await?;
1893            }
1894
1895            // update the vnode bitmaps of existing actors
1896            for (actor_id, bitmap) in vnode_bitmap_updates {
1897                let actor = Actor::find_by_id(actor_id as ActorId)
1898                    .one(&txn)
1899                    .await?
1900                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1901
1902                let mut actor = actor.into_active_model();
1903                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
1904                actor.update(&txn).await?;
1905            }
1906
1907            // Update actor_splits for existing actors
1908            for (actor_id, splits) in actor_splits {
1909                if new_created_actors.contains(&(actor_id as ActorId)) {
1910                    continue;
1911                }
1912
1913                let actor = Actor::find_by_id(actor_id as ActorId)
1914                    .one(&txn)
1915                    .await?
1916                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
1917
1918                let mut actor = actor.into_active_model();
1919                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
1920                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
1921                actor.update(&txn).await?;
1922            }
1923
1924            // rebuild the fragment's worker-slot mapping from its (now updated) actors
1925            let fragment = Fragment::find_by_id(fragment_id)
1926                .one(&txn)
1927                .await?
1928                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1929
1930            let job_actors = fragment
1931                .find_related(Actor)
1932                .all(&txn)
1933                .await?
1934                .into_iter()
1935                .map(|actor| {
1936                    (
1937                        fragment_id,
1938                        fragment.distribution_type,
1939                        actor.actor_id,
1940                        actor.vnode_bitmap,
1941                        actor.worker_id,
1942                        actor.status,
1943                    )
1944                })
1945                .collect_vec();
1946
1947            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
1948        }
1949
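        // Persist the new parallelism and resource group of each rescheduled job.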
1950        let JobReschedulePostUpdates {
1951            parallelism_updates,
1952            resource_group_updates,
1953        } = post_updates;
1954
1955        for (table_id, parallelism) in parallelism_updates {
1956            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
1957                .one(&txn)
1958                .await?
1959                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
1960                .into_active_model();
1961
1962            streaming_job.parallelism = Set(match parallelism {
1963                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
1964                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
1965                TableParallelism::Custom => StreamingParallelism::Custom,
1966            });
1967
1968            if let Some(resource_group) =
1969                resource_group_updates.get(&(table_id.table_id() as ObjectId))
1970            {
1971                streaming_job.specific_resource_group = Set(resource_group.to_owned());
1972            }
1973
1974            streaming_job.update(&txn).await?;
1975        }
1976
1977        txn.commit().await?;
1978        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
1979            .await;
1980
1981        Ok(())
1982    }
1983
1984    /// Note: `FsFetch` fragments created by older versions are not included, as they lack the fragment type flag.
1985    /// Since this is only used for debugging, it should be fine.
1986    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
1987        let inner = self.inner.read().await;
1988        let txn = inner.db.begin().await?;
1989
1990        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
1991            .select_only()
1992            .columns([
1993                fragment::Column::FragmentId,
1994                fragment::Column::JobId,
1995                fragment::Column::FragmentTypeMask,
1996                fragment::Column::StreamNode,
1997            ])
1998            .filter(fragment_type_mask_intersects(
1999                PbFragmentTypeFlag::rate_limit_fragments(),
2000            ))
2001            .into_tuple()
2002            .all(&txn)
2003            .await?;
2004
2005        let mut rate_limits = Vec::new();
2006        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2007            let stream_node = stream_node.to_protobuf();
2008            let mut rate_limit = None;
2009            let mut node_name = None;
2010
2011            visit_stream_node(&stream_node, |node| {
2012                match node {
2013                    // source rate limit
2014                    PbNodeBody::Source(node) => {
2015                        if let Some(node_inner) = &node.source_inner {
2016                            debug_assert!(
2017                                rate_limit.is_none(),
2018                                "one fragment should only have 1 rate limit node"
2019                            );
2020                            rate_limit = node_inner.rate_limit;
2021                            node_name = Some("SOURCE");
2022                        }
2023                    }
2024                    PbNodeBody::StreamFsFetch(node) => {
2025                        if let Some(node_inner) = &node.node_inner {
2026                            debug_assert!(
2027                                rate_limit.is_none(),
2028                                "one fragment should only have 1 rate limit node"
2029                            );
2030                            rate_limit = node_inner.rate_limit;
2031                            node_name = Some("FS_FETCH");
2032                        }
2033                    }
2034                    // backfill rate limit
2035                    PbNodeBody::SourceBackfill(node) => {
2036                        debug_assert!(
2037                            rate_limit.is_none(),
2038                            "one fragment should only have 1 rate limit node"
2039                        );
2040                        rate_limit = node.rate_limit;
2041                        node_name = Some("SOURCE_BACKFILL");
2042                    }
2043                    PbNodeBody::StreamScan(node) => {
2044                        debug_assert!(
2045                            rate_limit.is_none(),
2046                            "one fragment should only have 1 rate limit node"
2047                        );
2048                        rate_limit = node.rate_limit;
2049                        node_name = Some("STREAM_SCAN");
2050                    }
2051                    PbNodeBody::StreamCdcScan(node) => {
2052                        debug_assert!(
2053                            rate_limit.is_none(),
2054                            "one fragment should only have 1 rate limit node"
2055                        );
2056                        rate_limit = node.rate_limit;
2057                        node_name = Some("STREAM_CDC_SCAN");
2058                    }
2059                    PbNodeBody::Sink(node) => {
2060                        debug_assert!(
2061                            rate_limit.is_none(),
2062                            "one fragment should only have 1 rate limit node"
2063                        );
2064                        rate_limit = node.rate_limit;
2065                        node_name = Some("SINK");
2066                    }
2067                    _ => {}
2068                }
2069            });
2070
2071            if let Some(rate_limit) = rate_limit {
2072                rate_limits.push(RateLimitInfo {
2073                    fragment_id: fragment_id as u32,
2074                    job_id: job_id as u32,
2075                    fragment_type_mask: fragment_type_mask as u32,
2076                    rate_limit,
2077                    node_name: node_name.unwrap().to_owned(),
2078                });
2079            }
2080        }
2081
2082        Ok(rate_limits)
2083    }
2084}
2085
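/// Builds a SQL predicate of the form `column & value != 0`, used to test whether a bit-flag
/// column (e.g. `fragment_type_mask`) intersects the given flags.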
2086fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
2087    column
2088        .binary(BinOper::Custom("&"), value)
2089        .binary(BinOper::NotEqual, 0)
2090}
2091
2092fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
2093    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
2094}
2095
2096pub struct SinkIntoTableContext {
2097    /// For creating sink into table, this is `Some`, otherwise `None`.
2098    pub creating_sink_id: Option<SinkId>,
2099    /// For dropping sink into table, this is `Some`, otherwise `None`.
2100    pub dropping_sink_id: Option<SinkId>,
2101    /// For alter table (e.g., add column), this is the list of existing sink ids into the table;
2102    /// otherwise empty.
2103    pub updated_sink_catalogs: Vec<SinkId>,
2104}