risingwave_meta/controller/streaming_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::anyhow;
use indexmap::IndexMap;
use itertools::Itertools;
use risingwave_common::catalog::{FragmentTypeFlag, FragmentTypeMask};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node_body, visit_stream_node_mut,
};
use risingwave_common::{bail, current_cluster_version};
use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::file_sink::fs::FsSink;
use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
use risingwave_connector::source::{ConnectorProperties, SplitImpl};
use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::user_privilege::Action;
use risingwave_meta_model::*;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
use risingwave_pb::catalog::{PbCreateType, PbSink, PbTable};
use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
use risingwave_pb::meta::object::PbObjectInfo;
use risingwave_pb::meta::subscribe_response::{
    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{PbObject, PbObjectGroup};
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
use risingwave_pb::user::PbUserInfo;
use risingwave_sqlparser::ast::{SqlOption, Statement};
use risingwave_sqlparser::parser::{Parser, ParserError};
use sea_orm::ActiveValue::Set;
use sea_orm::sea_query::{BinOper, Expr, Query, SimpleExpr};
use sea_orm::{
    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
    IntoSimpleExpr, JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect,
    RelationTrait, TransactionTrait,
};
use thiserror_ext::AsReport;

use super::rename::IndexItemRewriter;
use crate::barrier::{ReplaceStreamJobPlan, Reschedule};
use crate::controller::ObjectModel;
use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
use crate::controller::utils::{
    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
    check_sink_into_table_cycle, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
    get_internal_tables_by_id, get_table_columns, grant_default_privileges_automatically,
    insert_fragment_relations, list_user_info_by_ids,
};
use crate::error::MetaErrorInner;
use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
use crate::model::{
    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
use crate::{MetaError, MetaResult};

impl CatalogController {
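    /// Create the `Object` row and the corresponding `streaming_job` row for a new streaming
    /// job, initialized in `Initial` status. Returns the id of the newly created object.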
    pub async fn create_streaming_job_obj(
        txn: &DatabaseTransaction,
        obj_type: ObjectType,
        owner_id: UserId,
        database_id: Option<DatabaseId>,
        schema_id: Option<SchemaId>,
        create_type: PbCreateType,
        timezone: Option<String>,
        streaming_parallelism: StreamingParallelism,
        max_parallelism: usize,
        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
    ) -> MetaResult<ObjectId> {
        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
        let job = streaming_job::ActiveModel {
            job_id: Set(obj.oid),
            job_status: Set(JobStatus::Initial),
            create_type: Set(create_type.into()),
            timezone: Set(timezone),
            parallelism: Set(streaming_parallelism),
            max_parallelism: Set(max_parallelism as _),
            specific_resource_group: Set(specific_resource_group),
        };
        job.insert(txn).await?;

        Ok(obj.oid)
    }

    /// Create catalogs for the streaming job, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given streaming job are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    #[await_tree::instrument]
    pub async fn create_job_catalog(
        &self,
        streaming_job: &mut StreamingJob,
        ctx: &StreamContext,
        parallelism: &Option<Parallelism>,
        max_parallelism: usize,
        mut dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let create_type = streaming_job.create_type();
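        // Resolve the effective parallelism: an explicitly specified value wins; otherwise
        // fall back to the cluster default (adaptive for `Full`, a fixed degree otherwise).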
        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
        };

        ensure_user_id(streaming_job.owner() as _, &txn).await?;
        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
        check_relation_name_duplicate(
            &streaming_job.name(),
            streaming_job.database_id() as _,
            streaming_job.schema_id() as _,
            &txn,
        )
        .await?;

        // check if any dependency is in altering status.
        if !dependencies.is_empty() {
            let altering_cnt = ObjectDependency::find()
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .is_in(dependencies.clone())
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
                        .and(
                            // It means the referring table is just dummy for altering.
                            object::Column::Oid.not_in_subquery(
                                Query::select()
                                    .column(table::Column::TableId)
                                    .from(Table)
                                    .to_owned(),
                            ),
                        ),
                )
                .count(&txn)
                .await?;
            if altering_cnt != 0 {
                return Err(MetaError::permission_denied(
                    "some dependent relations are being altered",
                ));
            }
        }

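        // Create the job object and its type-specific catalog entries, writing the freshly
        // allocated job id back into the in-memory catalog of the job.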
        match streaming_job {
            StreamingJob::MaterializedView(table) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Sink(sink, _) => {
                if let Some(target_table_id) = sink.target_table
                    && check_sink_into_table_cycle(
                        target_table_id as ObjectId,
                        dependencies.iter().cloned().collect(),
                        &txn,
                    )
                    .await?
                {
                    bail!("Creating such a sink will result in circular dependency.");
                }

                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Sink,
                    sink.owner as _,
                    Some(sink.database_id as _),
                    Some(sink.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                sink.id = job_id as _;
                let sink_model: sink::ActiveModel = sink.clone().into();
                Sink::insert(sink_model).exec(&txn).await?;
            }
            StreamingJob::Table(src, table, _) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Table,
                    table.owner as _,
                    Some(table.database_id as _),
                    Some(table.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                table.id = job_id as _;
                if let Some(src) = src {
                    let src_obj = Self::create_object(
                        &txn,
                        ObjectType::Source,
                        src.owner as _,
                        Some(src.database_id as _),
                        Some(src.schema_id as _),
                    )
                    .await?;
                    src.id = src_obj.oid as _;
                    src.optional_associated_table_id =
                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
                    table.optional_associated_source_id = Some(
                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
                    );
                    let source: source::ActiveModel = src.clone().into();
                    Source::insert(source).exec(&txn).await?;
                }
                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
            }
            StreamingJob::Index(index, table) => {
                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Index,
                    index.owner as _,
                    Some(index.database_id as _),
                    Some(index.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                // To be compatible with the old implementation.
                index.id = job_id as _;
                index.index_table_id = job_id as _;
                table.id = job_id as _;

                ObjectDependency::insert(object_dependency::ActiveModel {
                    oid: Set(index.primary_table_id as _),
                    used_by: Set(table.id as _),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;

                let table_model: table::ActiveModel = table.clone().into();
                Table::insert(table_model).exec(&txn).await?;
                let index_model: index::ActiveModel = index.clone().into();
                Index::insert(index_model).exec(&txn).await?;
            }
            StreamingJob::Source(src) => {
                let job_id = Self::create_streaming_job_obj(
                    &txn,
                    ObjectType::Source,
                    src.owner as _,
                    Some(src.database_id as _),
                    Some(src.schema_id as _),
                    create_type,
                    ctx.timezone.clone(),
                    streaming_parallelism,
                    max_parallelism,
                    specific_resource_group,
                )
                .await?;
                src.id = job_id as _;
                let source_model: source::ActiveModel = src.clone().into();
                Source::insert(source_model).exec(&txn).await?;
            }
        }

        // collect dependent secrets.
        dependencies.extend(
            streaming_job
                .dependent_secret_ids()?
                .into_iter()
                .map(|secret_id| secret_id as ObjectId),
        );
        // collect dependent connections.
        dependencies.extend(
            streaming_job
                .dependent_connection_ids()?
                .into_iter()
                .map(|conn_id| conn_id as ObjectId),
        );

        // record object dependency.
        if !dependencies.is_empty() {
            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
                object_dependency::ActiveModel {
                    oid: Set(oid),
                    used_by: Set(streaming_job.id() as _),
                    ..Default::default()
                }
            }))
            .exec(&txn)
            .await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Create catalogs for internal tables, then notify the frontend about them if the job is a
    /// materialized view.
    ///
    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
    /// be updated later in `prepare_streaming_job` and notified again in `finish_streaming_job`.
    ///
    /// Returns a mapping from the temporary table id to the actual global table id.
    pub async fn create_internal_table_catalog(
        &self,
        job: &StreamingJob,
        mut incomplete_internal_tables: Vec<PbTable>,
    ) -> MetaResult<HashMap<u32, u32>> {
        let job_id = job.id() as ObjectId;
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        let mut table_id_map = HashMap::new();
        for table in &mut incomplete_internal_tables {
            let table_id = Self::create_object(
                &txn,
                ObjectType::Table,
                table.owner as _,
                Some(table.database_id as _),
                Some(table.schema_id as _),
            )
            .await?
            .oid;
            table_id_map.insert(table.id, table_id as u32);
            table.id = table_id as _;
            table.job_id = Some(job_id as _);

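            // `fragment_id` is left unset here; it will be filled in by
            // `prepare_streaming_job` once the table fragments have been built.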
            let table_model = table::ActiveModel {
                table_id: Set(table_id as _),
                belongs_to_job_id: Set(Some(job_id)),
                fragment_id: NotSet,
                ..table.clone().into()
            };
            Table::insert(table_model).exec(&txn).await?;
        }
        txn.commit().await?;

        Ok(table_id_map)
    }

    pub async fn prepare_stream_job_fragments(
        &self,
        stream_job_fragments: &StreamJobFragmentsToCreate,
        streaming_job: &StreamingJob,
        for_replace: bool,
    ) -> MetaResult<()> {
        let need_notify = streaming_job.should_notify_creating();
        let (sink, table) = match streaming_job {
            StreamingJob::Sink(sink, _) => (Some(sink), None),
            StreamingJob::Table(_, table, _) => (None, Some(table)),
            StreamingJob::Index(_, _)
            | StreamingJob::Source(_)
            | StreamingJob::MaterializedView(_) => (None, None),
        };
        self.prepare_streaming_job(
            stream_job_fragments.stream_job_id().table_id as _,
            || stream_job_fragments.fragments.values(),
            &stream_job_fragments.actor_status,
            &stream_job_fragments.actor_splits,
            &stream_job_fragments.downstreams,
            need_notify,
            streaming_job.definition(),
            for_replace,
            sink,
            table,
        )
        .await
    }

    // TODO: In this function, we also update the `Table` model in the meta store.
    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
    // making them the source of truth and performing a full replacement for those in the meta store?
    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
    #[expect(clippy::too_many_arguments)]
    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" })]
    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
        &self,
        job_id: ObjectId,
        get_fragments: impl Fn() -> I + 'a,
        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
        downstreams: &FragmentDownstreamRelation,
        need_notify: bool,
        definition: String,
        for_replace: bool,
        sink: Option<&PbSink>,
        table: Option<&PbTable>,
    ) -> MetaResult<()> {
        let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
            job_id,
            get_fragments(),
            actor_status,
            actor_splits,
        )?;
        let inner = self.inner.write().await;

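        // When `need_notify` is set, collect the catalog objects created below so the
        // frontend can be notified about them in one batch after the transaction commits.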
        let mut objects_to_notify = if need_notify { Some(vec![]) } else { None };
        let txn = inner.db.begin().await?;

        // Add fragments.
        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
        for fragment in fragments {
            let fragment_id = fragment.fragment_id;
            let state_table_ids = fragment.state_table_ids.inner_ref().clone();

            let fragment = fragment.into_active_model();
            Fragment::insert(fragment).exec(&txn).await?;

            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
            // After table fragments are created, update them for all tables.
            if !for_replace {
                let all_tables = StreamJobFragments::collect_tables(get_fragments());
                for state_table_id in state_table_ids {
                    // Table's vnode count is not always the fragment's vnode count, so we have to
                    // look up the table from `TableFragments`.
                    // See `ActorGraphBuilder::new`.
                    let table = all_tables
                        .get(&(state_table_id as u32))
                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
                    assert_eq!(table.id, state_table_id as u32);
                    assert_eq!(table.fragment_id, fragment_id as u32);
                    let vnode_count = table.vnode_count();

                    table::ActiveModel {
                        table_id: Set(state_table_id as _),
                        fragment_id: Set(Some(fragment_id)),
                        vnode_count: Set(vnode_count as _),
                        ..Default::default()
                    }
                    .update(&txn)
                    .await?;

                    if let Some(objects) = &mut objects_to_notify {
                        let mut table = table.clone();
                        // In production builds, the definition was replaced earlier but is still needed for the notification.
                        if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
                            table.definition = definition.clone();
                        }
                        objects.push(PbObject {
                            object_info: Some(PbObjectInfo::Table(table)),
                        });
                    }
                }
            }
        }

        if let Some(objects) = &mut objects_to_notify
            && let Some(sink) = sink
        {
            objects.push(PbObject {
                object_info: Some(PbObjectInfo::Sink(sink.clone())),
            })
        }

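        // Persist the fragment-level downstream relations, i.e. the edges of the fragment graph.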
        insert_fragment_relations(&txn, downstreams).await?;

        // Add actors and actor dispatchers.
        for actors in actors {
            for actor in actors {
                let actor = actor.into_active_model();
                Actor::insert(actor).exec(&txn).await?;
            }
        }

        if !for_replace {
            // Update dml fragment id.
            if let Some(table) = table {
                Table::update(table::ActiveModel {
                    table_id: Set(table.id as _),
                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
                    ..Default::default()
                })
                .exec(&txn)
                .await?;
            }
        }

        txn.commit().await?;

        if let Some(objects) = objects_to_notify
            && !objects.is_empty()
        {
            self.notify_frontend(Operation::Add, Info::ObjectGroup(PbObjectGroup { objects }))
                .await;
        }

        Ok(())
    }

    /// `try_abort_creating_streaming_job` is used to abort a job that is under initial status or in `FOREGROUND` mode.
    /// It returns `(true, _)` if the job is not found or was aborted.
    /// It returns `(_, Some(database_id))` if the `database_id` of the `job_id` exists.
    #[await_tree::instrument]
    pub async fn try_abort_creating_streaming_job(
        &self,
        job_id: ObjectId,
        is_cancelled: bool,
    ) -> MetaResult<(bool, Option<DatabaseId>)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj = Object::find_by_id(job_id).one(&txn).await?;
        let Some(obj) = obj else {
            tracing::warn!(
                id = job_id,
                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
            );
            return Ok((true, None));
        };
        let database_id = obj
            .database_id
            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;

        if !is_cancelled && let Some(streaming_job) = &streaming_job {
            assert_ne!(streaming_job.job_status, JobStatus::Created);
            if streaming_job.create_type == CreateType::Background
                && streaming_job.job_status == JobStatus::Creating
            {
                // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
                tracing::warn!(
                    id = job_id,
                    "streaming job is created in background and still in creating status"
                );
                return Ok((false, Some(database_id)));
            }
        }

        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

        // Get the notification info if the job is a materialized view or created in the background.
        let mut objs = vec![];
        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
        let need_notify = if let Some(table) = &table_obj {
            // If the job is a materialized view, we need to notify the frontend.
            table.table_type == TableType::MaterializedView
        } else {
            streaming_job.is_some_and(|job| job.create_type == CreateType::Background)
        };
        if need_notify {
            let obj: Option<PartialObject> = Object::find_by_id(job_id)
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .into_partial_model()
                .one(&txn)
                .await?;
            let obj =
                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
            objs.push(obj);
            let internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.eq(job_id))
                .into_partial_model()
                .all(&txn)
                .await?;
            objs.extend(internal_table_objs);
        }

        // Check if the job is creating sink into table.
        if table_obj.is_none()
            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
                .select_only()
                .column(sink::Column::TargetTable)
                .into_tuple::<Option<TableId>>()
                .one(&txn)
                .await?
        {
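            // Look up the temporary replace job created for the sink-into-table flow;
            // it must be cleaned up together with the aborted sink job.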
            let tmp_id: Option<ObjectId> = ObjectDependency::find()
                .select_only()
                .column(object_dependency::Column::UsedBy)
                .join(
                    JoinType::InnerJoin,
                    object_dependency::Relation::Object1.def(),
                )
                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
                .filter(
                    object_dependency::Column::Oid
                        .eq(target_table_id)
                        .and(object::Column::ObjType.eq(ObjectType::Table))
                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
                )
                .into_tuple()
                .one(&txn)
                .await?;
            if let Some(tmp_id) = tmp_id {
                tracing::warn!(
                    id = tmp_id,
                    "aborting temp streaming job for sink into table"
                );
                Object::delete_by_id(tmp_id).exec(&txn).await?;
            }
        }

        Object::delete_by_id(job_id).exec(&txn).await?;
        if !internal_table_ids.is_empty() {
            Object::delete_many()
                .filter(object::Column::Oid.is_in(internal_table_ids))
                .exec(&txn)
                .await?;
        }
        if let Some(t) = &table_obj
            && let Some(source_id) = t.optional_associated_source_id
        {
            Object::delete_by_id(source_id).exec(&txn).await?;
        }

        let err = if is_cancelled {
            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
        } else {
            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
        };
        let abort_reason = format!("streaming job aborted {}", err.as_report());
        for tx in inner
            .creating_table_finish_notifier
            .get_mut(&database_id)
            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
            .into_iter()
            .flatten()
            .flatten()
        {
            let _ = tx.send(Err(abort_reason.clone()));
        }
        txn.commit().await?;

        if !objs.is_empty() {
            // We have already notified the frontend about these objects,
            // so we need to notify it to delete them here.
            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
                .await;
        }
        Ok((true, Some(database_id)))
    }

    #[await_tree::instrument]
    pub async fn post_collect_job_fragments(
        &self,
        job_id: ObjectId,
        actor_ids: Vec<crate::model::ActorId>,
        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
        split_assignment: &SplitAssignment,
        replace_plan: Option<&ReplaceStreamJobPlan>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let actor_ids = actor_ids
            .into_iter()
            .chain(
                replace_plan
                    .iter()
                    .flat_map(|plan| plan.new_fragments.actor_ids().into_iter()),
            )
            .map(|id| id as ActorId)
            .collect_vec();

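        // Mark all collected actors (including those of the replace plan, if any) as running.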
        Actor::update_many()
            .col_expr(
                actor::Column::Status,
                SimpleExpr::from(ActorStatus::Running.into_value()),
            )
            .filter(actor::Column::ActorId.is_in(actor_ids))
            .exec(&txn)
            .await?;

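        // Persist the initial source split assignments on each actor, again including
        // the split assignments of the replace plan when one is present.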
        for splits in split_assignment.values().chain(
            replace_plan
                .as_ref()
                .map(|plan| plan.init_split_assignment.values())
                .into_iter()
                .flatten(),
        ) {
            for (actor_id, splits) in splits {
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                let connector_splits = &PbConnectorSplits { splits };
                actor::ActiveModel {
                    actor_id: Set(*actor_id as _),
                    splits: Set(Some(connector_splits.into())),
                    ..Default::default()
                }
                .update(&txn)
                .await?;
            }
        }

        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
        let objects = if let Some(plan) = &replace_plan {
            insert_fragment_relations(&txn, &plan.upstream_fragment_downstreams).await?;

            let incoming_sink_id = job_id;
            let (objects, _) = Self::finish_replace_streaming_job_inner(
                plan.tmp_id as _,
                plan.replace_upstream.clone(),
                SinkIntoTableContext {
                    creating_sink_id: Some(incoming_sink_id),
                    dropping_sink_id: None,
                    updated_sink_catalogs: vec![],
                },
                &txn,
                plan.streaming_job.clone(),
                None,
                None,
            )
            .await?;
            objects
        } else {
            vec![]
        };

        // Mark job as CREATING.
        streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Creating),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        txn.commit().await?;
        if !objects.is_empty() {
            self.notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;
        }

        Ok(())
    }

    pub async fn create_job_catalog_for_replace(
        &self,
        streaming_job: &StreamingJob,
        ctx: Option<&StreamContext>,
        specified_parallelism: Option<&NonZeroUsize>,
        expected_original_max_parallelism: Option<usize>,
    ) -> MetaResult<ObjectId> {
        let id = streaming_job.id();
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // 1. check version.
        streaming_job.verify_version_for_replace(&txn).await?;
        // 2. check concurrent replace.
        let referring_cnt = ObjectDependency::find()
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object1.def(),
            )
            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
            .filter(
                object_dependency::Column::Oid
                    .eq(id as ObjectId)
                    .and(object::Column::ObjType.eq(ObjectType::Table))
                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
            )
            .count(&txn)
            .await?;
        if referring_cnt != 0 {
            return Err(MetaError::permission_denied(
                "job is being altered or referenced by some creating jobs",
            ));
        }

        // 3. check parallelism.
        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
            StreamingJobModel::find_by_id(id as ObjectId)
                .select_only()
                .column(streaming_job::Column::MaxParallelism)
                .column(streaming_job::Column::Timezone)
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;

        if let Some(max_parallelism) = expected_original_max_parallelism
            && original_max_parallelism != max_parallelism as i32
        {
            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
            // This should not happen in normal cases.
            bail!(
                "cannot use a different max parallelism \
                 when replacing streaming job, \
                 original: {}, new: {}",
                original_max_parallelism,
                max_parallelism
            );
        }

        let parallelism = match specified_parallelism {
            None => StreamingParallelism::Adaptive,
            Some(n) => StreamingParallelism::Fixed(n.get() as _),
        };
        let timezone = ctx
            .map(|ctx| ctx.timezone.clone())
            .unwrap_or(original_timezone);

        // 4. create streaming object for new replace table.
        let new_obj_id = Self::create_streaming_job_obj(
            &txn,
            streaming_job.object_type(),
            streaming_job.owner() as _,
            Some(streaming_job.database_id() as _),
            Some(streaming_job.schema_id() as _),
            streaming_job.create_type(),
            timezone,
            parallelism,
            original_max_parallelism as _,
            None,
        )
        .await?;

        // 5. record dependency for new replace table.
        ObjectDependency::insert(object_dependency::ActiveModel {
            oid: Set(id as _),
            used_by: Set(new_obj_id as _),
            ..Default::default()
        })
        .exec(&txn)
        .await?;

        txn.commit().await?;

        Ok(new_obj_id)
    }

    /// `finish_streaming_job` marks job-related objects as `Created` and notifies the frontend.
    pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let job_type = Object::find_by_id(job_id)
            .select_only()
            .column(object::Column::ObjType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
            .select_only()
            .column(streaming_job::Column::CreateType)
            .into_tuple()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;

        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
        let res = Object::update_many()
            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
            .col_expr(
                object::Column::CreatedAtClusterVersion,
                current_cluster_version().into(),
            )
            .filter(object::Column::Oid.eq(job_id))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
        }

        // mark the target stream job as `Created`.
        let job = streaming_job::ActiveModel {
            job_id: Set(job_id),
            job_status: Set(JobStatus::Created),
            ..Default::default()
        };
        job.update(&txn).await?;

        // notify frontend: job, internal tables.
        let internal_table_objs = Table::find()
            .find_also_related(Object)
            .filter(table::Column::BelongsToJobId.eq(job_id))
            .all(&txn)
            .await?;
        let mut objects = internal_table_objs
            .iter()
            .map(|(table, obj)| PbObject {
                object_info: Some(PbObjectInfo::Table(
                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
                )),
            })
            .collect_vec();
        let mut notification_op = if create_type == CreateType::Background {
            NotificationOperation::Update
        } else {
            NotificationOperation::Add
        };
        let mut updated_user_info = vec![];

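        // Collect the job's own catalog object (and any associated catalogs, such as a
        // table's source or an index's table) for the frontend notification.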
        match job_type {
            ObjectType::Table => {
                let (table, obj) = Table::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
                if table.table_type == TableType::MaterializedView {
                    notification_op = NotificationOperation::Update;
                }

                if let Some(source_id) = table.optional_associated_source_id {
                    let (src, obj) = Source::find_by_id(source_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Source(
                            ObjectModel(src, obj.unwrap()).into(),
                        )),
                    });
                }
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
                });
            }
            ObjectType::Sink => {
                let (sink, obj) = Sink::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
                });
            }
            ObjectType::Index => {
                let (index, obj) = Index::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
                {
                    let (table, obj) = Table::find_by_id(index.index_table_id)
                        .find_also_related(Object)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", index.index_table_id)
                        })?;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, obj.unwrap()).into(),
                        )),
                    });
                }

                // If the index is created on a table with privileges, we should also
                // grant the privileges for the index and its state tables.
                let primary_table_privileges = UserPrivilege::find()
                    .filter(
                        user_privilege::Column::Oid
                            .eq(index.primary_table_id)
                            .and(user_privilege::Column::Action.eq(Action::Select)),
                    )
                    .all(&txn)
                    .await?;
                if !primary_table_privileges.is_empty() {
                    let index_state_table_ids: Vec<TableId> = Table::find()
                        .select_only()
                        .column(table::Column::TableId)
                        .filter(
                            table::Column::BelongsToJobId
                                .eq(job_id)
                                .or(table::Column::TableId.eq(index.index_table_id)),
                        )
                        .into_tuple()
                        .all(&txn)
                        .await?;
                    let mut new_privileges = vec![];
                    for privilege in &primary_table_privileges {
                        for state_table_id in &index_state_table_ids {
                            new_privileges.push(user_privilege::ActiveModel {
                                id: Default::default(),
                                oid: Set(*state_table_id),
                                user_id: Set(privilege.user_id),
                                action: Set(Action::Select),
                                dependent_id: Set(privilege.dependent_id),
                                granted_by: Set(privilege.granted_by),
                                with_grant_option: Set(privilege.with_grant_option),
                            });
                        }
                    }
                    UserPrivilege::insert_many(new_privileges)
                        .exec(&txn)
                        .await?;

                    updated_user_info = list_user_info_by_ids(
                        primary_table_privileges.into_iter().map(|p| p.user_id),
                        &txn,
                    )
                    .await?;
                }

                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
                });
            }
            ObjectType::Source => {
                let (source, obj) = Source::find_by_id(job_id)
                    .find_also_related(Object)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, obj.unwrap()).into(),
                    )),
                });
            }
            _ => unreachable!("invalid job type: {:?}", job_type),
        }

        if job_type != ObjectType::Index {
            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
        }
        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                notification_op,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        // notify users about the updated privileges.
        if !updated_user_info.is_empty() {
            version = self.notify_users_update(updated_user_info).await;
        }

        inner
            .creating_table_finish_notifier
            .values_mut()
            .for_each(|creating_tables| {
                if let Some(txs) = creating_tables.remove(&job_id) {
                    for tx in txs {
                        let _ = tx.send(Ok(version));
                    }
                }
            });

        Ok(())
    }

    pub async fn finish_replace_streaming_job(
        &self,
        tmp_id: ObjectId,
        streaming_job: StreamingJob,
        replace_upstream: FragmentReplaceUpstream,
        sink_into_table_context: SinkIntoTableContext,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<NotificationVersion> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
            tmp_id,
            replace_upstream,
            sink_into_table_context,
            &txn,
            streaming_job,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
        )
        .await?;

        txn.commit().await?;

        let mut version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
            )
            .await;

        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
            self.notify_users_update(user_infos).await;
            version = self
                .notify_frontend(
                    NotificationOperation::Delete,
                    build_object_group_for_delete(to_drop_objects),
                )
                .await;
        }

        Ok(version)
    }

    pub async fn finish_replace_streaming_job_inner(
        tmp_id: ObjectId,
        replace_upstream: FragmentReplaceUpstream,
        SinkIntoTableContext {
            creating_sink_id,
            dropping_sink_id,
            updated_sink_catalogs,
        }: SinkIntoTableContext,
        txn: &DatabaseTransaction,
        streaming_job: StreamingJob,
        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
        let original_job_id = streaming_job.id() as ObjectId;
        let job_type = streaming_job.job_type();

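        // If a table's columns change during the replacement, the expressions of indexes
        // built on it must be rewritten; the rewriter is constructed below once the old
        // and new column sets are known, and applied after the catalogs are updated.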
        let mut index_item_rewriter = None;

        // Update catalog
        match streaming_job {
            StreamingJob::Table(_source, table, _table_job_type) => {
                // The source catalog should remain unchanged

                let original_column_catalogs = get_table_columns(txn, original_job_id).await?;

                index_item_rewriter = Some({
                    let original_columns = original_column_catalogs
                        .to_protobuf()
                        .into_iter()
                        .map(|c| c.column_desc.unwrap())
                        .collect_vec();
                    let new_columns = table
                        .columns
                        .iter()
                        .map(|c| c.column_desc.clone().unwrap())
                        .collect_vec();

                    IndexItemRewriter {
                        original_columns,
                        new_columns,
                    }
                });

                // For sinks created in earlier versions, we need to set the original_target_columns.
                for sink_id in updated_sink_catalogs {
                    sink::ActiveModel {
                        sink_id: Set(sink_id as _),
                        original_target_columns: Set(Some(original_column_catalogs.clone())),
                        ..Default::default()
                    }
                    .update(txn)
                    .await?;
                }
                // Update the table catalog with the new one. (column catalog is also updated here)
                let mut table = table::ActiveModel::from(table);
                let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone();
                if let Some(sink_id) = creating_sink_id {
                    debug_assert!(!incoming_sinks.contains(&{ sink_id }));
                    incoming_sinks.push(sink_id as _);
                }
                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
                {
                    // drop table connector; the rest of the logic is in `drop_table_associated_source`.
                    table.optional_associated_source_id = Set(None);
                }

                if let Some(sink_id) = dropping_sink_id {
                    let _drained = incoming_sinks
                        .extract_if(.., |id| *id == sink_id)
                        .collect_vec();
                    // TODO(august): re-enable this assertion after refactoring sink into table
                    // debug_assert_eq!(drained, vec![sink_id]);
                }

                table.incoming_sinks = Set(incoming_sinks.into());
                table.update(txn).await?;
            }
            StreamingJob::Source(source) => {
                // Update the source catalog with the new one.
                let source = source::ActiveModel::from(source);
                source.update(txn).await?;
            }
            StreamingJob::MaterializedView(table) => {
                // Update the table catalog with the new one.
                let table = table::ActiveModel::from(table);
                table.update(txn).await?;
            }
            _ => unreachable!(
                "invalid streaming job type: {:?}",
                streaming_job.job_type_str()
            ),
        }

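        // Swap the temporary job's fragments in for the original job's: repoint internal
        // tables, replace the old fragments and actors, patch downstream `Merge` nodes to
        // the new upstream fragment ids, and finally drop the dummy object.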
1253        async fn finish_fragments(
1254            txn: &DatabaseTransaction,
1255            tmp_id: ObjectId,
1256            original_job_id: ObjectId,
1257            replace_upstream: FragmentReplaceUpstream,
1258        ) -> MetaResult<()> {
1259            // 0. update internal tables
1260            // Fields including `fragment_id` were placeholder values before.
1261            // After table fragments are created, update them for all internal tables.
1262            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1263                .select_only()
1264                .columns([
1265                    fragment::Column::FragmentId,
1266                    fragment::Column::StateTableIds,
1267                ])
1268                .filter(fragment::Column::JobId.eq(tmp_id))
1269                .into_tuple()
1270                .all(txn)
1271                .await?;
1272            for (fragment_id, state_table_ids) in fragment_info {
1273                for state_table_id in state_table_ids.into_inner() {
1274                    table::ActiveModel {
1275                        table_id: Set(state_table_id as _),
1276                        fragment_id: Set(Some(fragment_id)),
1277                        // No need to update `vnode_count` because it must remain the same.
1278                        ..Default::default()
1279                    }
1280                    .update(txn)
1281                    .await?;
1282                }
1283            }
1284
1285            // 1. replace old fragments/actors with new ones.
1286            Fragment::delete_many()
1287                .filter(fragment::Column::JobId.eq(original_job_id))
1288                .exec(txn)
1289                .await?;
1290            Fragment::update_many()
1291                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1292                .filter(fragment::Column::JobId.eq(tmp_id))
1293                .exec(txn)
1294                .await?;
1295
1296            // 2. update merges.
1297            // update downstream fragment's Merge node, and upstream_fragment_id
1298            for (fragment_id, fragment_replace_map) in replace_upstream {
1299                let (fragment_id, mut stream_node) =
1300                    Fragment::find_by_id(fragment_id as FragmentId)
1301                        .select_only()
1302                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1303                        .into_tuple::<(FragmentId, StreamNode)>()
1304                        .one(txn)
1305                        .await?
1306                        .map(|(id, node)| (id, node.to_protobuf()))
1307                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1308
1309                visit_stream_node_mut(&mut stream_node, |body| {
1310                    if let PbNodeBody::Merge(m) = body
1311                        && let Some(new_fragment_id) =
1312                            fragment_replace_map.get(&m.upstream_fragment_id)
1313                    {
1314                        m.upstream_fragment_id = *new_fragment_id;
1315                    }
1316                });
1317                fragment::ActiveModel {
1318                    fragment_id: Set(fragment_id),
1319                    stream_node: Set(StreamNode::from(&stream_node)),
1320                    ..Default::default()
1321                }
1322                .update(txn)
1323                .await?;
1324            }
1325
1326            // 3. remove dummy object.
1327            Object::delete_by_id(tmp_id).exec(txn).await?;
1328
1329            Ok(())
1330        }
1331
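        // `finish_fragments` swaps the temporary job's fragments in for the original
        // job's. Conceptually (a sketch, assuming a SQL backend), step 1 above boils
        // down to:
        //
        //   DELETE FROM fragment WHERE job_id = {original_job_id};
        //   UPDATE fragment SET job_id = {original_job_id} WHERE job_id = {tmp_id};
        //
        // after which only the Merge nodes of downstream fragments still reference the
        // replaced upstream fragment ids and are rewritten in step 2.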
        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;

        // 4. update catalogs and notify.
        let mut objects = vec![];
        match job_type {
            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
                let (table, table_obj) = Table::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Table(
                        ObjectModel(table, table_obj.unwrap()).into(),
                    )),
                })
            }
            StreamingJobType::Source => {
                let (source, source_obj) = Source::find_by_id(original_job_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Source(
                        ObjectModel(source, source_obj.unwrap()).into(),
                    )),
                })
            }
            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
        }

        if let Some(expr_rewriter) = index_item_rewriter {
            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
                .select_only()
                .columns([index::Column::IndexId, index::Column::IndexItems])
                .filter(index::Column::PrimaryTableId.eq(original_job_id))
                .into_tuple()
                .all(txn)
                .await?;
            for (index_id, nodes) in index_items {
                let mut pb_nodes = nodes.to_protobuf();
                pb_nodes
                    .iter_mut()
                    .for_each(|x| expr_rewriter.rewrite_expr(x));
                let index = index::ActiveModel {
                    index_id: Set(index_id),
                    index_items: Set(pb_nodes.into()),
                    ..Default::default()
                }
                .update(txn)
                .await?;
                let index_obj = index
                    .find_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
                });
            }
        }

        if let Some(sinks) = auto_refresh_schema_sinks {
            for finish_sink_context in sinks {
                finish_fragments(
                    txn,
                    finish_sink_context.tmp_sink_id,
                    finish_sink_context.original_sink_id,
                    Default::default(),
                )
                .await?;
                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
                    .find_also_related(Object)
                    .one(txn)
                    .await?
                    .ok_or_else(|| {
                        MetaError::catalog_id_not_found("sink", finish_sink_context.original_sink_id)
                    })?;
                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
                Sink::update(sink::ActiveModel {
                    sink_id: Set(finish_sink_context.original_sink_id),
                    columns: Set(columns.clone()),
                    ..Default::default()
                })
                .exec(txn)
                .await?;
                sink.columns = columns;
                objects.push(PbObject {
                    object_info: Some(PbObjectInfo::Sink(
                        ObjectModel(sink, sink_obj.unwrap()).into(),
                    )),
                });
                if let Some((log_store_table_id, new_log_store_table_columns)) =
                    finish_sink_context.new_log_store_table
                {
                    let new_log_store_table_columns: ColumnCatalogArray =
                        new_log_store_table_columns.into();
                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
                        .find_also_related(Object)
                        .one(txn)
                        .await?
                        .ok_or_else(|| {
                            MetaError::catalog_id_not_found("table", log_store_table_id)
                        })?;
                    Table::update(table::ActiveModel {
                        table_id: Set(log_store_table_id),
                        columns: Set(new_log_store_table_columns.clone()),
                        ..Default::default()
                    })
                    .exec(txn)
                    .await?;
                    table.columns = new_log_store_table_columns;
                    objects.push(PbObject {
                        object_info: Some(PbObjectInfo::Table(
                            ObjectModel(table, table_obj.unwrap()).into(),
                        )),
                    });
                }
            }
        }

        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
            notification_objs =
                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
        }

        Ok((objects, notification_objs))
    }

    /// Abort the replacing streaming job by deleting the temporary job object.
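    ///
    /// A minimal usage sketch (identifiers are illustrative): when a replace plan
    /// fails before it is finished, drop the temporary job object and any temporary
    /// sink objects created alongside it, so that dependent rows are cleaned up too.
    /// ```ignore
    /// controller
    ///     .try_abort_replacing_streaming_job(tmp_job_id, Some(tmp_sink_ids))
    ///     .await?;
    /// ```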
    pub async fn try_abort_replacing_streaming_job(
        &self,
        tmp_job_id: ObjectId,
        tmp_sink_ids: Option<Vec<ObjectId>>,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;
        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
        if let Some(tmp_sink_ids) = tmp_sink_ids {
            for tmp_sink_id in tmp_sink_ids {
                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
            }
        }
        txn.commit().await?;
        Ok(())
    }

    // edit the `rate_limit` of the `Source` nodes in the given `source_id`'s fragments
    // return the actor ids that the new rate limit should be applied to
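    //
    // A minimal sketch of the intended use (the triggering SQL shown here is
    // illustrative and not defined in this file):
    //
    //   // ALTER SOURCE my_source SET source_rate_limit TO 1000;
    //   let fragment_actors = controller
    //       .update_source_rate_limit_by_source_id(source_id, Some(1000))
    //       .await?;
    //   // each entry maps an updated fragment to the actors the new limit
    //   // must be propagated to (e.g. via a throttle barrier command)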
    pub async fn update_source_rate_limit_by_source_id(
        &self,
        source_id: SourceId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        {
            let active_source = source::ActiveModel {
                source_id: Set(source_id),
                rate_limit: Set(rate_limit.map(|v| v as i32)),
                ..Default::default()
            };
            active_source.update(&txn).await?;
        }

        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;

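        // Determine which streaming jobs embed this source's reader:
        // - a source with an associated table: the reader lives in the table's fragments;
        // - a shared source: the reader lives in the source's own fragments;
        // - otherwise (non-shared): every job depending on the source has its own reader.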
        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
        let streaming_job_ids: Vec<ObjectId> =
            if let Some(table_id) = source.optional_associated_table_id {
                vec![table_id]
            } else if let Some(source_info) = &source.source_info
                && source_info.to_protobuf().is_shared()
            {
                vec![source_id]
            } else {
                ObjectDependency::find()
                    .select_only()
                    .column(object_dependency::Column::UsedBy)
                    .filter(object_dependency::Column::Oid.eq(source_id))
                    .into_tuple()
                    .all(&txn)
                    .await?
            };

        if streaming_job_ids.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "source id {source_id} not used by any streaming job"
            )));
        }

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (
                    id,
                    FragmentTypeMask::from(mask as u32),
                    stream_node.to_protobuf(),
                )
            })
            .collect_vec();

        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
            let mut found = false;
            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Source(node) = node
                        && let Some(node_inner) = &mut node.source_inner
                        && node_inner.source_id == source_id as u32
                    {
                        node_inner.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            if is_fs_source {
                // In older versions there was no fragment type flag for the `FsFetch` node,
                // so for fs connectors we scan all fragments for `StreamFsFetch` nodes.
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::StreamFsFetch(node) = node {
                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
                        if let Some(node_inner) = &mut node.node_inner
                            && node_inner.source_id == source_id as u32
                        {
                            node_inner.rate_limit = rate_limit;
                            found = true;
                        }
                    }
                });
            }
            found
        });

        assert!(
            !fragments.is_empty(),
            "source id should be used by at least one fragment"
        );
        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, fragment_type_mask, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                fragment_type_mask: Set(fragment_type_mask.into()),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(fragment_actors)
    }

    // edit the content of the fragments of the given `job_id`
    // return the actor ids that the mutation should be applied to
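    //
    // This is the generic helper behind the `update_{backfill,sink,dml}_rate_limit_by_job_id`
    // methods below: it loads every fragment of the job, lets the closure mutate matching
    // nodes, and persists only the fragments for which the closure returned `Ok(true)`.
    //
    // A sketch of a custom mutation (illustrative only):
    //
    //   let fragment_actors = controller
    //       .mutate_fragments_by_job_id(
    //           job_id,
    //           |mask, stream_node| {
    //               // rewrite `stream_node` in place; Ok(true) iff something changed
    //               Ok(false)
    //           },
    //           "no matching node found",
    //       )
    //       .await?;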
    pub async fn mutate_fragments_by_job_id(
        &self,
        job_id: ObjectId,
        // returns true if the mutation is applied
        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
        // error message when no relevant fragments are found
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(job_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let mut fragments = fragments
            .into_iter()
            .map(|(id, mask, stream_node)| {
                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
            })
            .collect_vec();

        let fragments = fragments
            .iter_mut()
            .map(|(_, fragment_type_mask, stream_node)| {
                fragments_mutation_fn(*fragment_type_mask, stream_node)
            })
            .collect::<MetaResult<Vec<bool>>>()?
            .into_iter()
            .zip_eq_debug(std::mem::take(&mut fragments))
            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
            .collect::<Vec<_>>();

        if fragments.is_empty() {
            return Err(MetaError::invalid_parameter(format!(
                "job id {job_id}: {}",
                err_msg
            )));
        }

        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
        for (id, _, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }
        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    async fn mutate_fragment_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
        err_msg: &'static str,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (fragment_type_mask, stream_node): (i32, StreamNode) =
            Fragment::find_by_id(fragment_id)
                .select_only()
                .columns([
                    fragment::Column::FragmentTypeMask,
                    fragment::Column::StreamNode,
                ])
                .into_tuple()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
        let mut pb_stream_node = stream_node.to_protobuf();
        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);

        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
            return Err(MetaError::invalid_parameter(format!(
                "fragment id {fragment_id}: {}",
                err_msg
            )));
        }

        fragment::ActiveModel {
            fragment_id: Set(fragment_id),
            // persist the mutated node, not the unchanged one we read from the catalog
            stream_node: Set(StreamNode::from(&pb_stream_node)),
            ..Default::default()
        }
        .update(&txn)
        .await?;

        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;

        txn.commit().await?;

        Ok(fragment_actors)
    }

    // edit the `rate_limit` of the backfill nodes (`StreamScan`, formerly `Chain`) in the given `job_id`'s fragments
    // return the actor ids that the new rate limit should be applied to
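    //
    // A minimal sketch of the intended use (the triggering SQL shown is illustrative):
    //
    //   // ALTER TABLE my_table SET backfill_rate_limit TO 512;
    //   let fragment_actors = controller
    //       .update_backfill_rate_limit_by_job_id(job_id, Some(512))
    //       .await?;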
    pub async fn update_backfill_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_backfill_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask
                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
                {
                    visit_stream_node_mut(stream_node, |node| match node {
                        PbNodeBody::StreamCdcScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::StreamScan(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::SourceBackfill(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        PbNodeBody::Sink(node) => {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                        _ => {}
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(
            job_id,
            update_backfill_rate_limit,
            "stream scan node or source node not found",
        )
        .await
    }

    // edit the `rate_limit` of the `Sink` node in the given `job_id`'s fragments
    // return the actor ids that the new rate limit should be applied to
    pub async fn update_sink_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_sink_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = Ok(false);
                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Sink(node) = node {
                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
                                found = Err(MetaError::invalid_parameter(
                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
                                ));
                                return;
                            }
                            node.rate_limit = rate_limit;
                            found = Ok(true);
                        }
                    });
                }
                found
            };

        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
            .await
    }

    pub async fn update_dml_rate_limit_by_job_id(
        &self,
        job_id: ObjectId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_dml_rate_limit =
            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
                let mut found = false;
                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
                    visit_stream_node_mut(stream_node, |node| {
                        if let PbNodeBody::Dml(node) = node {
                            node.rate_limit = rate_limit;
                            found = true;
                        }
                    });
                }
                Ok(found)
            };

        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
            .await
    }

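    /// Alter a subset of a source's connector properties and/or secret references in
    /// place, without rebuilding the streaming job. Only fields whitelisted by
    /// `check_source_allow_alter_on_fly_fields` are accepted.
    ///
    /// A sketch of a call site (the property name is illustrative):
    /// ```ignore
    /// let alter_props = BTreeMap::from([
    ///     ("properties.sync.call.timeout".to_owned(), "5s".to_owned()),
    /// ]);
    /// let resolved = controller
    ///     .update_source_props_by_source_id(source_id, alter_props, BTreeMap::new())
    ///     .await?;
    /// ```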
    pub async fn update_source_props_by_source_id(
        &self,
        source_id: SourceId,
        alter_props: BTreeMap<String, String>,
        alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> MetaResult<WithOptionsSecResolved> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (source, _obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        let connector = source.with_properties.0.get_connector().unwrap();

        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
        let prop_keys: Vec<String> = alter_props
            .keys()
            .chain(alter_secret_refs.keys())
            .cloned()
            .collect();
        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
            &connector, &prop_keys,
        )?;

        let mut options_with_secret = WithOptionsSecResolved::new(
            source.with_properties.0.clone(),
            source
                .secret_ref
                .map(|secret_ref| secret_ref.to_protobuf())
                .unwrap_or_default(),
        );
        let (to_add_secret_dep, to_remove_secret_dep) =
            options_with_secret.handle_update(alter_props, alter_secret_refs)?;

        tracing::info!(
            "applying new properties to source: source_id={}, options_with_secret={:?}",
            source_id,
            options_with_secret
        );
        // check that the altered props are still valid for the connector
        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
        // todo: validate via source manager

        let mut associate_table_id = None;

        // can be source_id or table_id
        // if updating an associated source, the preferred_id is the table_id
        // otherwise, it is the source_id
        let mut preferred_id: i32 = source_id;
        let rewrite_sql = {
            let definition = source.definition.clone();

            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
                .map_err(|e| {
                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
                        anyhow!(e).context("Failed to parse source definition SQL"),
                    )))
                })?
                .try_into()
                .unwrap();

            /// Formats SQL options with secret values properly resolved
            ///
            /// This function processes configuration options that may contain sensitive data:
            /// - Plaintext options are directly converted to `SqlOption`
            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
            ///   without exposing the actual secret value
            ///
            /// # Arguments
            /// * `txn` - Database transaction for retrieving secrets
            /// * `options_with_secret` - Container of options with both plaintext and secret values
            ///
            /// # Returns
            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
            async fn format_with_option_secret_resolved(
                txn: &DatabaseTransaction,
                options_with_secret: &WithOptionsSecResolved,
            ) -> MetaResult<Vec<SqlOption>> {
                let mut options = Vec::new();
                for (k, v) in options_with_secret.as_plaintext() {
                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                    options.push(sql_option);
                }
                for (k, v) in options_with_secret.as_secret() {
                    if let Some(secret_model) =
                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
                    {
                        let sql_option =
                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
                        options.push(sql_option);
                    } else {
                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
                    }
                }
                Ok(options)
            }
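            // E.g. (illustrative values) the rewritten WITH clause ends up as
            //   connector = 'kafka', topic = 'my_topic', credentials = SECRET my_secret
            // so the stored definition never embeds a secret's payload.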

            match &mut stmt {
                Statement::CreateSource { stmt } => {
                    stmt.with_properties.0 =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                }
                Statement::CreateTable { with_options, .. } => {
                    *with_options =
                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
                    associate_table_id = source.optional_associated_table_id;
                    preferred_id = associate_table_id.unwrap();
                }
                _ => unreachable!(),
            }

            stmt.to_string()
        };

        {
            // update secret dependencies
            if !to_add_secret_dep.is_empty() {
                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
                    object_dependency::ActiveModel {
                        oid: Set(secret_id as _),
                        used_by: Set(preferred_id as _),
                        ..Default::default()
                    }
                }))
                .exec(&txn)
                .await?;
            }
            if !to_remove_secret_dep.is_empty() {
                // todo: fix the filter logic
                let _ = ObjectDependency::delete_many()
                    .filter(
                        object_dependency::Column::Oid
                            .is_in(to_remove_secret_dep)
                            .and(
                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
                            ),
                    )
                    .exec(&txn)
                    .await?;
            }
        }

        let active_source_model = source::ActiveModel {
            source_id: Set(source_id),
            definition: Set(rewrite_sql.clone()),
            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
            secret_ref: Set((!options_with_secret.as_secret().is_empty())
                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
            ..Default::default()
        };
        active_source_model.update(&txn).await?;

        if let Some(associate_table_id) = associate_table_id {
            // update the associated table's statement accordingly
            let active_table_model = table::ActiveModel {
                table_id: Set(associate_table_id),
                definition: Set(rewrite_sql),
                ..Default::default()
            };
            active_table_model.update(&txn).await?;
        }

        // update fragments
        update_connector_props_fragments(
            &txn,
            if let Some(associate_table_id) = associate_table_id {
                // if updating a table with a connector, the job id is the table id
                associate_table_id
            } else {
                source_id
            },
            FragmentTypeFlag::Source,
            |node, found| {
                if let PbNodeBody::Source(node) = node
                    && let Some(source_inner) = &mut node.source_inner
                {
                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
                    source_inner.secret_refs = options_with_secret.as_secret().clone();
                    *found = true;
                }
            },
        )
        .await?;

        let mut to_update_objs = Vec::with_capacity(2);
        let (source, obj) = Source::find_by_id(source_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| {
                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
            })?;
        to_update_objs.push(PbObject {
            object_info: Some(PbObjectInfo::Source(
                ObjectModel(source, obj.unwrap()).into(),
            )),
        });

        if let Some(associate_table_id) = associate_table_id {
            let (table, obj) = Table::find_by_id(associate_table_id)
                .find_also_related(Object)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
            to_update_objs.push(PbObject {
                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
            });
        }

        txn.commit().await?;

        self.notify_frontend(
            NotificationOperation::Update,
            NotificationInfo::ObjectGroup(PbObjectGroup {
                objects: to_update_objs,
            }),
        )
        .await;

        Ok(options_with_secret)
    }

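    /// Alter a subset of a sink's properties in place. The merged properties are
    /// validated against the connector's allow-alter-on-fly whitelist and its
    /// `validate_alter_config` check before being persisted and pushed into the
    /// `SinkDesc` of the sink fragments.
    ///
    /// A sketch of a call site (the property name is illustrative):
    /// ```ignore
    /// let props = BTreeMap::from([("properties.retry.max".to_owned(), "16".to_owned())]);
    /// let merged = controller.update_sink_props_by_sink_id(sink_id, props).await?;
    /// ```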
    pub async fn update_sink_props_by_sink_id(
        &self,
        sink_id: SinkId,
        props: BTreeMap<String, String>,
    ) -> MetaResult<HashMap<String, String>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let (sink, _obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        // Validate that props can be altered
        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
            Some(connector) => {
                let connector_type = connector.to_lowercase();
                let field_names: Vec<String> = props.keys().cloned().collect();
                check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
                    .map_err(|e| SinkError::Config(anyhow!(e)))?;

                match_sink_name_str!(
                    connector_type.as_str(),
                    SinkType,
                    {
                        let mut new_props = sink.properties.0.clone();
                        new_props.extend(props.clone());
                        SinkType::validate_alter_config(&new_props)
                    },
                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
                )?
            }
            None => {
                return Err(
                    SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
                );
            }
        };
        let definition = sink.definition.clone();
        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
            .map_err(|e| SinkError::Config(anyhow!(e)))?
            .try_into()
            .unwrap();
        if let Statement::CreateSink { stmt } = &mut stmt {
            let mut new_sql_options = stmt
                .with_properties
                .0
                .iter()
                .map(|sql_option| (&sql_option.name, sql_option))
                .collect::<IndexMap<_, _>>();
            let add_sql_options = props
                .iter()
                .map(|(k, v)| SqlOption::try_from((k, v)))
                .collect::<Result<Vec<SqlOption>, ParserError>>()
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
            new_sql_options.extend(
                add_sql_options
                    .iter()
                    .map(|sql_option| (&sql_option.name, sql_option)),
            );
            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
        } else {
            panic!("sink definition is not a create sink statement")
        }
        let mut new_config = sink.properties.clone().into_inner();
        new_config.extend(props);

        let active_sink = sink::ActiveModel {
            sink_id: Set(sink_id),
            properties: Set(risingwave_meta_model::Property(new_config.clone())),
            definition: Set(stmt.to_string()),
            ..Default::default()
        };
        active_sink.update(&txn).await?;

        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment::Column::JobId.eq(sink_id))
            .into_tuple()
            .all(&txn)
            .await?;
        let fragments = fragments
            .into_iter()
            .filter(|(_, fragment_type_mask, _)| {
                FragmentTypeMask::from(*fragment_type_mask).contains(FragmentTypeFlag::Sink)
            })
            .filter_map(|(id, _, stream_node)| {
                let mut stream_node = stream_node.to_protobuf();
                let mut found = false;
                visit_stream_node_mut(&mut stream_node, |node| {
                    if let PbNodeBody::Sink(node) = node
                        && let Some(sink_desc) = &mut node.sink_desc
                        && sink_desc.id == sink_id as u32
                    {
                        sink_desc.properties = new_config.clone();
                        found = true;
                    }
                });
                if found { Some((id, stream_node)) } else { None }
            })
            .collect_vec();
        assert!(
            !fragments.is_empty(),
            "sink id should be used by at least one fragment"
        );
        for (id, stream_node) in fragments {
            fragment::ActiveModel {
                fragment_id: Set(id),
                stream_node: Set(StreamNode::from(&stream_node)),
                ..Default::default()
            }
            .update(&txn)
            .await?;
        }

        let (sink, obj) = Sink::find_by_id(sink_id)
            .find_also_related(Object)
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;

        txn.commit().await?;

        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
        let _version = self
            .notify_frontend(
                NotificationOperation::Update,
                NotificationInfo::ObjectGroup(PbObjectGroup {
                    objects: vec![PbObject {
                        object_info: Some(relation_info),
                    }],
                }),
            )
            .await;

        Ok(new_config.into_iter().collect())
    }

    pub async fn update_fragment_rate_limit_by_fragment_id(
        &self,
        fragment_id: FragmentId,
        rate_limit: Option<u32>,
    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
                                 stream_node: &mut PbStreamNode| {
            let mut found = false;
            if fragment_type_mask.contains_any(
                FragmentTypeFlag::dml_rate_limit_fragments()
                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
            ) {
                visit_stream_node_mut(stream_node, |node| {
                    if let PbNodeBody::Dml(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::Sink(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamCdcScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::StreamScan(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                    if let PbNodeBody::SourceBackfill(node) = node {
                        node.rate_limit = rate_limit;
                        found = true;
                    }
                });
            }
            found
        };
        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
            .await
    }

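    /// Persist the outcome of a reschedule: delete removed actors, insert newly
    /// created ones, rewrite vnode bitmaps and connector splits for surviving actors,
    /// and record the per-job parallelism / resource-group changes from `post_updates`.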
    pub async fn post_apply_reschedules(
        &self,
        reschedules: HashMap<FragmentId, Reschedule>,
        post_updates: &JobReschedulePostUpdates,
    ) -> MetaResult<()> {
        let new_created_actors: HashSet<_> = reschedules
            .values()
            .flat_map(|reschedule| {
                reschedule
                    .added_actors
                    .values()
                    .flatten()
                    .map(|actor_id| *actor_id as ActorId)
            })
            .collect();

        let inner = self.inner.write().await;

        let txn = inner.db.begin().await?;

        for Reschedule {
            removed_actors,
            vnode_bitmap_updates,
            actor_splits,
            newly_created_actors,
            ..
        } in reschedules.into_values()
        {
            // drop removed actors
            Actor::delete_many()
                .filter(
                    actor::Column::ActorId
                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
                )
                .exec(&txn)
                .await?;

            // add new actors
            for (
                (
                    StreamActor {
                        actor_id,
                        fragment_id,
                        vnode_bitmap,
                        expr_context,
                        ..
                    },
                    _,
                ),
                worker_id,
            ) in newly_created_actors.into_values()
            {
                let splits = actor_splits
                    .get(&actor_id)
                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());

                Actor::insert(actor::ActiveModel {
                    actor_id: Set(actor_id as _),
                    fragment_id: Set(fragment_id as _),
                    status: Set(ActorStatus::Running),
                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
                    worker_id: Set(worker_id),
                    upstream_actor_ids: Set(Default::default()),
                    vnode_bitmap: Set(vnode_bitmap
                        .as_ref()
                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
                    expr_context: Set(expr_context.as_ref().unwrap().into()),
                })
                .exec(&txn)
                .await?;
            }

            // actor update
            for (actor_id, bitmap) in vnode_bitmap_updates {
                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
                actor.update(&txn).await?;
            }

            // Update actor_splits for existing actors
            for (actor_id, splits) in actor_splits {
                if new_created_actors.contains(&(actor_id as ActorId)) {
                    continue;
                }

                let actor = Actor::find_by_id(actor_id as ActorId)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;

                let mut actor = actor.into_active_model();
                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
                actor.update(&txn).await?;
            }
        }

        let JobReschedulePostUpdates {
            parallelism_updates,
            resource_group_updates,
        } = post_updates;

        for (table_id, parallelism) in parallelism_updates {
            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
                .into_active_model();

            streaming_job.parallelism = Set(match parallelism {
                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
                TableParallelism::Custom => StreamingParallelism::Custom,
            });

            if let Some(resource_group) =
                resource_group_updates.get(&(table_id.table_id() as ObjectId))
            {
                streaming_job.specific_resource_group = Set(resource_group.to_owned());
            }

            streaming_job.update(&txn).await?;
        }

        txn.commit().await?;

        Ok(())
    }

    /// Note: `FsFetch` nodes created in old versions are not included.
    /// Since this is only used for debugging, it should be fine.
    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
        let inner = self.inner.read().await;
        let txn = inner.db.begin().await?;

        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
            .select_only()
            .columns([
                fragment::Column::FragmentId,
                fragment::Column::JobId,
                fragment::Column::FragmentTypeMask,
                fragment::Column::StreamNode,
            ])
            .filter(fragment_type_mask_intersects(FragmentTypeFlag::raw_flag(
                FragmentTypeFlag::rate_limit_fragments(),
            ) as _))
            .into_tuple()
            .all(&txn)
            .await?;

        let mut rate_limits = Vec::new();
        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
            let stream_node = stream_node.to_protobuf();
            visit_stream_node_body(&stream_node, |node| {
                let mut rate_limit = None;
                let mut node_name = None;

                match node {
                    // source rate limit
                    PbNodeBody::Source(node) => {
                        if let Some(node_inner) = &node.source_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("SOURCE");
                        }
                    }
                    PbNodeBody::StreamFsFetch(node) => {
                        if let Some(node_inner) = &node.node_inner {
                            rate_limit = node_inner.rate_limit;
                            node_name = Some("FS_FETCH");
                        }
                    }
                    // backfill rate limit
                    PbNodeBody::SourceBackfill(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SOURCE_BACKFILL");
                    }
                    PbNodeBody::StreamScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_SCAN");
                    }
                    PbNodeBody::StreamCdcScan(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("STREAM_CDC_SCAN");
                    }
                    PbNodeBody::Sink(node) => {
                        rate_limit = node.rate_limit;
                        node_name = Some("SINK");
                    }
                    _ => {}
                }

                if let Some(rate_limit) = rate_limit {
                    rate_limits.push(RateLimitInfo {
                        fragment_id: fragment_id as u32,
                        job_id: job_id as u32,
                        fragment_type_mask: fragment_type_mask as u32,
                        rate_limit,
                        node_name: node_name.unwrap().to_owned(),
                    });
                }
            });
        }

        Ok(rate_limits)
    }
}

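/// Builds the SQL predicate `({column} & {value}) <> 0`, i.e. "the bitmask column
/// shares at least one bit with `value`". For example, filtering fragments whose type
/// mask contains the `Source` flag boils down to:
/// ```ignore
/// let predicate = fragment_type_mask_intersects(FragmentTypeFlag::Source as i32);
/// let sources = Fragment::find().filter(predicate).all(&txn).await?;
/// ```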
fn bitflag_intersects(column: SimpleExpr, value: i32) -> SimpleExpr {
    column
        .binary(BinOper::Custom("&"), value)
        .binary(BinOper::NotEqual, 0)
}

fn fragment_type_mask_intersects(value: i32) -> SimpleExpr {
    bitflag_intersects(fragment::Column::FragmentTypeMask.into_simple_expr(), value)
}

pub struct SinkIntoTableContext {
    /// For creating sink into table, this is `Some`, otherwise `None`.
    pub creating_sink_id: Option<SinkId>,
    /// For dropping sink into table, this is `Some`, otherwise `None`.
    pub dropping_sink_id: Option<SinkId>,
    /// For alter table (e.g., add column), this is the list of existing sink ids,
    /// otherwise empty.
    pub updated_sink_catalogs: Vec<SinkId>,
}

pub struct FinishAutoRefreshSchemaSinkContext {
    pub tmp_sink_id: ObjectId,
    pub original_sink_id: ObjectId,
    pub columns: Vec<PbColumnCatalog>,
    pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
}

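/// Rewrites the stream nodes of every fragment of `job_id` whose fragment type mask
/// contains `expect_flag`, persisting only the fragments where `alter_stream_node_fn`
/// set `found` to true.
///
/// A sketch of a caller (closure body illustrative; see the `Source` call site above):
/// ```ignore
/// update_connector_props_fragments(&txn, job_id, FragmentTypeFlag::Source, |node, found| {
///     if let PbNodeBody::Source(node) = node {
///         // ...rewrite the embedded connector properties...
///         *found = true;
///     }
/// })
/// .await?;
/// ```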
async fn update_connector_props_fragments<F>(
    txn: &DatabaseTransaction,
    job_id: i32,
    expect_flag: FragmentTypeFlag,
    mut alter_stream_node_fn: F,
) -> MetaResult<()>
where
    F: FnMut(&mut PbNodeBody, &mut bool),
{
    let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
        .select_only()
        .columns([
            fragment::Column::FragmentId,
            fragment::Column::FragmentTypeMask,
            fragment::Column::StreamNode,
        ])
        .filter(fragment::Column::JobId.eq(job_id))
        .into_tuple()
        .all(txn)
        .await?;
    let fragments = fragments
        .into_iter()
        .filter(|(_, fragment_type_mask, _)| *fragment_type_mask & expect_flag as i32 != 0)
        .filter_map(|(id, _, stream_node)| {
            let mut stream_node = stream_node.to_protobuf();
            let mut found = false;
            visit_stream_node_mut(&mut stream_node, |node| {
                alter_stream_node_fn(node, &mut found);
            });
            if found { Some((id, stream_node)) } else { None }
        })
        .collect_vec();
    assert!(
        !fragments.is_empty(),
        "job {} (type: {:?}) should be used by at least one fragment",
        job_id,
        expect_flag
    );

    for (id, stream_node) in fragments {
        fragment::ActiveModel {
            fragment_id: Set(id),
            stream_node: Set(StreamNode::from(&stream_node)),
            ..Default::default()
        }
        .update(txn)
        .await?;
    }

    Ok(())
}