risingwave_meta/controller/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::util::stream_graph_visitor::{
25    visit_stream_node_body, visit_stream_node_mut,
26};
27use risingwave_common::{bail, current_cluster_version};
28use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
29use risingwave_connector::error::ConnectorError;
30use risingwave_connector::sink::file_sink::fs::FsSink;
31use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
32use risingwave_connector::source::ConnectorProperties;
33use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
34use risingwave_meta_model::actor::ActorStatus;
35use risingwave_meta_model::object::ObjectType;
36use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
37use risingwave_meta_model::table::TableType;
38use risingwave_meta_model::user_privilege::Action;
39use risingwave_meta_model::*;
40use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
41use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
42use risingwave_pb::catalog::{PbCreateType, PbTable};
43use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
44use risingwave_pb::meta::object::PbObjectInfo;
45use risingwave_pb::meta::subscribe_response::{
46    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
47};
48use risingwave_pb::meta::{PbFragmentWorkerSlotMapping, PbObject, PbObjectGroup};
49use risingwave_pb::secret::PbSecretRef;
50use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
51use risingwave_pb::stream_plan::PbStreamNode;
52use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
53use risingwave_pb::stream_plan::stream_node::PbNodeBody;
54use risingwave_pb::user::PbUserInfo;
55use risingwave_sqlparser::ast::{SqlOption, Statement};
56use risingwave_sqlparser::parser::{Parser, ParserError};
57use sea_orm::ActiveValue::Set;
58use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
59use sea_orm::{
60    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
61    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
62    RelationTrait, TransactionTrait,
63};
64use thiserror_ext::AsReport;
65
66use super::rename::IndexItemRewriter;
67use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
68use crate::controller::ObjectModel;
69use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
70use crate::controller::utils::{
71    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
72    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
73    get_fragment_mappings, get_internal_tables_by_id, grant_default_privileges_automatically,
74    insert_fragment_relations, list_user_info_by_ids, rebuild_fragment_mapping_from_actors,
75};
76use crate::error::MetaErrorInner;
77use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
78use crate::model::{
79    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
80    StreamJobFragmentsToCreate, TableParallelism,
81};
82use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
83use crate::{MetaError, MetaResult};
84
85impl CatalogController {
86    pub async fn create_streaming_job_obj(
87        txn: &DatabaseTransaction,
88        obj_type: ObjectType,
89        owner_id: UserId,
90        database_id: Option<DatabaseId>,
91        schema_id: Option<SchemaId>,
92        create_type: PbCreateType,
93        ctx: &StreamContext,
94        streaming_parallelism: StreamingParallelism,
95        max_parallelism: usize,
96        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
97    ) -> MetaResult<ObjectId> {
98        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
99        let job = streaming_job::ActiveModel {
100            job_id: Set(obj.oid),
101            job_status: Set(JobStatus::Initial),
102            create_type: Set(create_type.into()),
103            timezone: Set(ctx.timezone.clone()),
104            parallelism: Set(streaming_parallelism),
105            max_parallelism: Set(max_parallelism as _),
106            specific_resource_group: Set(specific_resource_group),
107        };
108        job.insert(txn).await?;
109
110        Ok(obj.oid)
111    }
112
113    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
114    /// materialized view.
115    ///
116    /// Some of the fields in the given streaming job are placeholders, which will
117    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
118    #[await_tree::instrument]
119    pub async fn create_job_catalog(
120        &self,
121        streaming_job: &mut StreamingJob,
122        ctx: &StreamContext,
123        parallelism: &Option<Parallelism>,
124        max_parallelism: usize,
125        mut dependencies: HashSet<ObjectId>,
126        specific_resource_group: Option<String>,
127    ) -> MetaResult<()> {
128        let inner = self.inner.write().await;
129        let txn = inner.db.begin().await?;
130        let create_type = streaming_job.create_type();
131
132        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
133            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
134            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
135            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
136        };
137
138        ensure_user_id(streaming_job.owner() as _, &txn).await?;
139        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
140        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
141        check_relation_name_duplicate(
142            &streaming_job.name(),
143            streaming_job.database_id() as _,
144            streaming_job.schema_id() as _,
145            &txn,
146        )
147        .await?;
148
149        // check if any dependency is in altering status.
150        if !dependencies.is_empty() {
151            let altering_cnt = ObjectDependency::find()
152                .join(
153                    JoinType::InnerJoin,
154                    object_dependency::Relation::Object1.def(),
155                )
156                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
157                .filter(
158                    object_dependency::Column::Oid
159                        .is_in(dependencies.clone())
160                        .and(object::Column::ObjType.eq(ObjectType::Table))
161                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
162                        .and(
163                            // It means the referring table is just dummy for altering.
164                            object::Column::Oid.not_in_subquery(
165                                Query::select()
166                                    .column(table::Column::TableId)
167                                    .from(Table)
168                                    .to_owned(),
169                            ),
170                        ),
171                )
172                .count(&txn)
173                .await?;
174            if altering_cnt != 0 {
175                return Err(MetaError::permission_denied(
176                    "some dependent relations are being altered",
177                ));
178            }
179        }
180
181        match streaming_job {
182            StreamingJob::MaterializedView(table) => {
183                let job_id = Self::create_streaming_job_obj(
184                    &txn,
185                    ObjectType::Table,
186                    table.owner as _,
187                    Some(table.database_id as _),
188                    Some(table.schema_id as _),
189                    create_type,
190                    ctx,
191                    streaming_parallelism,
192                    max_parallelism,
193                    specific_resource_group,
194                )
195                .await?;
196                table.id = job_id as _;
197                let table_model: table::ActiveModel = table.clone().into();
198                Table::insert(table_model).exec(&txn).await?;
199            }
200            StreamingJob::Sink(sink, _) => {
201                if let Some(target_table_id) = sink.target_table
202                    && check_sink_into_table_cycle(
203                        target_table_id as ObjectId,
204                        dependencies.iter().cloned().collect(),
205                        &txn,
206                    )
207                    .await?
208                {
209                    bail!("Creating such a sink will result in circular dependency.");
210                }
211
212                let job_id = Self::create_streaming_job_obj(
213                    &txn,
214                    ObjectType::Sink,
215                    sink.owner as _,
216                    Some(sink.database_id as _),
217                    Some(sink.schema_id as _),
218                    create_type,
219                    ctx,
220                    streaming_parallelism,
221                    max_parallelism,
222                    specific_resource_group,
223                )
224                .await?;
225                sink.id = job_id as _;
226                let sink_model: sink::ActiveModel = sink.clone().into();
227                Sink::insert(sink_model).exec(&txn).await?;
228            }
229            StreamingJob::Table(src, table, _) => {
230                let job_id = Self::create_streaming_job_obj(
231                    &txn,
232                    ObjectType::Table,
233                    table.owner as _,
234                    Some(table.database_id as _),
235                    Some(table.schema_id as _),
236                    create_type,
237                    ctx,
238                    streaming_parallelism,
239                    max_parallelism,
240                    specific_resource_group,
241                )
242                .await?;
243                table.id = job_id as _;
244                if let Some(src) = src {
245                    let src_obj = Self::create_object(
246                        &txn,
247                        ObjectType::Source,
248                        src.owner as _,
249                        Some(src.database_id as _),
250                        Some(src.schema_id as _),
251                    )
252                    .await?;
253                    src.id = src_obj.oid as _;
254                    src.optional_associated_table_id =
255                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
256                    table.optional_associated_source_id = Some(
257                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
258                    );
259                    let source: source::ActiveModel = src.clone().into();
260                    Source::insert(source).exec(&txn).await?;
261                }
262                let table_model: table::ActiveModel = table.clone().into();
263                Table::insert(table_model).exec(&txn).await?;
264            }
265            StreamingJob::Index(index, table) => {
266                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
267                let job_id = Self::create_streaming_job_obj(
268                    &txn,
269                    ObjectType::Index,
270                    index.owner as _,
271                    Some(index.database_id as _),
272                    Some(index.schema_id as _),
273                    create_type,
274                    ctx,
275                    streaming_parallelism,
276                    max_parallelism,
277                    specific_resource_group,
278                )
279                .await?;
280                // to be compatible with old implementation.
281                index.id = job_id as _;
282                index.index_table_id = job_id as _;
283                table.id = job_id as _;
284
285                ObjectDependency::insert(object_dependency::ActiveModel {
286                    oid: Set(index.primary_table_id as _),
287                    used_by: Set(table.id as _),
288                    ..Default::default()
289                })
290                .exec(&txn)
291                .await?;
292
293                let table_model: table::ActiveModel = table.clone().into();
294                Table::insert(table_model).exec(&txn).await?;
295                let index_model: index::ActiveModel = index.clone().into();
296                Index::insert(index_model).exec(&txn).await?;
297            }
298            StreamingJob::Source(src) => {
299                let job_id = Self::create_streaming_job_obj(
300                    &txn,
301                    ObjectType::Source,
302                    src.owner as _,
303                    Some(src.database_id as _),
304                    Some(src.schema_id as _),
305                    create_type,
306                    ctx,
307                    streaming_parallelism,
308                    max_parallelism,
309                    specific_resource_group,
310                )
311                .await?;
312                src.id = job_id as _;
313                let source_model: source::ActiveModel = src.clone().into();
314                Source::insert(source_model).exec(&txn).await?;
315            }
316        }
317
318        // collect dependent secrets.
319        dependencies.extend(
320            streaming_job
321                .dependent_secret_ids()?
322                .into_iter()
323                .map(|secret_id| secret_id as ObjectId),
324        );
325        // collect dependent connection
326        dependencies.extend(
327            streaming_job
328                .dependent_connection_ids()?
329                .into_iter()
330                .map(|conn_id| conn_id as ObjectId),
331        );
332
333        // record object dependency.
334        if !dependencies.is_empty() {
335            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
336                object_dependency::ActiveModel {
337                    oid: Set(oid),
338                    used_by: Set(streaming_job.id() as _),
339                    ..Default::default()
340                }
341            }))
342            .exec(&txn)
343            .await?;
344        }
345
346        txn.commit().await?;
347
348        Ok(())
349    }
350
351    /// Create catalogs for internal tables, then notify frontend about them if the job is a
352    /// materialized view.
353    ///
354    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
355    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
356    ///
357    /// Returns a mapping from the temporary table id to the actual global table id.
358    pub async fn create_internal_table_catalog(
359        &self,
360        job: &StreamingJob,
361        mut incomplete_internal_tables: Vec<PbTable>,
362    ) -> MetaResult<HashMap<u32, u32>> {
363        let job_id = job.id() as ObjectId;
364        let inner = self.inner.write().await;
365        let txn = inner.db.begin().await?;
366        let mut table_id_map = HashMap::new();
367        for table in &mut incomplete_internal_tables {
368            let table_id = Self::create_object(
369                &txn,
370                ObjectType::Table,
371                table.owner as _,
372                Some(table.database_id as _),
373                Some(table.schema_id as _),
374            )
375            .await?
376            .oid;
377            table_id_map.insert(table.id, table_id as u32);
378            table.id = table_id as _;
379            table.job_id = Some(job_id as _);
380
381            let table_model = table::ActiveModel {
382                table_id: Set(table_id as _),
383                belongs_to_job_id: Set(Some(job_id)),
384                fragment_id: NotSet,
385                ..table.clone().into()
386            };
387            Table::insert(table_model).exec(&txn).await?;
388        }
389        txn.commit().await?;
390
391        Ok(table_id_map)
392    }
393
394    // TODO: In this function, we also update the `Table` model in the meta store.
395    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
396    // making them the source of truth and performing a full replacement for those in the meta store?
397    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
398    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
399    pub async fn prepare_streaming_job(
400        &self,
401        stream_job_fragments: &StreamJobFragmentsToCreate,
402        streaming_job: &StreamingJob,
403        for_replace: bool,
404    ) -> MetaResult<()> {
405        let is_materialized_view = streaming_job.is_materialized_view();
406        let fragment_actors =
407            Self::extract_fragment_and_actors_from_fragments(stream_job_fragments)?;
408        let mut all_tables = stream_job_fragments.all_tables();
409        let inner = self.inner.write().await;
410
411        let mut objects = vec![];
412        let txn = inner.db.begin().await?;
413
414        // Add fragments.
415        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
416        for fragment in fragments {
417            let fragment_id = fragment.fragment_id;
418            let state_table_ids = fragment.state_table_ids.inner_ref().clone();
419
420            let fragment = fragment.into_active_model();
421            Fragment::insert(fragment).exec(&txn).await?;
422
423            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
424            // After table fragments are created, update them for all tables.
425            if !for_replace {
426                for state_table_id in state_table_ids {
427                    // Table's vnode count is not always the fragment's vnode count, so we have to
428                    // look up the table from `TableFragments`.
429                    // See `ActorGraphBuilder::new`.
430                    let table = all_tables
431                        .get_mut(&(state_table_id as u32))
432                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
433                    assert_eq!(table.id, state_table_id as u32);
434                    assert_eq!(table.fragment_id, fragment_id as u32);
435                    let vnode_count = table.vnode_count();
436
437                    table::ActiveModel {
438                        table_id: Set(state_table_id as _),
439                        fragment_id: Set(Some(fragment_id)),
440                        vnode_count: Set(vnode_count as _),
441                        ..Default::default()
442                    }
443                    .update(&txn)
444                    .await?;
445
446                    if is_materialized_view {
447                        // In production, definition was replaced but still needed for notification.
448                        if cfg!(not(debug_assertions)) && table.id == streaming_job.id() {
449                            table.definition = streaming_job.definition();
450                        }
451                        objects.push(PbObject {
452                            object_info: Some(PbObjectInfo::Table(table.clone())),
453                        });
454                    }
455                }
456            }
457        }
458
459        insert_fragment_relations(&txn, &stream_job_fragments.downstreams).await?;
460
461        // Add actors and actor dispatchers.
462        for actors in actors {
463            for actor in actors {
464                let actor = actor.into_active_model();
465                Actor::insert(actor).exec(&txn).await?;
466            }
467        }
468
469        if !for_replace {
470            // Update dml fragment id.
471            if let StreamingJob::Table(_, table, ..) = streaming_job {
472                Table::update(table::ActiveModel {
473                    table_id: Set(table.id as _),
474                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
475                    ..Default::default()
476                })
477                .exec(&txn)
478                .await?;
479            }
480        }
481
482        txn.commit().await?;
483
484        if !objects.is_empty() {
485            assert!(is_materialized_view);
486            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
487                .await;
488        }
489
490        Ok(())
491    }
492
493    /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode.
494    /// It returns (true, _) if the job is not found or aborted.
495    /// It returns (_, Some(`database_id`)) is the `database_id` of the `job_id` exists
496    #[await_tree::instrument]
497    pub async fn try_abort_creating_streaming_job(
498        &self,
499        job_id: ObjectId,
500        is_cancelled: bool,
501    ) -> MetaResult<(bool, Option<DatabaseId>)> {
502        let mut inner = self.inner.write().await;
503        let txn = inner.db.begin().await?;
504
505        let obj = Object::find_by_id(job_id).one(&txn).await?;
506        let Some(obj) = obj else {
507            tracing::warn!(
508                id = job_id,
509                "streaming job not found when aborting creating, might be cleaned by recovery"
510            );
511            return Ok((true, None));
512        };
513        let database_id = obj
514            .database_id
515            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
516
517        if !is_cancelled {
518            let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
519            if let Some(streaming_job) = streaming_job {
520                assert_ne!(streaming_job.job_status, JobStatus::Created);
521                if streaming_job.create_type == CreateType::Background
522                    && streaming_job.job_status == JobStatus::Creating
523                {
524                    // If the job is created in background and still in creating status, we should not abort it and let recovery to handle it.
525                    tracing::warn!(
526                        id = job_id,
527                        "streaming job is created in background and still in creating status"
528                    );
529                    return Ok((false, Some(database_id)));
530                }
531            }
532        }
533
534        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
535
536        // Get the notification info if the job is a materialized view.
537        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
538        let mut objs = vec![];
539        if let Some(table) = &table_obj
540            && table.table_type == TableType::MaterializedView
541        {
542            let obj: Option<PartialObject> = Object::find_by_id(job_id)
543                .select_only()
544                .columns([
545                    object::Column::Oid,
546                    object::Column::ObjType,
547                    object::Column::SchemaId,
548                    object::Column::DatabaseId,
549                ])
550                .into_partial_model()
551                .one(&txn)
552                .await?;
553            let obj =
554                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
555            objs.push(obj);
556            let internal_table_objs: Vec<PartialObject> = Object::find()
557                .select_only()
558                .columns([
559                    object::Column::Oid,
560                    object::Column::ObjType,
561                    object::Column::SchemaId,
562                    object::Column::DatabaseId,
563                ])
564                .join(JoinType::InnerJoin, object::Relation::Table.def())
565                .filter(table::Column::BelongsToJobId.eq(job_id))
566                .into_partial_model()
567                .all(&txn)
568                .await?;
569            objs.extend(internal_table_objs);
570        }
571
572        // Check if the job is creating sink into table.
573        if table_obj.is_none()
574            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
575                .select_only()
576                .column(sink::Column::TargetTable)
577                .into_tuple::<Option<TableId>>()
578                .one(&txn)
579                .await?
580        {
581            let tmp_id: Option<ObjectId> = ObjectDependency::find()
582                .select_only()
583                .column(object_dependency::Column::UsedBy)
584                .join(
585                    JoinType::InnerJoin,
586                    object_dependency::Relation::Object1.def(),
587                )
588                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
589                .filter(
590                    object_dependency::Column::Oid
591                        .eq(target_table_id)
592                        .and(object::Column::ObjType.eq(ObjectType::Table))
593                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
594                )
595                .into_tuple()
596                .one(&txn)
597                .await?;
598            if let Some(tmp_id) = tmp_id {
599                tracing::warn!(
600                    id = tmp_id,
601                    "aborting temp streaming job for sink into table"
602                );
603                Object::delete_by_id(tmp_id).exec(&txn).await?;
604            }
605        }
606
607        Object::delete_by_id(job_id).exec(&txn).await?;
608        if !internal_table_ids.is_empty() {
609            Object::delete_many()
610                .filter(object::Column::Oid.is_in(internal_table_ids))
611                .exec(&txn)
612                .await?;
613        }
614        if let Some(t) = &table_obj
615            && let Some(source_id) = t.optional_associated_source_id
616        {
617            Object::delete_by_id(source_id).exec(&txn).await?;
618        }
619
620        let err = if is_cancelled {
621            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
622        } else {
623            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
624        };
625        let abort_reason = format!("streaming job aborted {}", err.as_report());
626        for tx in inner
627            .creating_table_finish_notifier
628            .get_mut(&database_id)
629            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
630            .into_iter()
631            .flatten()
632            .flatten()
633        {
634            let _ = tx.send(Err(abort_reason.clone()));
635        }
636        txn.commit().await?;
637
638        if !objs.is_empty() {
639            // We also have notified the frontend about these objects,
640            // so we need to notify the frontend to delete them here.
641            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
642                .await;
643        }
644        Ok((true, Some(database_id)))
645    }
646
647    #[await_tree::instrument]
648    pub async fn post_collect_job_fragments(
649        &self,
650        job_id: ObjectId,
651        actor_ids: Vec<crate::model::ActorId>,
652        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
653        split_assignment: &SplitAssignment,
654    ) -> MetaResult<()> {
655        self.post_collect_job_fragments_inner(
656            job_id,
657            actor_ids,
658            upstream_fragment_new_downstreams,
659            split_assignment,
660            false,
661        )
662        .await
663    }
664
665    pub async fn post_collect_job_fragments_inner(
666        &self,
667        job_id: ObjectId,
668        actor_ids: Vec<crate::model::ActorId>,
669        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
670        split_assignment: &SplitAssignment,
671        is_mv: bool,
672    ) -> MetaResult<()> {
673        let inner = self.inner.write().await;
674        let txn = inner.db.begin().await?;
675
676        Actor::update_many()
677            .col_expr(
678                actor::Column::Status,
679                SimpleExpr::from(ActorStatus::Running.into_value()),
680            )
681            .filter(
682                actor::Column::ActorId
683                    .is_in(actor_ids.into_iter().map(|id| id as ActorId).collect_vec()),
684            )
685            .exec(&txn)
686            .await?;
687
688        for splits in split_assignment.values() {
689            for (actor_id, splits) in splits {
690                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
691                let connector_splits = &PbConnectorSplits { splits };
692                actor::ActiveModel {
693                    actor_id: Set(*actor_id as _),
694                    splits: Set(Some(connector_splits.into())),
695                    ..Default::default()
696                }
697                .update(&txn)
698                .await?;
699            }
700        }
701
702        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
703
704        // Mark job as CREATING.
705        streaming_job::ActiveModel {
706            job_id: Set(job_id),
707            job_status: Set(JobStatus::Creating),
708            ..Default::default()
709        }
710        .update(&txn)
711        .await?;
712
713        let fragment_mapping = if is_mv {
714            get_fragment_mappings(&txn, job_id as _).await?
715        } else {
716            vec![]
717        };
718
719        txn.commit().await?;
720        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
721            .await;
722
723        Ok(())
724    }
725
726    pub async fn create_job_catalog_for_replace(
727        &self,
728        streaming_job: &StreamingJob,
729        ctx: &StreamContext,
730        specified_parallelism: &Option<NonZeroUsize>,
731        max_parallelism: usize,
732    ) -> MetaResult<ObjectId> {
733        let id = streaming_job.id();
734        let inner = self.inner.write().await;
735        let txn = inner.db.begin().await?;
736
737        // 1. check version.
738        streaming_job.verify_version_for_replace(&txn).await?;
739        // 2. check concurrent replace.
740        let referring_cnt = ObjectDependency::find()
741            .join(
742                JoinType::InnerJoin,
743                object_dependency::Relation::Object1.def(),
744            )
745            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
746            .filter(
747                object_dependency::Column::Oid
748                    .eq(id as ObjectId)
749                    .and(object::Column::ObjType.eq(ObjectType::Table))
750                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
751            )
752            .count(&txn)
753            .await?;
754        if referring_cnt != 0 {
755            return Err(MetaError::permission_denied(
756                "job is being altered or referenced by some creating jobs",
757            ));
758        }
759
760        // 3. check parallelism.
761        let original_max_parallelism: i32 = StreamingJobModel::find_by_id(id as ObjectId)
762            .select_only()
763            .column(streaming_job::Column::MaxParallelism)
764            .into_tuple()
765            .one(&txn)
766            .await?
767            .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
768
769        if original_max_parallelism != max_parallelism as i32 {
770            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
771            // This should not happen in normal cases.
772            bail!(
773                "cannot use a different max parallelism \
774                 when replacing streaming job, \
775                 original: {}, new: {}",
776                original_max_parallelism,
777                max_parallelism
778            );
779        }
780
781        let parallelism = match specified_parallelism {
782            None => StreamingParallelism::Adaptive,
783            Some(n) => StreamingParallelism::Fixed(n.get() as _),
784        };
785
786        // 4. create streaming object for new replace table.
787        let new_obj_id = Self::create_streaming_job_obj(
788            &txn,
789            streaming_job.object_type(),
790            streaming_job.owner() as _,
791            Some(streaming_job.database_id() as _),
792            Some(streaming_job.schema_id() as _),
793            streaming_job.create_type(),
794            ctx,
795            parallelism,
796            max_parallelism,
797            None,
798        )
799        .await?;
800
801        // 5. record dependency for new replace table.
802        ObjectDependency::insert(object_dependency::ActiveModel {
803            oid: Set(id as _),
804            used_by: Set(new_obj_id as _),
805            ..Default::default()
806        })
807        .exec(&txn)
808        .await?;
809
810        txn.commit().await?;
811
812        Ok(new_obj_id)
813    }
814
815    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
816    pub async fn finish_streaming_job(
817        &self,
818        job_id: ObjectId,
819        replace_stream_job_info: Option<ReplaceStreamJobPlan>,
820    ) -> MetaResult<()> {
821        let mut inner = self.inner.write().await;
822        let txn = inner.db.begin().await?;
823
824        let job_type = Object::find_by_id(job_id)
825            .select_only()
826            .column(object::Column::ObjType)
827            .into_tuple()
828            .one(&txn)
829            .await?
830            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
831
832        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
833        let res = Object::update_many()
834            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
835            .col_expr(
836                object::Column::CreatedAtClusterVersion,
837                current_cluster_version().into(),
838            )
839            .filter(object::Column::Oid.eq(job_id))
840            .exec(&txn)
841            .await?;
842        if res.rows_affected == 0 {
843            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
844        }
845
846        // mark the target stream job as `Created`.
847        let job = streaming_job::ActiveModel {
848            job_id: Set(job_id),
849            job_status: Set(JobStatus::Created),
850            ..Default::default()
851        };
852        job.update(&txn).await?;
853
854        // notify frontend: job, internal tables.
855        let internal_table_objs = Table::find()
856            .find_also_related(Object)
857            .filter(table::Column::BelongsToJobId.eq(job_id))
858            .all(&txn)
859            .await?;
860        let mut objects = internal_table_objs
861            .iter()
862            .map(|(table, obj)| PbObject {
863                object_info: Some(PbObjectInfo::Table(
864                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
865                )),
866            })
867            .collect_vec();
868        let mut notification_op = NotificationOperation::Add;
869        let mut updated_user_info = vec![];
870
871        match job_type {
872            ObjectType::Table => {
873                let (table, obj) = Table::find_by_id(job_id)
874                    .find_also_related(Object)
875                    .one(&txn)
876                    .await?
877                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
878                if table.table_type == TableType::MaterializedView {
879                    notification_op = NotificationOperation::Update;
880                }
881
882                if let Some(source_id) = table.optional_associated_source_id {
883                    let (src, obj) = Source::find_by_id(source_id)
884                        .find_also_related(Object)
885                        .one(&txn)
886                        .await?
887                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
888                    objects.push(PbObject {
889                        object_info: Some(PbObjectInfo::Source(
890                            ObjectModel(src, obj.unwrap()).into(),
891                        )),
892                    });
893                }
894                objects.push(PbObject {
895                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
896                });
897            }
898            ObjectType::Sink => {
899                let (sink, obj) = Sink::find_by_id(job_id)
900                    .find_also_related(Object)
901                    .one(&txn)
902                    .await?
903                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
904                objects.push(PbObject {
905                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
906                });
907            }
908            ObjectType::Index => {
909                let (index, obj) = Index::find_by_id(job_id)
910                    .find_also_related(Object)
911                    .one(&txn)
912                    .await?
913                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
914                {
915                    let (table, obj) = Table::find_by_id(index.index_table_id)
916                        .find_also_related(Object)
917                        .one(&txn)
918                        .await?
919                        .ok_or_else(|| {
920                            MetaError::catalog_id_not_found("table", index.index_table_id)
921                        })?;
922                    objects.push(PbObject {
923                        object_info: Some(PbObjectInfo::Table(
924                            ObjectModel(table, obj.unwrap()).into(),
925                        )),
926                    });
927                }
928
929                // If the index is created on a table with privileges, we should also
930                // grant the privileges for the index and its state tables.
931                let primary_table_privileges = UserPrivilege::find()
932                    .filter(
933                        user_privilege::Column::Oid
934                            .eq(index.primary_table_id)
935                            .and(user_privilege::Column::Action.eq(Action::Select)),
936                    )
937                    .all(&txn)
938                    .await?;
939                if !primary_table_privileges.is_empty() {
940                    let index_state_table_ids: Vec<TableId> = Table::find()
941                        .select_only()
942                        .column(table::Column::TableId)
943                        .filter(
944                            table::Column::BelongsToJobId
945                                .eq(job_id)
946                                .or(table::Column::TableId.eq(index.index_table_id)),
947                        )
948                        .into_tuple()
949                        .all(&txn)
950                        .await?;
951                    let mut new_privileges = vec![];
952                    for privilege in &primary_table_privileges {
953                        for state_table_id in &index_state_table_ids {
954                            new_privileges.push(user_privilege::ActiveModel {
955                                id: Default::default(),
956                                oid: Set(*state_table_id),
957                                user_id: Set(privilege.user_id),
958                                action: Set(Action::Select),
959                                dependent_id: Set(privilege.dependent_id),
960                                granted_by: Set(privilege.granted_by),
961                                with_grant_option: Set(privilege.with_grant_option),
962                            });
963                        }
964                    }
965                    UserPrivilege::insert_many(new_privileges)
966                        .exec(&txn)
967                        .await?;
968
969                    updated_user_info = list_user_info_by_ids(
970                        primary_table_privileges.into_iter().map(|p| p.user_id),
971                        &txn,
972                    )
973                    .await?;
974                }
975
976                objects.push(PbObject {
977                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
978                });
979            }
980            ObjectType::Source => {
981                let (source, obj) = Source::find_by_id(job_id)
982                    .find_also_related(Object)
983                    .one(&txn)
984                    .await?
985                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
986                objects.push(PbObject {
987                    object_info: Some(PbObjectInfo::Source(
988                        ObjectModel(source, obj.unwrap()).into(),
989                    )),
990                });
991            }
992            _ => unreachable!("invalid job type: {:?}", job_type),
993        }
994
995        let fragment_mapping = get_fragment_mappings(&txn, job_id).await?;
996
997        let replace_table_mapping_update = match replace_stream_job_info {
998            Some(ReplaceStreamJobPlan {
999                streaming_job,
1000                replace_upstream,
1001                tmp_id,
1002                ..
1003            }) => {
1004                let incoming_sink_id = job_id;
1005
1006                let (relations, fragment_mapping, _) = Self::finish_replace_streaming_job_inner(
1007                    tmp_id as ObjectId,
1008                    replace_upstream,
1009                    SinkIntoTableContext {
1010                        creating_sink_id: Some(incoming_sink_id as _),
1011                        dropping_sink_id: None,
1012                        updated_sink_catalogs: vec![],
1013                    },
1014                    &txn,
1015                    streaming_job,
1016                    None, // will not drop table connector when creating a streaming job
1017                )
1018                .await?;
1019
1020                Some((relations, fragment_mapping))
1021            }
1022            None => None,
1023        };
1024
1025        if job_type != ObjectType::Index {
1026            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
1027        }
1028        txn.commit().await?;
1029
1030        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1031            .await;
1032
1033        let mut version = self
1034            .notify_frontend(
1035                notification_op,
1036                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1037            )
1038            .await;
1039
1040        // notify users about the default privileges
1041        if !updated_user_info.is_empty() {
1042            version = self.notify_users_update(updated_user_info).await;
1043        }
1044
1045        if let Some((objects, fragment_mapping)) = replace_table_mapping_update {
1046            self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1047                .await;
1048            version = self
1049                .notify_frontend(
1050                    NotificationOperation::Update,
1051                    NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1052                )
1053                .await;
1054        }
1055        inner
1056            .creating_table_finish_notifier
1057            .values_mut()
1058            .for_each(|creating_tables| {
1059                if let Some(txs) = creating_tables.remove(&job_id) {
1060                    for tx in txs {
1061                        let _ = tx.send(Ok(version));
1062                    }
1063                }
1064            });
1065
1066        Ok(())
1067    }
1068
1069    pub async fn finish_replace_streaming_job(
1070        &self,
1071        tmp_id: ObjectId,
1072        streaming_job: StreamingJob,
1073        replace_upstream: FragmentReplaceUpstream,
1074        sink_into_table_context: SinkIntoTableContext,
1075        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1076    ) -> MetaResult<NotificationVersion> {
1077        let inner = self.inner.write().await;
1078        let txn = inner.db.begin().await?;
1079
1080        let (objects, fragment_mapping, delete_notification_objs) =
1081            Self::finish_replace_streaming_job_inner(
1082                tmp_id,
1083                replace_upstream,
1084                sink_into_table_context,
1085                &txn,
1086                streaming_job,
1087                drop_table_connector_ctx,
1088            )
1089            .await?;
1090
1091        txn.commit().await?;
1092
1093        // FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
1094        // catalog and need to access the old fragment. Let frontend nodes delete the old fragment
1095        // when they receive table catalog change.
1096        // self.notify_fragment_mapping(NotificationOperation::Delete, old_fragment_mappings)
1097        //     .await;
1098        self.notify_fragment_mapping(NotificationOperation::Add, fragment_mapping)
1099            .await;
1100        let mut version = self
1101            .notify_frontend(
1102                NotificationOperation::Update,
1103                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1104            )
1105            .await;
1106
1107        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1108            self.notify_users_update(user_infos).await;
1109            version = self
1110                .notify_frontend(
1111                    NotificationOperation::Delete,
1112                    build_object_group_for_delete(to_drop_objects),
1113                )
1114                .await;
1115        }
1116
1117        Ok(version)
1118    }
1119
1120    pub async fn finish_replace_streaming_job_inner(
1121        tmp_id: ObjectId,
1122        replace_upstream: FragmentReplaceUpstream,
1123        SinkIntoTableContext {
1124            creating_sink_id,
1125            dropping_sink_id,
1126            updated_sink_catalogs,
1127        }: SinkIntoTableContext,
1128        txn: &DatabaseTransaction,
1129        streaming_job: StreamingJob,
1130        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1131    ) -> MetaResult<(
1132        Vec<PbObject>,
1133        Vec<PbFragmentWorkerSlotMapping>,
1134        Option<(Vec<PbUserInfo>, Vec<PartialObject>)>,
1135    )> {
1136        let original_job_id = streaming_job.id() as ObjectId;
1137        let job_type = streaming_job.job_type();
1138
1139        let mut index_item_rewriter = None;
1140
1141        // Update catalog
1142        match streaming_job {
1143            StreamingJob::Table(_source, table, _table_job_type) => {
1144                // The source catalog should remain unchanged
1145
1146                let original_column_catalogs = Table::find_by_id(original_job_id)
1147                    .select_only()
1148                    .columns([table::Column::Columns])
1149                    .into_tuple::<ColumnCatalogArray>()
1150                    .one(txn)
1151                    .await?
1152                    .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1153
1154                index_item_rewriter = Some({
1155                    let original_columns = original_column_catalogs
1156                        .to_protobuf()
1157                        .into_iter()
1158                        .map(|c| c.column_desc.unwrap())
1159                        .collect_vec();
1160                    let new_columns = table
1161                        .columns
1162                        .iter()
1163                        .map(|c| c.column_desc.clone().unwrap())
1164                        .collect_vec();
1165
1166                    IndexItemRewriter {
1167                        original_columns,
1168                        new_columns,
1169                    }
1170                });
1171
1172                // For sinks created in earlier versions, we need to set the original_target_columns.
1173                for sink_id in updated_sink_catalogs {
1174                    sink::ActiveModel {
1175                        sink_id: Set(sink_id as _),
1176                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1177                        ..Default::default()
1178                    }
1179                    .update(txn)
1180                    .await?;
1181                }
1182                // Update the table catalog with the new one. (column catalog is also updated here)
1183                let mut table = table::ActiveModel::from(table);
1184                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
1185                if let Some(sink_id) = creating_sink_id {
1186                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
1187                    incoming_sinks.push(sink_id as _);
1188                }
1189                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1190                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1191                {
1192                    // drop table connector, the rest logic is in `drop_table_associated_source`
1193                    table.optional_associated_source_id = Set(None);
1194                }
1195
1196                if let Some(sink_id) = dropping_sink_id {
1197                    let drained = incoming_sinks
1198                        .extract_if(.., |id| *id == sink_id)
1199                        .collect_vec();
1200                    debug_assert_eq!(drained, vec![sink_id]);
1201                }
1202
1203                table.incoming_sinks = Set(incoming_sinks.into());
1204                table.update(txn).await?;
1205            }
1206            StreamingJob::Source(source) => {
1207                // Update the source catalog with the new one.
1208                let source = source::ActiveModel::from(source);
1209                source.update(txn).await?;
1210            }
1211            StreamingJob::MaterializedView(table) => {
1212                // Update the table catalog with the new one.
1213                let table = table::ActiveModel::from(table);
1214                table.update(txn).await?;
1215            }
1216            _ => unreachable!(
1217                "invalid streaming job type: {:?}",
1218                streaming_job.job_type_str()
1219            ),
1220        }
1221
1222        // 0. update internal tables
1223        // Fields including `fragment_id` were placeholder values before.
1224        // After table fragments are created, update them for all internal tables.
1225        let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1226            .select_only()
1227            .columns([
1228                fragment::Column::FragmentId,
1229                fragment::Column::StateTableIds,
1230            ])
1231            .filter(fragment::Column::JobId.eq(tmp_id))
1232            .into_tuple()
1233            .all(txn)
1234            .await?;
1235        for (fragment_id, state_table_ids) in fragment_info {
1236            for state_table_id in state_table_ids.into_inner() {
1237                table::ActiveModel {
1238                    table_id: Set(state_table_id as _),
1239                    fragment_id: Set(Some(fragment_id)),
1240                    // No need to update `vnode_count` because it must remain the same.
1241                    ..Default::default()
1242                }
1243                .update(txn)
1244                .await?;
1245            }
1246        }
1247
1248        // 1. replace old fragments/actors with new ones.
1249        Fragment::delete_many()
1250            .filter(fragment::Column::JobId.eq(original_job_id))
1251            .exec(txn)
1252            .await?;
1253        Fragment::update_many()
1254            .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1255            .filter(fragment::Column::JobId.eq(tmp_id))
1256            .exec(txn)
1257            .await?;
1258
1259        // 2. update merges.
1260        // update downstream fragment's Merge node, and upstream_fragment_id
1261        for (fragment_id, fragment_replace_map) in replace_upstream {
1262            let (fragment_id, mut stream_node) = Fragment::find_by_id(fragment_id as FragmentId)
1263                .select_only()
1264                .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1265                .into_tuple::<(FragmentId, StreamNode)>()
1266                .one(txn)
1267                .await?
1268                .map(|(id, node)| (id, node.to_protobuf()))
1269                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1270
1271            visit_stream_node_mut(&mut stream_node, |body| {
1272                if let PbNodeBody::Merge(m) = body
1273                    && let Some(new_fragment_id) = fragment_replace_map.get(&m.upstream_fragment_id)
1274                {
1275                    m.upstream_fragment_id = *new_fragment_id;
1276                }
1277            });
1278            fragment::ActiveModel {
1279                fragment_id: Set(fragment_id),
1280                stream_node: Set(StreamNode::from(&stream_node)),
1281                ..Default::default()
1282            }
1283            .update(txn)
1284            .await?;
1285        }
1286
1287        // 3. remove dummy object.
1288        Object::delete_by_id(tmp_id).exec(txn).await?;
1289
1290        // 4. update catalogs and notify.
1291        let mut objects = vec![];
1292        match job_type {
1293            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1294                let (table, table_obj) = Table::find_by_id(original_job_id)
1295                    .find_also_related(Object)
1296                    .one(txn)
1297                    .await?
1298                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1299                objects.push(PbObject {
1300                    object_info: Some(PbObjectInfo::Table(
1301                        ObjectModel(table, table_obj.unwrap()).into(),
1302                    )),
1303                })
1304            }
1305            StreamingJobType::Source => {
1306                let (source, source_obj) = Source::find_by_id(original_job_id)
1307                    .find_also_related(Object)
1308                    .one(txn)
1309                    .await?
1310                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1311                objects.push(PbObject {
1312                    object_info: Some(PbObjectInfo::Source(
1313                        ObjectModel(source, source_obj.unwrap()).into(),
1314                    )),
1315                })
1316            }
1317            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1318        }
1319
1320        if let Some(expr_rewriter) = index_item_rewriter {
1321            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1322                .select_only()
1323                .columns([index::Column::IndexId, index::Column::IndexItems])
1324                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1325                .into_tuple()
1326                .all(txn)
1327                .await?;
1328            for (index_id, nodes) in index_items {
1329                let mut pb_nodes = nodes.to_protobuf();
1330                pb_nodes
1331                    .iter_mut()
1332                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1333                let index = index::ActiveModel {
1334                    index_id: Set(index_id),
1335                    index_items: Set(pb_nodes.into()),
1336                    ..Default::default()
1337                }
1338                .update(txn)
1339                .await?;
1340                let index_obj = index
1341                    .find_related(Object)
1342                    .one(txn)
1343                    .await?
1344                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1345                objects.push(PbObject {
1346                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1347                });
1348            }
1349        }
1350
1351        let fragment_mapping: Vec<_> = get_fragment_mappings(txn, original_job_id as _).await?;
1352
1353        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1354        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1355            notification_objs =
1356                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1357        }
1358
1359        Ok((objects, fragment_mapping, notification_objs))
1360    }
1361
1362    /// Abort the replacing streaming job by deleting the temporary job object.
1363    pub async fn try_abort_replacing_streaming_job(&self, tmp_job_id: ObjectId) -> MetaResult<()> {
1364        let inner = self.inner.write().await;
1365        Object::delete_by_id(tmp_job_id).exec(&inner.db).await?;
1366        Ok(())
1367    }
1368
1369    // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
1370    // return the actor_ids to be applied
1371    pub async fn update_source_rate_limit_by_source_id(
1372        &self,
1373        source_id: SourceId,
1374        rate_limit: Option<u32>,
1375    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1376        let inner = self.inner.read().await;
1377        let txn = inner.db.begin().await?;
1378
1379        {
1380            let active_source = source::ActiveModel {
1381                source_id: Set(source_id),
1382                rate_limit: Set(rate_limit.map(|v| v as i32)),
1383                ..Default::default()
1384            };
1385            active_source.update(&txn).await?;
1386        }
1387
1388        let (source, obj) = Source::find_by_id(source_id)
1389            .find_also_related(Object)
1390            .one(&txn)
1391            .await?
1392            .ok_or_else(|| {
1393                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1394            })?;
1395
1396        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1397        let streaming_job_ids: Vec<ObjectId> =
1398            if let Some(table_id) = source.optional_associated_table_id {
1399                vec![table_id]
1400            } else if let Some(source_info) = &source.source_info
1401                && source_info.to_protobuf().is_shared()
1402            {
1403                vec![source_id]
1404            } else {
1405                ObjectDependency::find()
1406                    .select_only()
1407                    .column(object_dependency::Column::UsedBy)
1408                    .filter(object_dependency::Column::Oid.eq(source_id))
1409                    .into_tuple()
1410                    .all(&txn)
1411                    .await?
1412            };
1413
1414        if streaming_job_ids.is_empty() {
1415            return Err(MetaError::invalid_parameter(format!(
1416                "source id {source_id} not used by any streaming job"
1417            )));
1418        }
1419
1420        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1421            .select_only()
1422            .columns([
1423                fragment::Column::FragmentId,
1424                fragment::Column::FragmentTypeMask,
1425                fragment::Column::StreamNode,
1426            ])
1427            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1428            .into_tuple()
1429            .all(&txn)
1430            .await?;
1431        let mut fragments = fragments
1432            .into_iter()
1433            .map(|(id, mask, stream_node)| {
1434                (
1435                    id,
1436                    FragmentTypeMask::from(mask as u32),
1437                    stream_node.to_protobuf(),
1438                )
1439            })
1440            .collect_vec();
1441
1442        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1443            let mut found = false;
1444            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1445                visit_stream_node_mut(stream_node, |node| {
1446                    if let PbNodeBody::Source(node) = node
1447                        && let Some(node_inner) = &mut node.source_inner
1448                        && node_inner.source_id == source_id as u32
1449                    {
1450                        node_inner.rate_limit = rate_limit;
1451                        found = true;
1452                    }
1453                });
1454            }
1455            if is_fs_source {
1456                // in older versions, there's no fragment type flag for `FsFetch` node,
1457                // so we just scan all fragments for StreamFsFetch node if using fs connector
1458                visit_stream_node_mut(stream_node, |node| {
1459                    if let PbNodeBody::StreamFsFetch(node) = node {
1460                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1461                        if let Some(node_inner) = &mut node.node_inner
1462                            && node_inner.source_id == source_id as u32
1463                        {
1464                            node_inner.rate_limit = rate_limit;
1465                            found = true;
1466                        }
1467                    }
1468                });
1469            }
1470            found
1471        });
1472
1473        assert!(
1474            !fragments.is_empty(),
1475            "source id should be used by at least one fragment"
1476        );
1477        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1478        for (id, fragment_type_mask, stream_node) in fragments {
1479            fragment::ActiveModel {
1480                fragment_id: Set(id),
1481                fragment_type_mask: Set(fragment_type_mask.into()),
1482                stream_node: Set(StreamNode::from(&stream_node)),
1483                ..Default::default()
1484            }
1485            .update(&txn)
1486            .await?;
1487        }
1488        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1489
1490        txn.commit().await?;
1491
1492        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1493        let _version = self
1494            .notify_frontend(
1495                NotificationOperation::Update,
1496                NotificationInfo::ObjectGroup(PbObjectGroup {
1497                    objects: vec![PbObject {
1498                        object_info: Some(relation_info),
1499                    }],
1500                }),
1501            )
1502            .await;
1503
1504        Ok(fragment_actors)
1505    }
1506
1507    // edit the content of fragments in given `table_id`
1508    // return the actor_ids to be applied
1509    pub async fn mutate_fragments_by_job_id(
1510        &self,
1511        job_id: ObjectId,
1512        // returns true if the mutation is applied
1513        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1514        // error message when no relevant fragments is found
1515        err_msg: &'static str,
1516    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1517        let inner = self.inner.read().await;
1518        let txn = inner.db.begin().await?;
1519
1520        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1521            .select_only()
1522            .columns([
1523                fragment::Column::FragmentId,
1524                fragment::Column::FragmentTypeMask,
1525                fragment::Column::StreamNode,
1526            ])
1527            .filter(fragment::Column::JobId.eq(job_id))
1528            .into_tuple()
1529            .all(&txn)
1530            .await?;
1531        let mut fragments = fragments
1532            .into_iter()
1533            .map(|(id, mask, stream_node)| {
1534                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1535            })
1536            .collect_vec();
1537
1538        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1539            fragments_mutation_fn(*fragment_type_mask, stream_node)
1540        });
1541        if fragments.is_empty() {
1542            return Err(MetaError::invalid_parameter(format!(
1543                "job id {job_id}: {}",
1544                err_msg
1545            )));
1546        }
1547
1548        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1549        for (id, _, stream_node) in fragments {
1550            fragment::ActiveModel {
1551                fragment_id: Set(id),
1552                stream_node: Set(StreamNode::from(&stream_node)),
1553                ..Default::default()
1554            }
1555            .update(&txn)
1556            .await?;
1557        }
1558        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1559
1560        txn.commit().await?;
1561
1562        Ok(fragment_actors)
1563    }
1564
1565    async fn mutate_fragment_by_fragment_id(
1566        &self,
1567        fragment_id: FragmentId,
1568        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1569        err_msg: &'static str,
1570    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1571        let inner = self.inner.read().await;
1572        let txn = inner.db.begin().await?;
1573
1574        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1575            Fragment::find_by_id(fragment_id)
1576                .select_only()
1577                .columns([
1578                    fragment::Column::FragmentTypeMask,
1579                    fragment::Column::StreamNode,
1580                ])
1581                .into_tuple()
1582                .one(&txn)
1583                .await?
1584                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1585        let mut pb_stream_node = stream_node.to_protobuf();
1586        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1587
1588        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1589            return Err(MetaError::invalid_parameter(format!(
1590                "fragment id {fragment_id}: {}",
1591                err_msg
1592            )));
1593        }
1594
1595        fragment::ActiveModel {
1596            fragment_id: Set(fragment_id),
1597            stream_node: Set(stream_node),
1598            ..Default::default()
1599        }
1600        .update(&txn)
1601        .await?;
1602
1603        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1604
1605        txn.commit().await?;
1606
1607        Ok(fragment_actors)
1608    }
1609
1610    // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
1611    // return the actor_ids to be applied
1612    pub async fn update_backfill_rate_limit_by_job_id(
1613        &self,
1614        job_id: ObjectId,
1615        rate_limit: Option<u32>,
1616    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1617        let update_backfill_rate_limit =
1618            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1619                let mut found = false;
1620                if fragment_type_mask
1621                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1622                {
1623                    visit_stream_node_mut(stream_node, |node| match node {
1624                        PbNodeBody::StreamCdcScan(node) => {
1625                            node.rate_limit = rate_limit;
1626                            found = true;
1627                        }
1628                        PbNodeBody::StreamScan(node) => {
1629                            node.rate_limit = rate_limit;
1630                            found = true;
1631                        }
1632                        PbNodeBody::SourceBackfill(node) => {
1633                            node.rate_limit = rate_limit;
1634                            found = true;
1635                        }
1636                        PbNodeBody::Sink(node) => {
1637                            node.rate_limit = rate_limit;
1638                            found = true;
1639                        }
1640                        _ => {}
1641                    });
1642                }
1643                found
1644            };
1645
1646        self.mutate_fragments_by_job_id(
1647            job_id,
1648            update_backfill_rate_limit,
1649            "stream scan node or source node not found",
1650        )
1651        .await
1652    }
1653
1654    // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments
1655    // return the actor_ids to be applied
1656    pub async fn update_sink_rate_limit_by_job_id(
1657        &self,
1658        job_id: ObjectId,
1659        rate_limit: Option<u32>,
1660    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1661        let update_sink_rate_limit =
1662            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1663                let mut found = false;
1664                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1665                    visit_stream_node_mut(stream_node, |node| {
1666                        if let PbNodeBody::Sink(node) = node {
1667                            node.rate_limit = rate_limit;
1668                            found = true;
1669                        }
1670                    });
1671                }
1672                found
1673            };
1674
1675        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1676            .await
1677    }
1678
1679    pub async fn update_dml_rate_limit_by_job_id(
1680        &self,
1681        job_id: ObjectId,
1682        rate_limit: Option<u32>,
1683    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1684        let update_dml_rate_limit =
1685            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1686                let mut found = false;
1687                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1688                    visit_stream_node_mut(stream_node, |node| {
1689                        if let PbNodeBody::Dml(node) = node {
1690                            node.rate_limit = rate_limit;
1691                            found = true;
1692                        }
1693                    });
1694                }
1695                found
1696            };
1697
1698        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1699            .await
1700    }
1701
1702    pub async fn update_source_props_by_source_id(
1703        &self,
1704        source_id: SourceId,
1705        alter_props: BTreeMap<String, String>,
1706        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1707    ) -> MetaResult<WithOptionsSecResolved> {
1708        let inner = self.inner.read().await;
1709        let txn = inner.db.begin().await?;
1710
1711        let (source, _obj) = Source::find_by_id(source_id)
1712            .find_also_related(Object)
1713            .one(&txn)
1714            .await?
1715            .ok_or_else(|| {
1716                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1717            })?;
1718        let connector = source.with_properties.0.get_connector().unwrap();
1719
1720        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
1721        let prop_keys: Vec<String> = alter_props
1722            .keys()
1723            .chain(alter_secret_refs.keys())
1724            .cloned()
1725            .collect();
1726        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1727            &connector, &prop_keys,
1728        )?;
1729
1730        let mut options_with_secret = WithOptionsSecResolved::new(
1731            source.with_properties.0.clone(),
1732            source
1733                .secret_ref
1734                .map(|secret_ref| secret_ref.to_protobuf())
1735                .unwrap_or_default(),
1736        );
1737        let (to_add_secret_dep, to_remove_secret_dep) =
1738            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1739
1740        tracing::info!(
1741            "applying new properties to source: source_id={}, options_with_secret={:?}",
1742            source_id,
1743            options_with_secret
1744        );
1745        // check if the alter-ed props are valid for each Connector
1746        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1747        // todo: validate via source manager
1748
1749        let mut associate_table_id = None;
1750
1751        // can be source_id or table_id
1752        // if updating an associated source, the preferred_id is the table_id
1753        // otherwise, it is the source_id
1754        let mut preferred_id: i32 = source_id;
1755        let rewrite_sql = {
1756            let definition = source.definition.clone();
1757
1758            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1759                .map_err(|e| {
1760                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1761                        anyhow!(e).context("Failed to parse source definition SQL"),
1762                    )))
1763                })?
1764                .try_into()
1765                .unwrap();
1766
1767            /// Formats SQL options with secret values properly resolved
1768            ///
1769            /// This function processes configuration options that may contain sensitive data:
1770            /// - Plaintext options are directly converted to `SqlOption`
1771            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
1772            ///   without exposing the actual secret value
1773            ///
1774            /// # Arguments
1775            /// * `txn` - Database transaction for retrieving secrets
1776            /// * `options_with_secret` - Container of options with both plaintext and secret values
1777            ///
1778            /// # Returns
1779            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
1780            async fn format_with_option_secret_resolved(
1781                txn: &DatabaseTransaction,
1782                options_with_secret: &WithOptionsSecResolved,
1783            ) -> MetaResult<Vec<SqlOption>> {
1784                let mut options = Vec::new();
1785                for (k, v) in options_with_secret.as_plaintext() {
1786                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1787                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1788                    options.push(sql_option);
1789                }
1790                for (k, v) in options_with_secret.as_secret() {
1791                    if let Some(secret_model) =
1792                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
1793                    {
1794                        let sql_option =
1795                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1796                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1797                        options.push(sql_option);
1798                    } else {
1799                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1800                    }
1801                }
1802                Ok(options)
1803            }
1804
1805            match &mut stmt {
1806                Statement::CreateSource { stmt } => {
1807                    stmt.with_properties.0 =
1808                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1809                }
1810                Statement::CreateTable { with_options, .. } => {
1811                    *with_options =
1812                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1813                    associate_table_id = source.optional_associated_table_id;
1814                    preferred_id = associate_table_id.unwrap();
1815                }
1816                _ => unreachable!(),
1817            }
1818
1819            stmt.to_string()
1820        };
1821
1822        {
1823            // update secret dependencies
1824            if !to_add_secret_dep.is_empty() {
1825                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
1826                    object_dependency::ActiveModel {
1827                        oid: Set(secret_id as _),
1828                        used_by: Set(preferred_id as _),
1829                        ..Default::default()
1830                    }
1831                }))
1832                .exec(&txn)
1833                .await?;
1834            }
1835            if !to_remove_secret_dep.is_empty() {
1836                // todo: fix the filter logic
1837                let _ = ObjectDependency::delete_many()
1838                    .filter(
1839                        object_dependency::Column::Oid
1840                            .is_in(to_remove_secret_dep)
1841                            .and(
1842                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
1843                            ),
1844                    )
1845                    .exec(&txn)
1846                    .await?;
1847            }
1848        }
1849
1850        let active_source_model = source::ActiveModel {
1851            source_id: Set(source_id),
1852            definition: Set(rewrite_sql.clone()),
1853            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
1854            secret_ref: Set((!options_with_secret.as_secret().is_empty())
1855                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
1856            ..Default::default()
1857        };
1858        active_source_model.update(&txn).await?;
1859
1860        if let Some(associate_table_id) = associate_table_id {
1861            // update the associated table statement accordly
1862            let active_table_model = table::ActiveModel {
1863                table_id: Set(associate_table_id),
1864                definition: Set(rewrite_sql),
1865                ..Default::default()
1866            };
1867            active_table_model.update(&txn).await?;
1868        }
1869
1870        // update fragments
1871        update_connector_props_fragments(
1872            &txn,
1873            if let Some(associate_table_id) = associate_table_id {
1874                // if updating table with connector, the fragment_id is table id
1875                associate_table_id
1876            } else {
1877                source_id
1878            },
1879            FragmentTypeFlag::Source,
1880            |node, found| {
1881                if let PbNodeBody::Source(node) = node
1882                    && let Some(source_inner) = &mut node.source_inner
1883                {
1884                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
1885                    source_inner.secret_refs = options_with_secret.as_secret().clone();
1886                    *found = true;
1887                }
1888            },
1889        )
1890        .await?;
1891
1892        let mut to_update_objs = Vec::with_capacity(2);
1893        let (source, obj) = Source::find_by_id(source_id)
1894            .find_also_related(Object)
1895            .one(&txn)
1896            .await?
1897            .ok_or_else(|| {
1898                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1899            })?;
1900        to_update_objs.push(PbObject {
1901            object_info: Some(PbObjectInfo::Source(
1902                ObjectModel(source, obj.unwrap()).into(),
1903            )),
1904        });
1905
1906        if let Some(associate_table_id) = associate_table_id {
1907            let (table, obj) = Table::find_by_id(associate_table_id)
1908                .find_also_related(Object)
1909                .one(&txn)
1910                .await?
1911                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
1912            to_update_objs.push(PbObject {
1913                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1914            });
1915        }
1916
1917        txn.commit().await?;
1918
1919        self.notify_frontend(
1920            NotificationOperation::Update,
1921            NotificationInfo::ObjectGroup(PbObjectGroup {
1922                objects: to_update_objs,
1923            }),
1924        )
1925        .await;
1926
1927        Ok(options_with_secret)
1928    }
1929
1930    pub async fn update_sink_props_by_sink_id(
1931        &self,
1932        sink_id: SinkId,
1933        props: BTreeMap<String, String>,
1934    ) -> MetaResult<HashMap<String, String>> {
1935        let inner = self.inner.read().await;
1936        let txn = inner.db.begin().await?;
1937
1938        let (sink, _obj) = Sink::find_by_id(sink_id)
1939            .find_also_related(Object)
1940            .one(&txn)
1941            .await?
1942            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
1943
1944        // Validate that props can be altered
1945        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
1946            Some(connector) => {
1947                let connector_type = connector.to_lowercase();
1948                let field_names: Vec<String> = props.keys().cloned().collect();
1949                check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
1950                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
1951
1952                match_sink_name_str!(
1953                    connector_type.as_str(),
1954                    SinkType,
1955                    {
1956                        let mut new_props = sink.properties.0.clone();
1957                        new_props.extend(props.clone());
1958                        SinkType::validate_alter_config(&new_props)
1959                    },
1960                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
1961                )?
1962            }
1963            None => {
1964                return Err(
1965                    SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
1966                );
1967            }
1968        };
1969        let definition = sink.definition.clone();
1970        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1971            .map_err(|e| SinkError::Config(anyhow!(e)))?
1972            .try_into()
1973            .unwrap();
1974        if let Statement::CreateSink { stmt } = &mut stmt {
1975            let mut new_sql_options = stmt
1976                .with_properties
1977                .0
1978                .iter()
1979                .map(|sql_option| (&sql_option.name, sql_option))
1980                .collect::<IndexMap<_, _>>();
1981            let add_sql_options = props
1982                .iter()
1983                .map(|(k, v)| SqlOption::try_from((k, v)))
1984                .collect::<Result<Vec<SqlOption>, ParserError>>()
1985                .map_err(|e| SinkError::Config(anyhow!(e)))?;
1986            new_sql_options.extend(
1987                add_sql_options
1988                    .iter()
1989                    .map(|sql_option| (&sql_option.name, sql_option)),
1990            );
1991            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
1992        } else {
1993            panic!("sink definition is not a create sink statement")
1994        }
1995        let mut new_config = sink.properties.clone().into_inner();
1996        new_config.extend(props);
1997
1998        let active_sink = sink::ActiveModel {
1999            sink_id: Set(sink_id),
2000            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2001            definition: Set(stmt.to_string()),
2002            ..Default::default()
2003        };
2004        active_sink.update(&txn).await?;
2005
2006        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2007            .select_only()
2008            .columns([
2009                fragment::Column::FragmentId,
2010                fragment::Column::FragmentTypeMask,
2011                fragment::Column::StreamNode,
2012            ])
2013            .filter(fragment::Column::JobId.eq(sink_id))
2014            .into_tuple()
2015            .all(&txn)
2016            .await?;
2017        let fragments = fragments
2018            .into_iter()
2019            .filter(|(_, fragment_type_mask, _)| {
2020                FragmentTypeMask::from(*fragment_type_mask).contains(FragmentTypeFlag::Sink)
2021            })
2022            .filter_map(|(id, _, stream_node)| {
2023                let mut stream_node = stream_node.to_protobuf();
2024                let mut found = false;
2025                visit_stream_node_mut(&mut stream_node, |node| {
2026                    if let PbNodeBody::Sink(node) = node
2027                        && let Some(sink_desc) = &mut node.sink_desc
2028                        && sink_desc.id == sink_id as u32
2029                    {
2030                        sink_desc.properties = new_config.clone();
2031                        found = true;
2032                    }
2033                });
2034                if found { Some((id, stream_node)) } else { None }
2035            })
2036            .collect_vec();
2037        assert!(
2038            !fragments.is_empty(),
2039            "sink id should be used by at least one fragment"
2040        );
2041        for (id, stream_node) in fragments {
2042            fragment::ActiveModel {
2043                fragment_id: Set(id),
2044                stream_node: Set(StreamNode::from(&stream_node)),
2045                ..Default::default()
2046            }
2047            .update(&txn)
2048            .await?;
2049        }
2050
2051        let (sink, obj) = Sink::find_by_id(sink_id)
2052            .find_also_related(Object)
2053            .one(&txn)
2054            .await?
2055            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2056
2057        txn.commit().await?;
2058
2059        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
2060        let _version = self
2061            .notify_frontend(
2062                NotificationOperation::Update,
2063                NotificationInfo::ObjectGroup(PbObjectGroup {
2064                    objects: vec![PbObject {
2065                        object_info: Some(relation_info),
2066                    }],
2067                }),
2068            )
2069            .await;
2070
2071        Ok(new_config.into_iter().collect())
2072    }
2073
2074    pub async fn update_fragment_rate_limit_by_fragment_id(
2075        &self,
2076        fragment_id: FragmentId,
2077        rate_limit: Option<u32>,
2078    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2079        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2080                                 stream_node: &mut PbStreamNode| {
2081            let mut found = false;
2082            if fragment_type_mask.contains_any(
2083                FragmentTypeFlag::dml_rate_limit_fragments()
2084                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2085                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2086                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2087            ) {
2088                visit_stream_node_mut(stream_node, |node| {
2089                    if let PbNodeBody::Dml(node) = node {
2090                        node.rate_limit = rate_limit;
2091                        found = true;
2092                    }
2093                    if let PbNodeBody::Sink(node) = node {
2094                        node.rate_limit = rate_limit;
2095                        found = true;
2096                    }
2097                    if let PbNodeBody::StreamCdcScan(node) = node {
2098                        node.rate_limit = rate_limit;
2099                        found = true;
2100                    }
2101                    if let PbNodeBody::StreamScan(node) = node {
2102                        node.rate_limit = rate_limit;
2103                        found = true;
2104                    }
2105                    if let PbNodeBody::SourceBackfill(node) = node {
2106                        node.rate_limit = rate_limit;
2107                        found = true;
2108                    }
2109                });
2110            }
2111            found
2112        };
2113        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2114            .await
2115    }
2116
2117    pub async fn post_apply_reschedules(
2118        &self,
2119        reschedules: HashMap<FragmentId, Reschedule>,
2120        post_updates: &JobReschedulePostUpdates,
2121    ) -> MetaResult<()> {
2122        let new_created_actors: HashSet<_> = reschedules
2123            .values()
2124            .flat_map(|reschedule| {
2125                reschedule
2126                    .added_actors
2127                    .values()
2128                    .flatten()
2129                    .map(|actor_id| *actor_id as ActorId)
2130            })
2131            .collect();
2132
2133        let inner = self.inner.write().await;
2134
2135        let txn = inner.db.begin().await?;
2136
2137        let mut fragment_mapping_to_notify = vec![];
2138
2139        for (
2140            fragment_id,
2141            Reschedule {
2142                removed_actors,
2143                vnode_bitmap_updates,
2144                actor_splits,
2145                newly_created_actors,
2146                ..
2147            },
2148        ) in reschedules
2149        {
2150            // drop removed actors
2151            Actor::delete_many()
2152                .filter(
2153                    actor::Column::ActorId
2154                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
2155                )
2156                .exec(&txn)
2157                .await?;
2158
2159            // add new actors
2160            for (
2161                (
2162                    StreamActor {
2163                        actor_id,
2164                        fragment_id,
2165                        vnode_bitmap,
2166                        expr_context,
2167                        ..
2168                    },
2169                    _,
2170                ),
2171                worker_id,
2172            ) in newly_created_actors.into_values()
2173            {
2174                let splits = actor_splits
2175                    .get(&actor_id)
2176                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
2177
2178                Actor::insert(actor::ActiveModel {
2179                    actor_id: Set(actor_id as _),
2180                    fragment_id: Set(fragment_id as _),
2181                    status: Set(ActorStatus::Running),
2182                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
2183                    worker_id: Set(worker_id),
2184                    upstream_actor_ids: Set(Default::default()),
2185                    vnode_bitmap: Set(vnode_bitmap
2186                        .as_ref()
2187                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
2188                    expr_context: Set(expr_context.as_ref().unwrap().into()),
2189                })
2190                .exec(&txn)
2191                .await?;
2192            }
2193
2194            // actor update
2195            for (actor_id, bitmap) in vnode_bitmap_updates {
2196                let actor = Actor::find_by_id(actor_id as ActorId)
2197                    .one(&txn)
2198                    .await?
2199                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2200
2201                let mut actor = actor.into_active_model();
2202                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
2203                actor.update(&txn).await?;
2204            }
2205
2206            // Update actor_splits for existing actors
2207            for (actor_id, splits) in actor_splits {
2208                if new_created_actors.contains(&(actor_id as ActorId)) {
2209                    continue;
2210                }
2211
2212                let actor = Actor::find_by_id(actor_id as ActorId)
2213                    .one(&txn)
2214                    .await?
2215                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2216
2217                let mut actor = actor.into_active_model();
2218                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
2219                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
2220                actor.update(&txn).await?;
2221            }
2222
2223            // fragment update
2224            let fragment = Fragment::find_by_id(fragment_id)
2225                .one(&txn)
2226                .await?
2227                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
2228
2229            let job_actors = fragment
2230                .find_related(Actor)
2231                .all(&txn)
2232                .await?
2233                .into_iter()
2234                .map(|actor| {
2235                    (
2236                        fragment_id,
2237                        fragment.distribution_type,
2238                        actor.actor_id,
2239                        actor.vnode_bitmap,
2240                        actor.worker_id,
2241                        actor.status,
2242                    )
2243                })
2244                .collect_vec();
2245
2246            fragment_mapping_to_notify.extend(rebuild_fragment_mapping_from_actors(job_actors));
2247        }
2248
2249        let JobReschedulePostUpdates {
2250            parallelism_updates,
2251            resource_group_updates,
2252        } = post_updates;
2253
2254        for (table_id, parallelism) in parallelism_updates {
2255            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
2256                .one(&txn)
2257                .await?
2258                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
2259                .into_active_model();
2260
2261            streaming_job.parallelism = Set(match parallelism {
2262                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
2263                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
2264                TableParallelism::Custom => StreamingParallelism::Custom,
2265            });
2266
2267            if let Some(resource_group) =
2268                resource_group_updates.get(&(table_id.table_id() as ObjectId))
2269            {
2270                streaming_job.specific_resource_group = Set(resource_group.to_owned());
2271            }
2272
2273            streaming_job.update(&txn).await?;
2274        }
2275
2276        txn.commit().await?;
2277        self.notify_fragment_mapping(Operation::Update, fragment_mapping_to_notify)
2278            .await;
2279
2280        Ok(())
2281    }
2282
2283    /// Note: `FsFetch` created in old versions are not included.
2284    /// Since this is only used for debugging, it should be fine.
2285    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2286        let inner = self.inner.read().await;
2287        let txn = inner.db.begin().await?;
2288
2289        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
2290            .select_only()
2291            .columns([
2292                fragment::Column::FragmentId,
2293                fragment::Column::JobId,
2294                fragment::Column::FragmentTypeMask,
2295                fragment::Column::StreamNode,
2296            ])
2297            .filter(fragment_type_mask_intersects(FragmentTypeFlag::raw_flag(
2298                FragmentTypeFlag::rate_limit_fragments(),
2299            ) as _))
2300            .into_tuple()
2301            .all(&txn)
2302            .await?;
2303
2304        let mut rate_limits = Vec::new();
2305        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2306            let stream_node = stream_node.to_protobuf();
2307            visit_stream_node_body(&stream_node, |node| {
2308                let mut rate_limit = None;
2309                let mut node_name = None;
2310
2311                match node {
2312                    // source rate limit
2313                    PbNodeBody::Source(node) => {
2314                        if let Some(node_inner) = &node.source_inner {
2315                            rate_limit = node_inner.rate_limit;
2316                            node_name = Some("SOURCE");
2317                        }
2318                    }
2319                    PbNodeBody::StreamFsFetch(node) => {
2320                        if let Some(node_inner) = &node.node_inner {
2321                            rate_limit = node_inner.rate_limit;
2322                            node_name = Some("FS_FETCH");
2323                        }
2324                    }
2325                    // backfill rate limit
2326                    PbNodeBody::SourceBackfill(node) => {
2327                        rate_limit = node.rate_limit;
2328                        node_name = Some("SOURCE_BACKFILL");
2329                    }
2330                    PbNodeBody::StreamScan(node) => {
2331                        rate_limit = node.rate_limit;
2332                        node_name = Some("STREAM_SCAN");
2333                    }
2334                    PbNodeBody::StreamCdcScan(node) => {
2335                        rate_limit = node.rate_limit;
2336                        node_name = Some("STREAM_CDC_SCAN");
2337                    }
2338                    PbNodeBody::Sink(node) => {
2339                        rate_limit = node.rate_limit;
2340                        node_name = Some("SINK");
2341                    }
2342                    _ => {}
2343                }
2344
2345                if let Some(rate_limit) = rate_limit {
2346                    rate_limits.push(RateLimitInfo {
2347                        fragment_id: fragment_id as u32,
2348                        job_id: job_id as u32,
2349                        fragment_type_mask: fragment_type_mask as u32,
2350                        rate_limit,
2351                        node_name: node_name.unwrap().to_owned(),
2352                    });
2353                }
2354            });
2355        }
2356
2357        Ok(rate_limits)
2358    }
2359}
2360
2361fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
2362    column
2363        .binary(BinOper::Custom("&"), value)
2364        .binary(BinOper::NotEqual, 0)
2365}
2366
2367fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
2368    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
2369}
2370
2371pub struct SinkIntoTableContext {
2372    /// For creating sink into table, this is `Some`, otherwise `None`.
2373    pub creating_sink_id: Option<SinkId>,
2374    /// For dropping sink into table, this is `Some`, otherwise `None`.
2375    pub dropping_sink_id: Option<SinkId>,
2376    /// For alter table (e.g., add column), this is the list of existing sink ids
2377    /// otherwise empty.
2378    pub updated_sink_catalogs: Vec<SinkId>,
2379}
2380
2381async fn update_connector_props_fragments<F>(
2382    txn: &DatabaseTransaction,
2383    job_id: i32,
2384    expect_flag: FragmentTypeFlag,
2385    mut alter_stream_node_fn: F,
2386) -> MetaResult<()>
2387where
2388    F: FnMut(&mut PbNodeBody, &mut bool),
2389{
2390    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
2391        .select_only()
2392        .columns([
2393            fragment::Column::FragmentId,
2394            fragment::Column::FragmentTypeMask,
2395            fragment::Column::StreamNode,
2396        ])
2397        .filter(fragment::Column::JobId.eq(job_id))
2398        .into_tuple()
2399        .all(txn)
2400        .await?;
2401    let fragments = fragments
2402        .into_iter()
2403        .filter(|(_, fragment_type_mask, _)| *fragment_type_mask & expect_flag as i32 != 0)
2404        .filter_map(|(id, _, stream_node)| {
2405            let mut stream_node = stream_node.to_protobuf();
2406            let mut found = false;
2407            visit_stream_node_mut(&mut stream_node, |node| {
2408                alter_stream_node_fn(node, &mut found);
2409            });
2410            if found { Some((id, stream_node)) } else { None }
2411        })
2412        .collect_vec();
2413    assert!(
2414        !fragments.is_empty(),
2415        "job {} (type: {:?}) should be used by at least one fragment",
2416        job_id,
2417        expect_flag
2418    );
2419
2420    for (id, stream_node) in fragments {
2421        fragment::ActiveModel {
2422            fragment_id: Set(id),
2423            stream_node: Set(StreamNode::from(&stream_node)),
2424            ..Default::default()
2425        }
2426        .update(txn)
2427        .await?;
2428    }
2429
2430    Ok(())
2431}