risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_mut};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::WithPropertiesExt;
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_fragment_mappings, get_internal_tables_by_id, insert_fragment_relations,
    rebuild_fragment_mapping_from_actors,
};
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
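    /// Create the `object` row for a new streaming job and insert the corresponding
    /// `streaming_job` row (with status `Initial`) in the same transaction, returning
    /// the allocated object id.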
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        ctx: &StreamContext,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(ctx.timezone.clone()),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
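    ///
    /// A minimal usage sketch, assuming an initialized controller, a prepared job, and a
    /// stream context (identifiers here are illustrative):
    ///
    /// ```ignore
    /// let mut job = StreamingJob::MaterializedView(table);
    /// controller
    ///     .create_job_catalog(&mut job, &ctx, &None, max_parallelism, HashSet::new(), None)
    ///     .await?;
    /// ```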
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();

        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        // TODO(rc): pass all dependencies uniformly, deprecate `dependent_relations` and `dependent_secret_ids`.
        dependencies.extend(
            streaming_job
                .dependent_relations()
                .into_iter()
                .map(|id| id as ObjectId),
        );

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just a dummy one for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table {
                    if check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                    {
                        bail!("Creating such a sink will result in a circular dependency.");
                    }
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // to be compatible with the old implementation.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx,
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // record object dependencies.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
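    ///
    /// A minimal usage sketch (illustrative): callers would typically use the returned
    /// map to rewrite the temporary internal table ids recorded in the fragment graph.
    ///
    /// ```ignore
    /// let table_id_map = controller
    ///     .create_internal_table_catalog(&job, incomplete_internal_tables)
    ///     .await?;
    /// ```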
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors into the meta store. Used both for creating new jobs and for replacing jobs.
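    ///
    /// A minimal usage sketch (illustrative):
    ///
    /// ```ignore
    /// // `for_replace` is false when creating a brand-new job, and true when
    /// // inserting the new fragments of a replace plan.
    /// controller
    ///     .prepare_streaming_job(&stream_job_fragments, &streaming_job, false)
    ///     .await?;
    /// ```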
    pub async fn prepare_streaming_job(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let is_materialized_view = streaming_job.is_materialized_view();
        let fragment_actors =
            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
        let mut all_tables = stream_job_fragments.all_tables();
        let inner = self.inner.write().await;

        let mut objects = vec![];
        let txn = inner.db.begin().await?;

        // Add fragments.
        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                for state_table_id in state_table_ids {
                    // A table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get_mut(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    table.job_id = Some(streaming_job.id());
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if is_materialized_view {
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table.clone())),
                        });
                    }
                }
            }
        }

        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;

        // Add actors and actor dispatchers.
        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update dml fragment id.
            if let StreamingJob::Table(_, table, ..) = streaming_job {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if !objects.is_empty() {
            assert!(is_materialized_view);
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is still in the initial status or in `FOREGROUND` mode.
    /// It returns (true, _) if the job is not found or is aborted.
    /// It returns (_, Some(`database_id`)) if the `database_id` of the `job_id` exists.
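    ///
    /// A minimal usage sketch (illustrative):
    ///
    /// ```ignore
    /// // Abort a user-cancelled foreground job; `aborted` tells whether the
    /// // catalog entry was actually cleaned up here.
    /// let (aborted, database_id) = controller
    ///     .try_abort_creating_streaming_job(job_id, true)
    ///     .await?;
    /// ```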
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;

        if !is_cancelled {
            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
            if let Some(streaming_job) = streaming_job {
                assert_ne!(streaming_job.job_status, JobStatus::Created);
                if streaming_job.create_type == CreateType::Background
                    && streaming_job.job_status == JobStatus::Creating
                {
                    // If the job is created in the background and is still in creating status,
                    // we should not abort it; let recovery handle it.
                    tracing::warn!(
                        id = job_id,
                        "streaming job is created in background and still in creating status"
                    );
                    return Ok((false, Some(database_id)));
                }
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view.
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let mut objs = vec![];
        if let Some(table) = &table_obj
            && table.table_type == TableType::MaterializedView
        {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted: {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify the frontend to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

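    /// Mark the actors of a newly collected job as running, persist their split
    /// assignments, record the new fragment relations, and advance the job status
    /// to `CREATING`. A thin wrapper over `post_collect_job_fragments_inner` with
    /// `is_mv = false`.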
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
    ) -> MetaResult<()> {
        self.post_collect_job_fragments_inner(
            job_id,
            actor_ids,
            upstream_fragment_new_downstreams,
            split_assignment,
            false,
        )
        .await
    }

    pub async fn post_collect_job_fragments_inner(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        is_mv: bool,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(
                actor::Column::ActorId
                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
            )
            .exec(&txn)
            .await?;

        for splits in split_assignment.values() {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_mapping = if is_mv {
            get_fragment_mappings(&txn, job_id as _).await?
        } else {
            vec![]
        };

        txn.commit().await?;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        Ok(())
    }

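    /// Create a temporary streaming job object for a replace operation (e.g. `ALTER TABLE`),
    /// after verifying the job version, checking for concurrent replacements, and asserting
    /// that the max parallelism is unchanged. Returns the id of the new (dummy) job object.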
    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: &StreamContext,
        specified_parallelism: &Option<NonZeroUsize>,
        max_parallelism: usize,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
            .select_only()
            .column(streaming_job::Column::MaxParallelism)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if original_max_parallelism != max_parallelism as i32 {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };

        // 4. create the streaming object for the new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            ctx,
            parallelism,
            max_parallelism,
            None,
        )
        .await?;

        // 5. record the dependency for the new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
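    ///
    /// A minimal usage sketch (illustrative; the replace info is only present when the
    /// finished job is a sink created into an existing table):
    ///
    /// ```ignore
    /// controller.finish_streaming_job(job_id, None).await?;
    /// ```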
    pub async fn finish_streaming_job(
        &self,
        job_id: ObjectId,
        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = NotificationOperation::Add;

        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;

        let replace_table_mapping_update = match replace_stream_job_info {
            Some(ReplaceStreamJobPlan {
                streaming_job,
                replace_upstream,
                tmp_id,
                ..
            }) => {
                let incoming_sink_id = job_id;

                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
                    tmp_id as ObjectId,
                    replace_upstream,
                    None,
                    SinkIntoTableContext {
                        creating_sink_id: Some(incoming_sink_id as _),
                        dropping_sink_id: None,
                        updated_sink_catalogs: vec![],
                    },
                    &txn,
                    streaming_job,
                    None, // will not drop table connector when creating a streaming job
                )
                .await?;

                Some((relations, fragment_mapping))
            }
            None => None,
        };

        txn.commit().await?;

        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
                .await;
            version = self
                .notify_frontend(
                    NotificationOperation::Update,
                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
                )
                .await;
        }
        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

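    /// Finalize a replace plan: swap the new fragments in for the old ones via
    /// `finish_replace_streaming_job_inner`, then notify the frontend of the updated
    /// catalogs and fragment mappings.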
    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, fragment_mapping, delete_notification_objs) =
            Self::finish_replace_streaming_job_inner(
                tmp_id,
                replace_upstream,
                col_index_mapping,
                sink_into_table_context,
                &txn,
                streaming_job,
                drop_table_connector_ctx,
            )
            .await?;

        txn.commit().await?;

        // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
        // catalog and need to access the old fragment. Let frontend nodes delete the old fragment
        // when they receive table catalog change.
        // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
        //     .await;
        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
            .await;
        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        col_index_mapping: Option<ColIndexMapping>,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
    ) -> MetaResult<(
        Vec<PbObject>,
        Vec<PbFragmentWorkerSlotMapping>,
        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
    )> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_table_catalogs = Table::find_by_id(original_job_id)
                    .select_only()
                    .columns([table::Column::Columns])
                    .into_tuple::<ColumnCatalogArray>()
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_table_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (The column catalog is also updated here.)
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop the table connector; the rest of the logic is in `drop_table_associated_source`
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

        // 0. update internal tables
        // Fields including `fragment_id` were placeholder values before.
        // After table fragments are created, update them for all internal tables.
        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::StateTableIds,
            ])
            .filter(fragment::Column::JobId.eq(tmp_id))
            .into_tuple()
            .all(txn)
            .await?;
        for (fragment_id, state_table_ids) in fragment_info {
            for state_table_id in state_table_ids.into_inner() {
                table::ActiveModel {
                    table_id: Set(state_table_id as _),
                    fragment_id: Set(Some(fragment_id)),
                    // No need to update `vnode_count` because it must remain the same.
                    ..Default::default()
                }
                .update(txn)
                .await?;
            }
        }

        // 1. replace old fragments/actors with new ones.
        Fragment::delete_many()
            .filter(fragment::Column::JobId.eq(original_job_id))
            .exec(txn)
            .await?;
        Fragment::update_many()
            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
            .filter(fragment::Column::JobId.eq(tmp_id))
            .exec(txn)
            .await?;

        // 2. update merges.
        // update each downstream fragment's Merge node and its upstream_fragment_id
        for (fragment_id, fragment_replace_map) in replace_upstream {
            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
                .select_only()
                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
                .into_tuple::<(FragmentId, StreamNode)>()
                .one(txn)
                .await?
                .map(|(id, node)| (id, node.to_protobuf()))
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            visit_stream_node_mut(&mut stream_node, |body| {
                if let PbNodeBody::Merge(m) = body
                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
                {
                    m.upstream_fragment_id = *new_fragment_id;
                }
            });
            fragment::ActiveModel {
                fragment_id: Set(fragment_id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(txn)
            .await?;
        }

        // 3. remove dummy object.
        Object::delete_by_id(tmp_id).exec(txn).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }
        if let Some(table_col_index_mapping) = col_index_mapping {
            let expr_rewriter = ReplaceTableExprRewriter {
                table_col_index_mapping,
            };

            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, fragment_mapping, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
        let inner = self.inner.write().await;
        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
        Ok(())
    }

    // edit the `rate_limit` of the `Source` node in the given `source_id`'s fragments
    // returns the actor_ids to which the change should be applied
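    //
    // A minimal usage sketch (illustrative):
    //
    //     // Set the source's rate limit; `None` removes the limit.
    //     let fragment_actors = controller
    //         .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //         .await?;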
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::Source as i32 != 0 {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node {
                        if let Some(node_inner) = &mut node.source_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so we scan all fragments for a `StreamFsFetch` node when a fs connector is used.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        *fragment_type_mask |= PbFragmentTypeFlag::FsFetch as i32;
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

    /// Edit the stream nodes of the fragments belonging to the given `job_id`,
    /// using `fragments_mutation_fn` to decide which fragments are affected.
    /// Returns the actor ids, grouped by fragment, to which the change should be applied.
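    ///
    /// A hedged sketch of a mutation closure, mirroring how the rate-limit helpers
    /// below drive this function (the concrete flag and node type are illustrative):
    /// ```ignore
    /// let mutation = |mask: &mut i32, stream_node: &mut PbStreamNode| {
    ///     let mut found = false;
    ///     if *mask & PbFragmentTypeFlag::Sink as i32 != 0 {
    ///         visit_stream_node_mut(stream_node, |node| {
    ///             if let PbNodeBody::Sink(sink) = node {
    ///                 sink.rate_limit = Some(100);
    ///                 found = true;
    ///             }
    ///         });
    ///     }
    ///     found
    /// };
    /// ```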
    async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // Returns true if the mutation was applied to the fragment.
        mut fragments_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        // Error message used when no relevant fragment is found.
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| (id, mask, stream_node.to_protobuf()))
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            fragments_mutation_fn(fragment_type_mask, stream_node)
        });
        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

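    /// Edit the stream node of the single fragment identified by `fragment_id`,
    /// applying `fragment_mutation_fn` to its type mask and stream node.
    /// Returns the actor ids, grouped by fragment, to which the change should be applied.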
    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(&mut i32, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (mut fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();

        if !fragment_mutation_fn(&mut fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        // Persist the mutated stream node (and type mask) rather than the unmodified
        // one fetched above, so the mutation actually takes effect.
        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            fragment_type_mask: Set(fragment_type_mask),
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    /// Edit the `rate_limit` of the backfill nodes (`StreamCdcScan`, `StreamScan`,
    /// `SourceBackfill`, or `Sink`) in the fragments of the given `job_id`.
    /// Returns the actor ids, grouped by fragment, to which the new limit should be applied.
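    ///
    /// A hedged usage sketch (the receiver name is illustrative):
    /// ```ignore
    /// // Throttle backfill for the job, or lift the limit with `None`.
    /// let fragment_actors = catalog_controller
    ///     .update_backfill_rate_limit_by_job_id(job_id, Some(256))
    ///     .await?;
    /// ```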
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    /// Edit the `rate_limit` of the `Sink` nodes in the fragments of the given `job_id`.
    /// Returns the actor ids, grouped by fragment, to which the new limit should be applied.
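    ///
    /// A hedged usage sketch (the receiver name is illustrative):
    /// ```ignore
    /// // Lift the sink throttle entirely by passing `None`.
    /// catalog_controller
    ///     .update_sink_rate_limit_by_job_id(job_id, None)
    ///     .await?;
    /// ```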
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

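    /// Edit the `rate_limit` of the `Dml` nodes in the fragments of the given `job_id`.
    /// Returns the actor ids, grouped by fragment, to which the new limit should be applied.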
    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0 {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

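    /// Edit the `rate_limit` of whichever rate-limit-capable node (`Dml`, `Sink`,
    /// `StreamCdcScan`, `StreamScan`, or `SourceBackfill`) lives in the fragment
    /// identified by `fragment_id`.
    /// Returns the actor ids, grouped by fragment, to which the new limit should be applied.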
    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: &mut i32, stream_node: &mut PbStreamNode| {
            let mut found = false;
            if *fragment_type_mask & PbFragmentTypeFlag::dml_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::sink_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::backfill_rate_limit_fragments() != 0
                || *fragment_type_mask & PbFragmentTypeFlag::source_rate_limit_fragments() != 0
            {
                visit_stream_node_mut(stream_node, |node| match node {
                    PbNodeBody::Dml(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::Sink(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::StreamScan(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    PbNodeBody::SourceBackfill(node) => {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    _ => {}
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

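    /// Persist the outcome of a reschedule in a single transaction: delete removed
    /// actors, insert newly created ones, update vnode bitmaps and splits of the
    /// surviving actors, rebuild the fragment mappings to notify, and record the new
    /// parallelism / resource group of each affected job.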
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        let mut fragment_mapping_to_notify = vec![];

        for (
            fragment_id,
            Reschedule {
                removed_actors,
                vnode_bitmap_updates,
                actor_splits,
                newly_created_actors,
                ..
            },
        ) in reschedules
        {
            // drop removed actors
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

            // add new actors
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

            // actor update
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

            // Update actor_splits for existing actors
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }

            // fragment update
            let fragment = Fragment::find_by_id(fragment_id)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;

            let job_actors = fragment
                .find_related(Actor)
                .all(&txn)
                .await?
                .into_iter()
                .map(|actor| {
                    (
                        fragment_id,
                        fragment.distribution_type,
                        actor.actor_id,
                        actor.vnode_bitmap,
                        actor.worker_id,
                        actor.status,
                    )
                })
                .collect_vec();

            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;
        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
            .await;

        Ok(())
    }

    /// List the rate limits of all rate-limit-capable fragments.
    ///
    /// Note: `FsFetch` fragments created in old versions (which lack the `FsFetch`
    /// type flag) are not included. Since this is only used for debugging, that
    /// should be fine.
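    ///
    /// A hedged usage sketch (the receiver name is illustrative):
    /// ```ignore
    /// for info in catalog_controller.list_rate_limits().await? {
    ///     println!(
    ///         "job {} fragment {} ({}): rate limit {}",
    ///         info.job_id, info.fragment_id, info.node_name, info.rate_limit
    ///     );
    /// }
    /// ```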
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(
                PbFragmentTypeFlag::rate_limit_fragments(),
            ))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            let mut rate_limit = None;
            let mut node_name = None;

            visit_stream_node(&stream_node, |node| {
                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            debug_assert!(
                                rate_limit.is_none(),
                                "one fragment should only have 1 rate limit node"
                            );
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        debug_assert!(
                            rate_limit.is_none(),
                            "one fragment should only have 1 rate limit node"
                        );
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }
            });

            if let Some(rate_limit) = rate_limit {
                rate_limits.push(RateLimitInfo {
                    fragment_id: fragment_id as u32,
                    job_id: job_id as u32,
                    fragment_type_mask: fragment_type_mask as u32,
                    rate_limit,
                    node_name: node_name.unwrap().to_owned(),
                });
            }
        }

        Ok(rate_limits)
    }
}

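/// Build a SQL expression that tests whether `column` shares any set bit with
/// `value`, i.e. `(column & value) != 0`.
///
/// A minimal sketch of the intended semantics in plain Rust (values are illustrative):
/// ```ignore
/// let mask = 0b0110; // fragment type mask
/// let flag = 0b0010; // flag to test
/// assert!(mask & flag != 0); // the fragment carries the flag
/// ```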
fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}


pub struct SinkIntoTableContext {
    /// When creating a sink into a table, this is `Some`; otherwise `None`.
    pub creating_sink_id: Option<SinkId>,
    /// When dropping a sink into a table, this is `Some`; otherwise `None`.
    pub dropping_sink_id: Option<SinkId>,
    /// For `ALTER TABLE` (e.g., add column), this is the list of existing sink ids
    /// whose catalogs were updated; otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}