risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::{WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{FragmentTypeFlag, PbFragmentTypeFlag, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
    rebuild_fragment_mapping_from_actors,
};
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: &StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone.clone()),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
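    ///
    /// A minimal caller sketch (hypothetical; `controller`, `job` and `ctx` are assumed to
    /// be prepared by the caller, using default parallelism and no extra dependencies):
    ///
    /// ```ignore
    /// controller
    ///     .create_job_catalog(&mut job, &ctx, &None, max_parallelism, HashSet::new(), None)
    ///     .await?;
    /// // After this call, `job.id()` holds the newly assigned global object id.
    /// ```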
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        // TODO(rc): pass all dependencies uniformly, deprecate `dependent_relations` and `dependent_secret_ids`.
        dependencies.extend(
            streaming_job
                .dependent_relations()
                .into_iter()
                .map(|id| id as ObjectId),
        );

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one used for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table {
                    if check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                    {
                        bail!("Creating such a sink will result in circular dependency.");
                    }
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // to be compatible with old implementation.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // collect dependent connection
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
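    ///
    /// A sketch of how the returned mapping is typically consumed (hypothetical caller;
    /// the `rewrite` step stands in for whatever fixes up ids in the fragment graph):
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// // Rewrite the temporary ids assigned during planning to the global ones.
    /// for (tmp_id, global_id) in &table_id_map {
    ///     rewrite(*tmp_id, *global_id);
    /// }
    /// ```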
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
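    ///
    /// A sketch of the two call modes (hypothetical caller; only the `for_replace` flag differs):
    ///
    /// ```ignore
    /// // Creating a new job: also fills in `fragment_id` / `vnode_count` for state tables.
    /// controller.prepare_streaming_job(&fragments, &job, false).await?;
    /// // Replacing an existing job: the table catalog fields are left untouched.
    /// controller.prepare_streaming_job(&fragments, &job, true).await?;
    /// ```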
    pub async fn prepare_streaming_job(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let is_materialized_view = streaming_job.is_materialized_view();
        let fragment_actors =
            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
        let mut all_tables = stream_job_fragments.all_tables();
        let inner = self.inner.write().await;

        let mut objects = vec![];
        let txn = inner.db.begin().await?;

        // Add fragments.
        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get_mut(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    table.job_id = Some(streaming_job.id());
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if is_materialized_view {
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table.clone())),
                        });
                    }
                }
            }
        }

        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;

        // Add actors and actor dispatchers.
        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update dml fragment id.
            if let StreamingJob::Table(_, table, ..) = streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects.is_empty() {
            assert!(is_materialized_view);
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in its initial status or created in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or has been aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
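    ///
    /// A sketch of interpreting the returned tuple (hypothetical caller):
    ///
    /// ```ignore
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, is_cancelled)
    ///     .await?;
    /// if !aborted {
    ///     // A background job still in `Creating` status is left for recovery to handle.
    /// }
    /// ```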
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;

        if !is_cancelled {
            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
            if let Some(streaming_job) = streaming_job {
                assert_ne!(streaming_job.job_status, JobStatus::Created);
                if streaming_job.create_type == CreateType::Background
                    && streaming_job.job_status == JobStatus::Creating
                {
                    // If the job is created in the background and still in creating status, we should not abort it; let recovery handle it instead.
                    tracing::warn!(
                        id = job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view.
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let mut objs = vec![];
        if let Some(table) = &table_obj
            && table.table_type == TableType::MaterializedView
        {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating sink into table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );
                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

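    /// Mark the job's actors as running and persist their split assignments once the
    /// creating barrier has been collected. A thin wrapper over
    /// `post_collect_job_fragments_inner` with `is_mv = false`.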
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.post_collect_job_fragments_inner(
            job_id,
            actor_ids,
            upstream_fragment_new_downstreams,
            split_assignment,
            false,
        )
        .await
    }

    pub async fn post_collect_job_fragments_inner(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        is_mv: bool,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(
                actor::Column::ActorId
                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
            )
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_mapping = if is_mv {
            get_fragment_mappings(&txn, job_id as _).await?
        } else {
            vec![]
        };

        txn.commit().await?;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        Ok(())
    }

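    /// Create a temporary job object for replacing an existing streaming job, after
    /// checking its version, concurrent replacements and max parallelism. Returns the
    /// temporary object id, which is later consumed by `finish_replace_streaming_job`
    /// or cleaned up by `try_abort_replacing_streaming_job`.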
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: &StreamContext,
        specified_parallelism: &Option<NonZeroUsize>,
        max_parallelism: usize,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if original_max_parallelism != max_parallelism as i32 {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            max_parallelism,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks the job-related objects as `Created` and notifies the frontend.
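    ///
    /// A sketch of the two invocation modes (hypothetical ids, for illustration only):
    ///
    /// ```ignore
    /// // A plain streaming job: no replace plan to finish along the way.
    /// controller.finish_streaming_job(job_id, None).await?;
    /// // A sink into a table: the replaced table job is finished in the same transaction.
    /// controller.finish_streaming_job(sink_job_id, Some(replace_plan)).await?;
    /// ```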
    pub async fn finish_streaming_job(
        &self,
        job_id: ObjectId,
        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = NotificationOperation::Add;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

        let replace_table_mapping_update = match replace_stream_job_info {
            Some(ReplaceStreamJobPlan {
                streaming_job,
                replace_upstream,
                tmp_id,
                ..
            }) => {
                let incoming_sink_id = job_id;

                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
                    tmp_id as ObjectId,
                    replace_upstream,
                    None,
                    SinkIntoTableContext {
                        creating_sink_id: Some(incoming_sink_id as _),
                        dropping_sink_id: None,
                        updated_sink_catalogs: vec![],
                    },
                    &txn,
                    streaming_job,
                    None, // will not drop table connector when creating a streaming job
                )
                .await?;

                Some((relations, fragment_mapping))
            }
            None => None,
        };

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
                .await;
            version = self
                .notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
                )
                .await;
        }
        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

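    /// Finish a replace-streaming-job plan: swap the fragments created under `tmp_id` in
    /// for the old ones, update the job catalog, and notify the frontend. The actual
    /// catalog updates live in `finish_replace_streaming_job_inner`.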
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, fragment_mapping, delete_notification_objs) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                col_index_mapping,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
            )
            .await?;

        txn.commit().await?;

        // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
        // catalog and need to access the old fragment. Let frontend nodes delete the old fragment
        // when they receive table catalog change.
        // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
        //     .await;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;
        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Vec<PbFragmentWorkerSlotMapping>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
    )> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_table_catalogs = Table::find_by_id(original_job_id)
                    .select_only()
                    .columns([table::Column::Columns])
                    .into_tuple::<ColumnCatalogArray>()
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_table_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop table connector, the rest logic is in `drop_table_associated_source`
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        // 0. update internal tables
        // Fields including `fragment_id` were placeholder values before.
        // After table fragments are created, update them for all internal tables.
        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::StateTableIds,
            ])
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;
        for (fragment_id, state_table_ids) in fragment_info {
            for state_table_id in state_table_ids.into_inner() {
                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(fragment_id)),
                    // No need to update `vnode_count` because it must remain the same.
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }
        }

        // 1. replace old fragments/actors with new ones.
        Fragment::delete_many()
            .filter(fragment::Column::JobId.eq(original_job_id))
            .exec(txn)
            .await?;
        Fragment::update_many()
            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
            .filter(fragment::Column::JobId.eq(tmp_id))
            .exec(txn)
            .await?;

        // 2. update merges.
        // update downstream fragment's Merge node, and upstream_fragment_id
        for (fragment_id, fragment_replace_map) in replace_upstream {
            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
                .select_only()
                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                .into_tuple::<(FragmentId, StreamNode)>()
                .one(txn)
                .await?
                .map(|(id, node)| (id, node.to_protobuf()))
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            visit_stream_node_mut(&mut stream_node, |body| {
                if let PbNodeBody::Merge(m) = body
                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
                {
                    m.upstream_fragment_id = *new_fragment_id;
                }
            });
            fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(txn)
            .await?;
        }

        // 3. remove dummy object.
        Object::delete_by_id(tmp_id).exec(txn).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }
        if let Some(table_col_index_mapping) = col_index_mapping {
            let expr_rewriter = ReplaceTableExprRewriter {
                table_col_index_mapping,
            };

            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, fragment_mapping, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
        Ok(())
    }

    /// Edit the `rate_limit` of the `Source` node in the given `source_id`'s fragments.
    /// Returns the actor ids, grouped by fragment id, to which the change should be applied.
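    ///
    /// A usage sketch (hypothetical caller; pass `None` to remove the limit):
    ///
    /// ```ignore
    /// // Apply `ALTER SOURCE ... SET source_rate_limit = 1000`.
    /// let fragment_actors = controller
    ///     .update_source_rate_limit_by_source_id(source_id, Some(1000))
    ///     .await?;
    /// ```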
1290    pub async fn update_source_rate_limit_by_source_id(
1291        &self,
1292        source_id: SourceId,
1293        rate_limit: Option<u32>,
1294    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1295        let inner = self.inner.read().await;
1296        let txn = inner.db.begin().await?;
1297
1298        {
1299            let active_source = source::ActiveModel {
1300                source_id: Set(source_id),
1301                rate_limit: Set(rate_limit.map(|v| v as i32)),
1302                ..Default::default()
1303            };
1304            active_source.update(&txn).await?;
1305        }
1306
1307        let (source, obj) = Source::find_by_id(source_id)
1308            .find_also_related(Object)
1309            .one(&txn)
1310            .await?
1311            .ok_or_else(|| {
1312                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1313            })?;
1314
        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

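        // Keep only the fragments that actually contain this source, patching the
        // rate limit on the embedded `Source` (and `StreamFsFetch`) nodes in place.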
        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node {
                        if let Some(node_inner) = &mut node.source_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch`
                // node, so when an fs connector is used we scan all fragments for
                // `StreamFsFetch` nodes.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
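        // Persist the mutated stream nodes (and any type masks fixed up above).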
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

    // Edit the content of the fragments of the given `job_id`.
    // Returns the actor ids to which the mutation should be applied.
    async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        // error message when no relevant fragment is found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

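        // Keep only the fragments whose stream node was actually mutated; untouched
        // fragments need no update.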
        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            fragments_mutation_fn(fragment_type_mask, stream_node)
        });
        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();

        if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

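        // Write back the mutated protobuf node (and type mask), not the original
        // model value fetched above.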
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            fragment_type_mask: Set(fragment_type_mask),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    /// Edit the `rate_limit` of the backfill nodes in the given `job_id`'s fragments.
    /// Returns the actor ids to which the new rate limit should be applied.
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
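        // The backfill rate limit can live on a `StreamScan`, `StreamCdcScan`,
        // `SourceBackfill`, or `Sink` (sink backfill) node.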
        let update_backfill_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    /// Edit the `rate_limit` of the `Sink` node in the given `job_id`'s fragments.
    /// Returns the actor ids to which the new rate limit should be applied.
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

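    /// Edit the `rate_limit` of the `Dml` node in the given `job_id`'s fragments.
    /// Returns the actor ids to which the new rate limit should be applied.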
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

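    /// Alter the connector properties of the sink with `sink_id`, after validating
    /// that every changed key is allowed by the connector. Updates the sink catalog,
    /// the stored `CREATE SINK` definition, and the `SinkDesc` embedded in the sink's
    /// fragments, then notifies the frontend. Returns the merged property map.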
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        // Validate that the given props can be altered for this connector.
        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        for (k, v) in &props {
                            if !SinkType::SINK_ALTER_CONFIG_LIST.contains(&k.as_str()) {
                                return Err(SinkError::Config(anyhow!(
                                    "unsupported alter config: {}={}",
                                    k,
                                    v
                                ))
                                .into());
                            }
                        }
                        let mut new_props = sink.properties.0.clone();
                        new_props.extend(props.clone());
                        SinkType::validate_alter_config(&new_props)
                    },
                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when altering sink")).into(),
                );
            }
        };
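        // Rewrite the stored `CREATE SINK` definition so its WITH options match the
        // new properties, keeping the catalog definition in sync with the runtime config.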
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            let mut new_sql_options = stmt
                .with_properties
                .0
                .iter()
                .map(|sql_option| (&sql_option.name, sql_option))
                .collect::<IndexMap<_, _>>();
            let add_sql_options = props
                .iter()
                .map(|(k, v)| SqlOption::try_from((k, v)))
                .collect::<Result<Vec<SqlOption>, ParserError>>()
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
            new_sql_options.extend(
                add_sql_options
                    .iter()
                    .map(|sql_option| (&sql_option.name, sql_option)),
            );
            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
        } else {
            panic!("sink definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props);

        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(stmt.to_string()),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

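        // Propagate the new properties into the `SinkDesc` of every fragment that
        // executes this sink.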
        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(sink_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let fragments = fragments
            .into_iter()
            .filter(|(_, fragment_type_mask, _)| {
                *fragment_type_mask & FragmentTypeFlag::Sink as i32 != 0
            })
            .filter_map(|(id, _, stream_node)| {
                let mut stream_node = stream_node.to_protobuf();
                let mut found = false;
                visit_stream_node_mut(&mut stream_node, |node| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id as u32
                    {
                        sink_desc.properties = new_config.clone();
                        found = true;
                    }
                });
                if found { Some((id, stream_node)) } else { None }
            })
            .collect_vec();
        assert!(
            !fragments.is_empty(),
            "sink id should be used by at least one fragment"
        );
        for (id, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

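        // Re-read the sink together with its object to build the notification payload.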
        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(new_config.into_iter().collect())
    }

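    /// Edit the `rate_limit` of the rate-limit-capable node in the fragment with
    /// `fragment_id`, whichever kind it is (dml, sink, backfill, or source).
    /// Returns the actor ids to which the new rate limit should be applied.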
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
            {
                visit_stream_node_mut(stream_node, |node| match node {
                    PbNodeBody::Dml(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::Sink(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    // The source rate limit lives in the embedded source config.
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &mut node.source_inner {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &mut node.node_inner {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                    _ => {}
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(
            fragment_id,
            update_rate_limit,
            "no rate limit node found in the fragment",
        )
        .await
    }

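    /// Persist the outcome of a reschedule: drop removed actors, insert newly
    /// created ones, update vnode bitmaps and splits of surviving actors, refresh
    /// per-job parallelism and resource groups, and notify the frontend of the
    /// rebuilt fragment mappings.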
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        let mut fragment_mapping_to_notify = vec![];

        for (
            fragment_id,
            Reschedule {
                removed_actors,
                vnode_bitmap_updates,
                actor_splits,
                newly_created_actors,
                ..
            },
        ) in reschedules
        {
            // drop removed actors
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

            // add new actors
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

            // update vnode bitmaps of existing actors
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

            // update actor_splits for existing actors
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }

            // rebuild the fragment's worker-slot mapping from its post-reschedule actors
            let fragment = Fragment::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            let job_actors = fragment
                .find_related(Actor)
                .all(&txn)
                .await?
                .into_iter()
                .map(|actor| {
                    (
                        fragment_id,
                        fragment.distribution_type,
                        actor.actor_id,
                        actor.vnode_bitmap,
                        actor.worker_id,
                        actor.status,
                    )
                })
                .collect_vec();

            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

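        // Persist the new per-job parallelism (and, if requested, resource group).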
        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;
        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
            .await;

        Ok(())
    }

    /// Note: `FsFetch` fragments created in old versions are not included, as they
    /// lack the `FsFetch` type flag. Since this is only used for debugging, that
    /// should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

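        // Scan only the fragments whose type mask indicates a node that can carry a
        // rate limit.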
        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(
                PbFragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            let mut rate_limit = None;
            let mut node_name = None;

            visit_stream_node(&stream_node, |node| {
                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }
            });

            if let Some(rate_limit) = rate_limit {
                rate_limits.push(RateLimitInfo {
                    fragment_id: fragment_id as u32,
                    job_id: job_id as u32,
                    fragment_type_mask: fragment_type_mask as u32,
                    rate_limit,
                    node_name: node_name.unwrap().to_owned(),
                });
            }
        }

        Ok(rate_limits)
    }
}

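/// Build a SQL expression `column & value != 0`, i.e. test whether the bitflag
/// column intersects the given flags.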
fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}

pub struct SinkIntoTableContext {
    /// For creating a sink into a table, this is `Some`; otherwise `None`.
    pub creating_sink_id: Option<SinkId>,
    /// For dropping a sink into a table, this is `Some`; otherwise `None`.
    pub dropping_sink_id: Option<SinkId>,
    /// For alter table (e.g., add column), this is the list of the table's existing
    /// sink ids whose catalogs need to be updated; otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}