risingwave_meta/controller/
streaming_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::num::NonZeroUsize;
17
18use anyhow::anyhow;
19use indexmap::IndexMap;
20use itertools::Itertools;
21use risingwave_common::catalog::{self, FragmentTypeFlag, FragmentTypeMask};
22use risingwave_common::config::DefaultParallelism;
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::util::iter_util::ZipEqDebug;
25use risingwave_common::util::stream_graph_visitor::{
26    visit_stream_node_body, visit_stream_node_mut,
27};
28use risingwave_common::{bail, current_cluster_version};
29use risingwave_connector::allow_alter_on_fly_fields::check_sink_allow_alter_on_fly_fields;
30use risingwave_connector::error::ConnectorError;
31use risingwave_connector::sink::file_sink::fs::FsSink;
32use risingwave_connector::sink::{CONNECTOR_TYPE_KEY, SinkError};
33use risingwave_connector::source::{ConnectorProperties, SplitImpl};
34use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt, match_sink_name_str};
35use risingwave_meta_model::actor::ActorStatus;
36use risingwave_meta_model::object::ObjectType;
37use risingwave_meta_model::prelude::{StreamingJob as StreamingJobModel, *};
38use risingwave_meta_model::table::TableType;
39use risingwave_meta_model::user_privilege::Action;
40use risingwave_meta_model::*;
41use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
42use risingwave_pb::catalog::table::PbOptionalAssociatedSourceId;
43use risingwave_pb::catalog::{PbCreateType, PbTable};
44use risingwave_pb::meta::list_rate_limits_response::RateLimitInfo;
45use risingwave_pb::meta::object::PbObjectInfo;
46use risingwave_pb::meta::subscribe_response::{
47    Info as NotificationInfo, Info, Operation as NotificationOperation, Operation,
48};
49use risingwave_pb::meta::table_fragments::PbActorStatus;
50use risingwave_pb::meta::{PbObject, PbObjectGroup};
51use risingwave_pb::plan_common::PbColumnCatalog;
52use risingwave_pb::secret::PbSecretRef;
53use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
54use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
55use risingwave_pb::stream_plan::stream_node::PbNodeBody;
56use risingwave_pb::stream_plan::{PbSinkLogStoreType, PbStreamNode};
57use risingwave_pb::user::PbUserInfo;
58use risingwave_sqlparser::ast::{SqlOption, Statement};
59use risingwave_sqlparser::parser::{Parser, ParserError};
60use sea_orm::ActiveValue::Set;
61use sea_orm::sea_query::{Expr, Query, SimpleExpr};
62use sea_orm::{
63    ActiveEnum, ActiveModelTrait, ColumnTrait, DatabaseTransaction, EntityTrait, IntoActiveModel,
64    JoinType, ModelTrait, NotSet, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait,
65    TransactionTrait,
66};
67use thiserror_ext::AsReport;
68
69use super::rename::IndexItemRewriter;
70use crate::barrier::{Command, Reschedule};
71use crate::controller::ObjectModel;
72use crate::controller::catalog::{CatalogController, DropTableConnectorContext};
73use crate::controller::fragment::FragmentTypeMaskExt;
74use crate::controller::utils::{
75    PartialObject, build_object_group_for_delete, check_relation_name_duplicate,
76    check_sink_into_table_cycle, ensure_job_not_canceled, ensure_object_id, ensure_user_id,
77    fetch_target_fragments, get_fragment_actor_ids, get_internal_tables_by_id, get_table_columns,
78    grant_default_privileges_automatically, insert_fragment_relations, list_user_info_by_ids,
79};
80use crate::error::MetaErrorInner;
81use crate::manager::{NotificationVersion, StreamingJob, StreamingJobType};
82use crate::model::{
83    FragmentDownstreamRelation, FragmentReplaceUpstream, StreamActor, StreamContext,
84    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
85};
86use crate::stream::{JobReschedulePostUpdates, SplitAssignment};
87use crate::{MetaError, MetaResult};
88
89impl CatalogController {
90    pub async fn create_streaming_job_obj(
91        txn: &DatabaseTransaction,
92        obj_type: ObjectType,
93        owner_id: UserId,
94        database_id: Option<DatabaseId>,
95        schema_id: Option<SchemaId>,
96        create_type: PbCreateType,
97        timezone: Option<String>,
98        streaming_parallelism: StreamingParallelism,
99        max_parallelism: usize,
100        specific_resource_group: Option<String>, // todo: can we move it to StreamContext?
101    ) -> MetaResult<ObjectId> {
102        let obj = Self::create_object(txn, obj_type, owner_id, database_id, schema_id).await?;
103        let job = streaming_job::ActiveModel {
104            job_id: Set(obj.oid),
105            job_status: Set(JobStatus::Initial),
106            create_type: Set(create_type.into()),
107            timezone: Set(timezone),
108            parallelism: Set(streaming_parallelism),
109            max_parallelism: Set(max_parallelism as _),
110            specific_resource_group: Set(specific_resource_group),
111        };
112        job.insert(txn).await?;
113
114        Ok(obj.oid)
115    }
116
117    /// Create catalogs for the streaming job, then notify frontend about them if the job is a
118    /// materialized view.
119    ///
120    /// Some of the fields in the given streaming job are placeholders, which will
121    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
122    #[await_tree::instrument]
123    pub async fn create_job_catalog(
124        &self,
125        streaming_job: &mut StreamingJob,
126        ctx: &StreamContext,
127        parallelism: &Option<Parallelism>,
128        max_parallelism: usize,
129        mut dependencies: HashSet<ObjectId>,
130        specific_resource_group: Option<String>,
131    ) -> MetaResult<()> {
132        let inner = self.inner.write().await;
133        let txn = inner.db.begin().await?;
134        let create_type = streaming_job.create_type();
135
136        let streaming_parallelism = match (parallelism, self.env.opts.default_parallelism) {
137            (None, DefaultParallelism::Full) => StreamingParallelism::Adaptive,
138            (None, DefaultParallelism::Default(n)) => StreamingParallelism::Fixed(n.get()),
139            (Some(n), _) => StreamingParallelism::Fixed(n.parallelism as _),
140        };
141
142        ensure_user_id(streaming_job.owner() as _, &txn).await?;
143        ensure_object_id(ObjectType::Database, streaming_job.database_id() as _, &txn).await?;
144        ensure_object_id(ObjectType::Schema, streaming_job.schema_id() as _, &txn).await?;
145        check_relation_name_duplicate(
146            &streaming_job.name(),
147            streaming_job.database_id() as _,
148            streaming_job.schema_id() as _,
149            &txn,
150        )
151        .await?;
152
153        // check if any dependency is in altering status.
154        if !dependencies.is_empty() {
155            let altering_cnt = ObjectDependency::find()
156                .join(
157                    JoinType::InnerJoin,
158                    object_dependency::Relation::Object1.def(),
159                )
160                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
161                .filter(
162                    object_dependency::Column::Oid
163                        .is_in(dependencies.clone())
164                        .and(object::Column::ObjType.eq(ObjectType::Table))
165                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
166                        .and(
167                            // It means the referring table is just dummy for altering.
168                            object::Column::Oid.not_in_subquery(
169                                Query::select()
170                                    .column(table::Column::TableId)
171                                    .from(Table)
172                                    .to_owned(),
173                            ),
174                        ),
175                )
176                .count(&txn)
177                .await?;
178            if altering_cnt != 0 {
179                return Err(MetaError::permission_denied(
180                    "some dependent relations are being altered",
181                ));
182            }
183        }
184
185        match streaming_job {
186            StreamingJob::MaterializedView(table) => {
187                let job_id = Self::create_streaming_job_obj(
188                    &txn,
189                    ObjectType::Table,
190                    table.owner as _,
191                    Some(table.database_id as _),
192                    Some(table.schema_id as _),
193                    create_type,
194                    ctx.timezone.clone(),
195                    streaming_parallelism,
196                    max_parallelism,
197                    specific_resource_group,
198                )
199                .await?;
200                table.id = job_id as _;
201                let table_model: table::ActiveModel = table.clone().into();
202                Table::insert(table_model).exec(&txn).await?;
203            }
204            StreamingJob::Sink(sink) => {
205                if let Some(target_table_id) = sink.target_table
206                    && check_sink_into_table_cycle(
207                        target_table_id as ObjectId,
208                        dependencies.iter().cloned().collect(),
209                        &txn,
210                    )
211                    .await?
212                {
213                    bail!("Creating such a sink will result in circular dependency.");
214                }
215
216                let job_id = Self::create_streaming_job_obj(
217                    &txn,
218                    ObjectType::Sink,
219                    sink.owner as _,
220                    Some(sink.database_id as _),
221                    Some(sink.schema_id as _),
222                    create_type,
223                    ctx.timezone.clone(),
224                    streaming_parallelism,
225                    max_parallelism,
226                    specific_resource_group,
227                )
228                .await?;
229                sink.id = job_id as _;
230                let sink_model: sink::ActiveModel = sink.clone().into();
231                Sink::insert(sink_model).exec(&txn).await?;
232            }
233            StreamingJob::Table(src, table, _) => {
234                let job_id = Self::create_streaming_job_obj(
235                    &txn,
236                    ObjectType::Table,
237                    table.owner as _,
238                    Some(table.database_id as _),
239                    Some(table.schema_id as _),
240                    create_type,
241                    ctx.timezone.clone(),
242                    streaming_parallelism,
243                    max_parallelism,
244                    specific_resource_group,
245                )
246                .await?;
247                table.id = job_id as _;
248                if let Some(src) = src {
249                    let src_obj = Self::create_object(
250                        &txn,
251                        ObjectType::Source,
252                        src.owner as _,
253                        Some(src.database_id as _),
254                        Some(src.schema_id as _),
255                    )
256                    .await?;
257                    src.id = src_obj.oid as _;
258                    src.optional_associated_table_id =
259                        Some(PbOptionalAssociatedTableId::AssociatedTableId(job_id as _));
260                    table.optional_associated_source_id = Some(
261                        PbOptionalAssociatedSourceId::AssociatedSourceId(src_obj.oid as _),
262                    );
263                    let source: source::ActiveModel = src.clone().into();
264                    Source::insert(source).exec(&txn).await?;
265                }
266                let table_model: table::ActiveModel = table.clone().into();
267                Table::insert(table_model).exec(&txn).await?;
268            }
269            StreamingJob::Index(index, table) => {
270                ensure_object_id(ObjectType::Table, index.primary_table_id as _, &txn).await?;
271                let job_id = Self::create_streaming_job_obj(
272                    &txn,
273                    ObjectType::Index,
274                    index.owner as _,
275                    Some(index.database_id as _),
276                    Some(index.schema_id as _),
277                    create_type,
278                    ctx.timezone.clone(),
279                    streaming_parallelism,
280                    max_parallelism,
281                    specific_resource_group,
282                )
283                .await?;
284                // to be compatible with old implementation.
285                index.id = job_id as _;
286                index.index_table_id = job_id as _;
287                table.id = job_id as _;
288
289                ObjectDependency::insert(object_dependency::ActiveModel {
290                    oid: Set(index.primary_table_id as _),
291                    used_by: Set(table.id as _),
292                    ..Default::default()
293                })
294                .exec(&txn)
295                .await?;
296
297                let table_model: table::ActiveModel = table.clone().into();
298                Table::insert(table_model).exec(&txn).await?;
299                let index_model: index::ActiveModel = index.clone().into();
300                Index::insert(index_model).exec(&txn).await?;
301            }
302            StreamingJob::Source(src) => {
303                let job_id = Self::create_streaming_job_obj(
304                    &txn,
305                    ObjectType::Source,
306                    src.owner as _,
307                    Some(src.database_id as _),
308                    Some(src.schema_id as _),
309                    create_type,
310                    ctx.timezone.clone(),
311                    streaming_parallelism,
312                    max_parallelism,
313                    specific_resource_group,
314                )
315                .await?;
316                src.id = job_id as _;
317                let source_model: source::ActiveModel = src.clone().into();
318                Source::insert(source_model).exec(&txn).await?;
319            }
320        }
321
322        // collect dependent secrets.
323        dependencies.extend(
324            streaming_job
325                .dependent_secret_ids()?
326                .into_iter()
327                .map(|secret_id| secret_id as ObjectId),
328        );
329        // collect dependent connection
330        dependencies.extend(
331            streaming_job
332                .dependent_connection_ids()?
333                .into_iter()
334                .map(|conn_id| conn_id as ObjectId),
335        );
336
337        // record object dependency.
338        if !dependencies.is_empty() {
339            ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
340                object_dependency::ActiveModel {
341                    oid: Set(oid),
342                    used_by: Set(streaming_job.id() as _),
343                    ..Default::default()
344                }
345            }))
346            .exec(&txn)
347            .await?;
348        }
349
350        txn.commit().await?;
351
352        Ok(())
353    }
354
355    /// Create catalogs for internal tables, then notify frontend about them if the job is a
356    /// materialized view.
357    ///
358    /// Some of the fields in the given "incomplete" internal tables are placeholders, which will
359    /// be updated later in `prepare_streaming_job` and notify again in `finish_streaming_job`.
360    ///
361    /// Returns a mapping from the temporary table id to the actual global table id.
362    pub async fn create_internal_table_catalog(
363        &self,
364        job: &StreamingJob,
365        mut incomplete_internal_tables: Vec<PbTable>,
366    ) -> MetaResult<HashMap<u32, u32>> {
367        let job_id = job.id() as ObjectId;
368        let inner = self.inner.write().await;
369        let txn = inner.db.begin().await?;
370
371        // Ensure the job exists.
372        ensure_job_not_canceled(job_id, &txn).await?;
373
374        let mut table_id_map = HashMap::new();
375        for table in &mut incomplete_internal_tables {
376            let table_id = Self::create_object(
377                &txn,
378                ObjectType::Table,
379                table.owner as _,
380                Some(table.database_id as _),
381                Some(table.schema_id as _),
382            )
383            .await?
384            .oid;
385            table_id_map.insert(table.id, table_id as u32);
386            table.id = table_id as _;
387            table.job_id = Some(job_id as _);
388
389            let table_model = table::ActiveModel {
390                table_id: Set(table_id as _),
391                belongs_to_job_id: Set(Some(job_id)),
392                fragment_id: NotSet,
393                ..table.clone().into()
394            };
395            Table::insert(table_model).exec(&txn).await?;
396        }
397        txn.commit().await?;
398
399        Ok(table_id_map)
400    }
401
402    pub async fn prepare_stream_job_fragments(
403        &self,
404        stream_job_fragments: &StreamJobFragmentsToCreate,
405        streaming_job: &StreamingJob,
406        for_replace: bool,
407    ) -> MetaResult<()> {
408        self.prepare_streaming_job(
409            stream_job_fragments.stream_job_id().table_id as _,
410            || stream_job_fragments.fragments.values(),
411            &stream_job_fragments.actor_status,
412            &stream_job_fragments.actor_splits,
413            &stream_job_fragments.downstreams,
414            for_replace,
415            Some(streaming_job),
416        )
417        .await
418    }
419
420    // TODO: In this function, we also update the `Table` model in the meta store.
421    // Given that we've ensured the tables inside `TableFragments` are complete, shall we consider
422    // making them the source of truth and performing a full replacement for those in the meta store?
423    /// Insert fragments and actors to meta store. Used both for creating new jobs and replacing jobs.
424    #[await_tree::instrument("prepare_streaming_job_for_{}", if for_replace { "replace" } else { "create" }
425    )]
426    pub async fn prepare_streaming_job<'a, I: Iterator<Item = &'a crate::model::Fragment> + 'a>(
427        &self,
428        job_id: ObjectId,
429        get_fragments: impl Fn() -> I + 'a,
430        actor_status: &BTreeMap<crate::model::ActorId, PbActorStatus>,
431        actor_splits: &HashMap<crate::model::ActorId, Vec<SplitImpl>>,
432        downstreams: &FragmentDownstreamRelation,
433        for_replace: bool,
434        creating_streaming_job: Option<&'a StreamingJob>,
435    ) -> MetaResult<()> {
436        let fragment_actors = Self::extract_fragment_and_actors_from_fragments(
437            job_id,
438            get_fragments(),
439            actor_status,
440            actor_splits,
441        )?;
442        let inner = self.inner.write().await;
443
444        let need_notify = creating_streaming_job
445            .map(|job| job.should_notify_creating())
446            .unwrap_or(false);
447        let definition = creating_streaming_job.map(|job| job.definition());
448
449        let mut objects_to_notify = vec![];
450        let txn = inner.db.begin().await?;
451
452        // Ensure the job exists.
453        ensure_job_not_canceled(job_id, &txn).await?;
454
455        // Add fragments.
456        let (fragments, actors): (Vec<_>, Vec<_>) = fragment_actors.into_iter().unzip();
457        for fragment in fragments {
458            let fragment_id = fragment.fragment_id;
459            let state_table_ids = fragment.state_table_ids.inner_ref().clone();
460
461            let fragment = fragment.into_active_model();
462            Fragment::insert(fragment).exec(&txn).await?;
463
464            // Fields including `fragment_id` and `vnode_count` were placeholder values before.
465            // After table fragments are created, update them for all tables.
466            if !for_replace {
467                let all_tables = StreamJobFragments::collect_tables(get_fragments());
468                for state_table_id in state_table_ids {
469                    // Table's vnode count is not always the fragment's vnode count, so we have to
470                    // look up the table from `TableFragments`.
471                    // See `ActorGraphBuilder::new`.
472                    let table = all_tables
473                        .get(&(state_table_id as u32))
474                        .unwrap_or_else(|| panic!("table {} not found", state_table_id));
475                    assert_eq!(table.id, state_table_id as u32);
476                    assert_eq!(table.fragment_id, fragment_id as u32);
477                    let vnode_count = table.vnode_count();
478
479                    table::ActiveModel {
480                        table_id: Set(state_table_id as _),
481                        fragment_id: Set(Some(fragment_id)),
482                        vnode_count: Set(vnode_count as _),
483                        ..Default::default()
484                    }
485                    .update(&txn)
486                    .await?;
487
488                    if need_notify {
489                        let mut table = table.clone();
490                        // In production, definition was replaced but still needed for notification.
491                        if cfg!(not(debug_assertions)) && table.id == job_id as u32 {
492                            table.definition = definition.clone().unwrap();
493                        }
494                        objects_to_notify.push(PbObject {
495                            object_info: Some(PbObjectInfo::Table(table)),
496                        });
497                    }
498                }
499            }
500        }
501
502        // Add streaming job objects to notification
503        if need_notify {
504            match creating_streaming_job.unwrap() {
505                StreamingJob::Sink(sink) => {
506                    objects_to_notify.push(PbObject {
507                        object_info: Some(PbObjectInfo::Sink(sink.clone())),
508                    });
509                }
510                StreamingJob::Index(index, _) => {
511                    objects_to_notify.push(PbObject {
512                        object_info: Some(PbObjectInfo::Index(index.clone())),
513                    });
514                }
515                _ => {}
516            }
517        }
518
519        insert_fragment_relations(&txn, downstreams).await?;
520
521        // Add actors and actor dispatchers.
522        for actors in actors {
523            for actor in actors {
524                let actor = actor.into_active_model();
525                Actor::insert(actor).exec(&txn).await?;
526            }
527        }
528
529        if !for_replace {
530            // Update dml fragment id.
531            if let Some(StreamingJob::Table(_, table, _)) = creating_streaming_job {
532                Table::update(table::ActiveModel {
533                    table_id: Set(table.id as _),
534                    dml_fragment_id: Set(table.dml_fragment_id.map(|id| id as _)),
535                    ..Default::default()
536                })
537                .exec(&txn)
538                .await?;
539            }
540        }
541
542        txn.commit().await?;
543
544        // FIXME: there's a gap between the catalog creation and notification, which may lead to
545        // frontend receiving duplicate notifications if the frontend restarts right in this gap. We
546        // need to either forward the notification or filter catalogs without any fragments in the metastore.
547        if !objects_to_notify.is_empty() {
548            self.notify_frontend(
549                Operation::Add,
550                Info::ObjectGroup(PbObjectGroup {
551                    objects: objects_to_notify,
552                }),
553            )
554            .await;
555        }
556
557        Ok(())
558    }
559
560    /// Builds a cancel command for the streaming job. If the sink (with target table) needs to be dropped, additional
561    /// information is required to build barrier mutation.
562    pub async fn build_cancel_command(
563        &self,
564        table_fragments: &StreamJobFragments,
565    ) -> MetaResult<Command> {
566        let inner = self.inner.read().await;
567        let txn = inner.db.begin().await?;
568
569        let dropped_sink_fragment_with_target =
570            if let Some(sink_fragment) = table_fragments.sink_fragment() {
571                let sink_fragment_id = sink_fragment.fragment_id as FragmentId;
572                let sink_target_fragment = fetch_target_fragments(&txn, [sink_fragment_id]).await?;
573                sink_target_fragment
574                    .get(&sink_fragment_id)
575                    .map(|target_fragments| {
576                        let target_fragment_id = *target_fragments
577                            .first()
578                            .expect("sink should have at least one downstream fragment");
579                        (sink_fragment_id, target_fragment_id)
580                    })
581            } else {
582                None
583            };
584
585        Ok(Command::DropStreamingJobs {
586            streaming_job_ids: HashSet::from_iter([table_fragments.stream_job_id()]),
587            actors: table_fragments.actor_ids(),
588            unregistered_state_table_ids: table_fragments
589                .all_table_ids()
590                .map(catalog::TableId::new)
591                .collect(),
592            unregistered_fragment_ids: table_fragments.fragment_ids().collect(),
593            dropped_sink_fragment_by_targets: dropped_sink_fragment_with_target
594                .into_iter()
595                .map(|(sink, target)| (target as _, vec![sink as _]))
596                .collect(),
597        })
598    }
599
600    /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode.
601    /// It returns (true, _) if the job is not found or aborted.
602    /// It returns (_, Some(`database_id`)) is the `database_id` of the `job_id` exists
603    #[await_tree::instrument]
604    pub async fn try_abort_creating_streaming_job(
605        &self,
606        job_id: ObjectId,
607        is_cancelled: bool,
608    ) -> MetaResult<(bool, Option<DatabaseId>)> {
609        let mut inner = self.inner.write().await;
610        let txn = inner.db.begin().await?;
611
612        let obj = Object::find_by_id(job_id).one(&txn).await?;
613        let Some(obj) = obj else {
614            tracing::warn!(
615                id = job_id,
616                "streaming job not found when aborting creating, might be cancelled already or cleaned by recovery"
617            );
618            return Ok((true, None));
619        };
620        let database_id = obj
621            .database_id
622            .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?;
623        let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?;
624
625        if !is_cancelled && let Some(streaming_job) = &streaming_job {
626            assert_ne!(streaming_job.job_status, JobStatus::Created);
627            if streaming_job.create_type == CreateType::Background
628                && streaming_job.job_status == JobStatus::Creating
629            {
630                // If the job is created in background and still in creating status, we should not abort it and let recovery handle it.
631                tracing::warn!(
632                    id = job_id,
633                    "streaming job is created in background and still in creating status"
634                );
635                return Ok((false, Some(database_id)));
636            }
637        }
638
639        let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;
640
641        // Get the notification info if the job is a materialized view or created in the background.
642        let mut objs = vec![];
643        let table_obj = Table::find_by_id(job_id).one(&txn).await?;
644
645        let mut need_notify =
646            streaming_job.is_some_and(|job| job.create_type == CreateType::Background);
647        if !need_notify {
648            // If the job is not created in the background, we only need to notify the frontend if the job is a materialized view.
649            if let Some(table) = &table_obj {
650                need_notify = table.table_type == TableType::MaterializedView;
651            }
652        }
653
654        let dropped_tables = Table::find()
655            .find_also_related(Object)
656            .filter(
657                table::Column::TableId.is_in(
658                    internal_table_ids
659                        .iter()
660                        .cloned()
661                        .chain(table_obj.iter().map(|t| t.table_id as _)),
662                ),
663            )
664            .all(&txn)
665            .await?
666            .into_iter()
667            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
668        inner
669            .dropped_tables
670            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
671
672        if need_notify {
673            let obj: Option<PartialObject> = Object::find_by_id(job_id)
674                .select_only()
675                .columns([
676                    object::Column::Oid,
677                    object::Column::ObjType,
678                    object::Column::SchemaId,
679                    object::Column::DatabaseId,
680                ])
681                .into_partial_model()
682                .one(&txn)
683                .await?;
684            let obj =
685                obj.ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
686            objs.push(obj);
687            let internal_table_objs: Vec<PartialObject> = Object::find()
688                .select_only()
689                .columns([
690                    object::Column::Oid,
691                    object::Column::ObjType,
692                    object::Column::SchemaId,
693                    object::Column::DatabaseId,
694                ])
695                .join(JoinType::InnerJoin, object::Relation::Table.def())
696                .filter(table::Column::BelongsToJobId.eq(job_id))
697                .into_partial_model()
698                .all(&txn)
699                .await?;
700            objs.extend(internal_table_objs);
701        }
702
703        // Check if the job is creating sink into table.
704        if table_obj.is_none()
705            && let Some(Some(target_table_id)) = Sink::find_by_id(job_id)
706                .select_only()
707                .column(sink::Column::TargetTable)
708                .into_tuple::<Option<TableId>>()
709                .one(&txn)
710                .await?
711        {
712            let tmp_id: Option<ObjectId> = ObjectDependency::find()
713                .select_only()
714                .column(object_dependency::Column::UsedBy)
715                .join(
716                    JoinType::InnerJoin,
717                    object_dependency::Relation::Object1.def(),
718                )
719                .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
720                .filter(
721                    object_dependency::Column::Oid
722                        .eq(target_table_id)
723                        .and(object::Column::ObjType.eq(ObjectType::Table))
724                        .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
725                )
726                .into_tuple()
727                .one(&txn)
728                .await?;
729            if let Some(tmp_id) = tmp_id {
730                tracing::warn!(
731                    id = tmp_id,
732                    "aborting temp streaming job for sink into table"
733                );
734                Object::delete_by_id(tmp_id).exec(&txn).await?;
735            }
736        }
737
738        Object::delete_by_id(job_id).exec(&txn).await?;
739        if !internal_table_ids.is_empty() {
740            Object::delete_many()
741                .filter(object::Column::Oid.is_in(internal_table_ids))
742                .exec(&txn)
743                .await?;
744        }
745        if let Some(t) = &table_obj
746            && let Some(source_id) = t.optional_associated_source_id
747        {
748            Object::delete_by_id(source_id).exec(&txn).await?;
749        }
750
751        let err = if is_cancelled {
752            MetaError::cancelled(format!("streaming job {job_id} is cancelled"))
753        } else {
754            MetaError::catalog_id_not_found("stream job", format!("streaming job {job_id} failed"))
755        };
756        let abort_reason = format!("streaming job aborted {}", err.as_report());
757        for tx in inner
758            .creating_table_finish_notifier
759            .get_mut(&database_id)
760            .map(|creating_tables| creating_tables.remove(&job_id).into_iter())
761            .into_iter()
762            .flatten()
763            .flatten()
764        {
765            let _ = tx.send(Err(abort_reason.clone()));
766        }
767        txn.commit().await?;
768
769        if !objs.is_empty() {
770            // We also have notified the frontend about these objects,
771            // so we need to notify the frontend to delete them here.
772            self.notify_frontend(Operation::Delete, build_object_group_for_delete(objs))
773                .await;
774        }
775        Ok((true, Some(database_id)))
776    }
777
778    #[await_tree::instrument]
779    pub async fn post_collect_job_fragments(
780        &self,
781        job_id: ObjectId,
782        actor_ids: Vec<crate::model::ActorId>,
783        upstream_fragment_new_downstreams: &FragmentDownstreamRelation,
784        split_assignment: &SplitAssignment,
785        new_sink_downstream: Option<FragmentDownstreamRelation>,
786    ) -> MetaResult<()> {
787        let inner = self.inner.write().await;
788        let txn = inner.db.begin().await?;
789
790        let actor_ids = actor_ids.into_iter().map(|id| id as ActorId).collect_vec();
791
792        Actor::update_many()
793            .col_expr(
794                actor::Column::Status,
795                SimpleExpr::from(ActorStatus::Running.into_value()),
796            )
797            .filter(actor::Column::ActorId.is_in(actor_ids))
798            .exec(&txn)
799            .await?;
800
801        for splits in split_assignment.values() {
802            for (actor_id, splits) in splits {
803                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
804                let connector_splits = &PbConnectorSplits { splits };
805                actor::ActiveModel {
806                    actor_id: Set(*actor_id as _),
807                    splits: Set(Some(connector_splits.into())),
808                    ..Default::default()
809                }
810                .update(&txn)
811                .await?;
812            }
813        }
814
815        insert_fragment_relations(&txn, upstream_fragment_new_downstreams).await?;
816
817        if let Some(new_downstream) = new_sink_downstream {
818            insert_fragment_relations(&txn, &new_downstream).await?;
819        }
820
821        // Mark job as CREATING.
822        streaming_job::ActiveModel {
823            job_id: Set(job_id),
824            job_status: Set(JobStatus::Creating),
825            ..Default::default()
826        }
827        .update(&txn)
828        .await?;
829
830        txn.commit().await?;
831
832        Ok(())
833    }
834
835    pub async fn create_job_catalog_for_replace(
836        &self,
837        streaming_job: &StreamingJob,
838        ctx: Option<&StreamContext>,
839        specified_parallelism: Option<&NonZeroUsize>,
840        expected_original_max_parallelism: Option<usize>,
841    ) -> MetaResult<ObjectId> {
842        let id = streaming_job.id();
843        let inner = self.inner.write().await;
844        let txn = inner.db.begin().await?;
845
846        // 1. check version.
847        streaming_job.verify_version_for_replace(&txn).await?;
848        // 2. check concurrent replace.
849        let referring_cnt = ObjectDependency::find()
850            .join(
851                JoinType::InnerJoin,
852                object_dependency::Relation::Object1.def(),
853            )
854            .join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
855            .filter(
856                object_dependency::Column::Oid
857                    .eq(id as ObjectId)
858                    .and(object::Column::ObjType.eq(ObjectType::Table))
859                    .and(streaming_job::Column::JobStatus.ne(JobStatus::Created)),
860            )
861            .count(&txn)
862            .await?;
863        if referring_cnt != 0 {
864            return Err(MetaError::permission_denied(
865                "job is being altered or referenced by some creating jobs",
866            ));
867        }
868
869        // 3. check parallelism.
870        let (original_max_parallelism, original_timezone): (i32, Option<String>) =
871            StreamingJobModel::find_by_id(id as ObjectId)
872                .select_only()
873                .column(streaming_job::Column::MaxParallelism)
874                .column(streaming_job::Column::Timezone)
875                .into_tuple()
876                .one(&txn)
877                .await?
878                .ok_or_else(|| MetaError::catalog_id_not_found(streaming_job.job_type_str(), id))?;
879
880        if let Some(max_parallelism) = expected_original_max_parallelism
881            && original_max_parallelism != max_parallelism as i32
882        {
883            // We already override the max parallelism in `StreamFragmentGraph` before entering this function.
884            // This should not happen in normal cases.
885            bail!(
886                "cannot use a different max parallelism \
887                 when replacing streaming job, \
888                 original: {}, new: {}",
889                original_max_parallelism,
890                max_parallelism
891            );
892        }
893
894        let parallelism = match specified_parallelism {
895            None => StreamingParallelism::Adaptive,
896            Some(n) => StreamingParallelism::Fixed(n.get() as _),
897        };
898        let timezone = ctx
899            .map(|ctx| ctx.timezone.clone())
900            .unwrap_or(original_timezone);
901
902        // 4. create streaming object for new replace table.
903        let new_obj_id = Self::create_streaming_job_obj(
904            &txn,
905            streaming_job.object_type(),
906            streaming_job.owner() as _,
907            Some(streaming_job.database_id() as _),
908            Some(streaming_job.schema_id() as _),
909            streaming_job.create_type(),
910            timezone,
911            parallelism,
912            original_max_parallelism as _,
913            None,
914        )
915        .await?;
916
917        // 5. record dependency for new replace table.
918        ObjectDependency::insert(object_dependency::ActiveModel {
919            oid: Set(id as _),
920            used_by: Set(new_obj_id as _),
921            ..Default::default()
922        })
923        .exec(&txn)
924        .await?;
925
926        txn.commit().await?;
927
928        Ok(new_obj_id)
929    }
930
931    /// `finish_streaming_job` marks job related objects as `Created` and notify frontend.
932    pub async fn finish_streaming_job(&self, job_id: ObjectId) -> MetaResult<()> {
933        let mut inner = self.inner.write().await;
934        let txn = inner.db.begin().await?;
935
936        let job_type = Object::find_by_id(job_id)
937            .select_only()
938            .column(object::Column::ObjType)
939            .into_tuple()
940            .one(&txn)
941            .await?
942            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
943
944        let create_type: CreateType = StreamingJobModel::find_by_id(job_id)
945            .select_only()
946            .column(streaming_job::Column::CreateType)
947            .into_tuple()
948            .one(&txn)
949            .await?
950            .ok_or_else(|| MetaError::catalog_id_not_found("streaming job", job_id))?;
951
952        // update `created_at` as now() and `created_at_cluster_version` as current cluster version.
953        let res = Object::update_many()
954            .col_expr(object::Column::CreatedAt, Expr::current_timestamp().into())
955            .col_expr(
956                object::Column::CreatedAtClusterVersion,
957                current_cluster_version().into(),
958            )
959            .filter(object::Column::Oid.eq(job_id))
960            .exec(&txn)
961            .await?;
962        if res.rows_affected == 0 {
963            return Err(MetaError::catalog_id_not_found("streaming job", job_id));
964        }
965
966        // mark the target stream job as `Created`.
967        let job = streaming_job::ActiveModel {
968            job_id: Set(job_id),
969            job_status: Set(JobStatus::Created),
970            ..Default::default()
971        };
972        job.update(&txn).await?;
973
974        // notify frontend: job, internal tables.
975        let internal_table_objs = Table::find()
976            .find_also_related(Object)
977            .filter(table::Column::BelongsToJobId.eq(job_id))
978            .all(&txn)
979            .await?;
980        let mut objects = internal_table_objs
981            .iter()
982            .map(|(table, obj)| PbObject {
983                object_info: Some(PbObjectInfo::Table(
984                    ObjectModel(table.clone(), obj.clone().unwrap()).into(),
985                )),
986            })
987            .collect_vec();
988        let mut notification_op = if create_type == CreateType::Background {
989            NotificationOperation::Update
990        } else {
991            NotificationOperation::Add
992        };
993        let mut updated_user_info = vec![];
994
995        match job_type {
996            ObjectType::Table => {
997                let (table, obj) = Table::find_by_id(job_id)
998                    .find_also_related(Object)
999                    .one(&txn)
1000                    .await?
1001                    .ok_or_else(|| MetaError::catalog_id_not_found("table", job_id))?;
1002                if table.table_type == TableType::MaterializedView {
1003                    notification_op = NotificationOperation::Update;
1004                }
1005
1006                if let Some(source_id) = table.optional_associated_source_id {
1007                    let (src, obj) = Source::find_by_id(source_id)
1008                        .find_also_related(Object)
1009                        .one(&txn)
1010                        .await?
1011                        .ok_or_else(|| MetaError::catalog_id_not_found("source", source_id))?;
1012                    objects.push(PbObject {
1013                        object_info: Some(PbObjectInfo::Source(
1014                            ObjectModel(src, obj.unwrap()).into(),
1015                        )),
1016                    });
1017                }
1018                objects.push(PbObject {
1019                    object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
1020                });
1021            }
1022            ObjectType::Sink => {
1023                let (sink, obj) = Sink::find_by_id(job_id)
1024                    .find_also_related(Object)
1025                    .one(&txn)
1026                    .await?
1027                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", job_id))?;
1028                objects.push(PbObject {
1029                    object_info: Some(PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into())),
1030                });
1031            }
1032            ObjectType::Index => {
1033                let (index, obj) = Index::find_by_id(job_id)
1034                    .find_also_related(Object)
1035                    .one(&txn)
1036                    .await?
1037                    .ok_or_else(|| MetaError::catalog_id_not_found("index", job_id))?;
1038                {
1039                    let (table, obj) = Table::find_by_id(index.index_table_id)
1040                        .find_also_related(Object)
1041                        .one(&txn)
1042                        .await?
1043                        .ok_or_else(|| {
1044                            MetaError::catalog_id_not_found("table", index.index_table_id)
1045                        })?;
1046                    objects.push(PbObject {
1047                        object_info: Some(PbObjectInfo::Table(
1048                            ObjectModel(table, obj.unwrap()).into(),
1049                        )),
1050                    });
1051                }
1052
1053                // If the index is created on a table with privileges, we should also
1054                // grant the privileges for the index and its state tables.
1055                let primary_table_privileges = UserPrivilege::find()
1056                    .filter(
1057                        user_privilege::Column::Oid
1058                            .eq(index.primary_table_id)
1059                            .and(user_privilege::Column::Action.eq(Action::Select)),
1060                    )
1061                    .all(&txn)
1062                    .await?;
1063                if !primary_table_privileges.is_empty() {
1064                    let index_state_table_ids: Vec<TableId> = Table::find()
1065                        .select_only()
1066                        .column(table::Column::TableId)
1067                        .filter(
1068                            table::Column::BelongsToJobId
1069                                .eq(job_id)
1070                                .or(table::Column::TableId.eq(index.index_table_id)),
1071                        )
1072                        .into_tuple()
1073                        .all(&txn)
1074                        .await?;
1075                    let mut new_privileges = vec![];
1076                    for privilege in &primary_table_privileges {
1077                        for state_table_id in &index_state_table_ids {
1078                            new_privileges.push(user_privilege::ActiveModel {
1079                                id: Default::default(),
1080                                oid: Set(*state_table_id),
1081                                user_id: Set(privilege.user_id),
1082                                action: Set(Action::Select),
1083                                dependent_id: Set(privilege.dependent_id),
1084                                granted_by: Set(privilege.granted_by),
1085                                with_grant_option: Set(privilege.with_grant_option),
1086                            });
1087                        }
1088                    }
1089                    UserPrivilege::insert_many(new_privileges)
1090                        .exec(&txn)
1091                        .await?;
1092
1093                    updated_user_info = list_user_info_by_ids(
1094                        primary_table_privileges.into_iter().map(|p| p.user_id),
1095                        &txn,
1096                    )
1097                    .await?;
1098                }
1099
1100                objects.push(PbObject {
1101                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, obj.unwrap()).into())),
1102                });
1103            }
1104            ObjectType::Source => {
1105                let (source, obj) = Source::find_by_id(job_id)
1106                    .find_also_related(Object)
1107                    .one(&txn)
1108                    .await?
1109                    .ok_or_else(|| MetaError::catalog_id_not_found("source", job_id))?;
1110                objects.push(PbObject {
1111                    object_info: Some(PbObjectInfo::Source(
1112                        ObjectModel(source, obj.unwrap()).into(),
1113                    )),
1114                });
1115            }
1116            _ => unreachable!("invalid job type: {:?}", job_type),
1117        }
1118
1119        if job_type != ObjectType::Index {
1120            updated_user_info = grant_default_privileges_automatically(&txn, job_id).await?;
1121        }
1122        txn.commit().await?;
1123
1124        let mut version = self
1125            .notify_frontend(
1126                notification_op,
1127                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1128            )
1129            .await;
1130
1131        // notify users about the default privileges
1132        if !updated_user_info.is_empty() {
1133            version = self.notify_users_update(updated_user_info).await;
1134        }
1135
1136        inner
1137            .creating_table_finish_notifier
1138            .values_mut()
1139            .for_each(|creating_tables| {
1140                if let Some(txs) = creating_tables.remove(&job_id) {
1141                    for tx in txs {
1142                        let _ = tx.send(Ok(version));
1143                    }
1144                }
1145            });
1146
1147        Ok(())
1148    }
1149
1150    pub async fn finish_replace_streaming_job(
1151        &self,
1152        tmp_id: ObjectId,
1153        streaming_job: StreamingJob,
1154        replace_upstream: FragmentReplaceUpstream,
1155        sink_into_table_context: SinkIntoTableContext,
1156        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1157        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1158    ) -> MetaResult<NotificationVersion> {
1159        let inner = self.inner.write().await;
1160        let txn = inner.db.begin().await?;
1161
1162        let (objects, delete_notification_objs) = Self::finish_replace_streaming_job_inner(
1163            tmp_id,
1164            replace_upstream,
1165            sink_into_table_context,
1166            &txn,
1167            streaming_job,
1168            drop_table_connector_ctx,
1169            auto_refresh_schema_sinks,
1170        )
1171        .await?;
1172
1173        txn.commit().await?;
1174
1175        let mut version = self
1176            .notify_frontend(
1177                NotificationOperation::Update,
1178                NotificationInfo::ObjectGroup(PbObjectGroup { objects }),
1179            )
1180            .await;
1181
1182        if let Some((user_infos, to_drop_objects)) = delete_notification_objs {
1183            self.notify_users_update(user_infos).await;
1184            version = self
1185                .notify_frontend(
1186                    NotificationOperation::Delete,
1187                    build_object_group_for_delete(to_drop_objects),
1188                )
1189                .await;
1190        }
1191
1192        Ok(version)
1193    }
1194
1195    pub async fn finish_replace_streaming_job_inner(
1196        tmp_id: ObjectId,
1197        replace_upstream: FragmentReplaceUpstream,
1198        SinkIntoTableContext {
1199            updated_sink_catalogs,
1200        }: SinkIntoTableContext,
1201        txn: &DatabaseTransaction,
1202        streaming_job: StreamingJob,
1203        drop_table_connector_ctx: Option<&DropTableConnectorContext>,
1204        auto_refresh_schema_sinks: Option<Vec<FinishAutoRefreshSchemaSinkContext>>,
1205    ) -> MetaResult<(Vec<PbObject>, Option<(Vec<PbUserInfo>, Vec<PartialObject>)>)> {
1206        let original_job_id = streaming_job.id() as ObjectId;
1207        let job_type = streaming_job.job_type();
1208
1209        let mut index_item_rewriter = None;
1210
1211        // Update catalog
1212        match streaming_job {
1213            StreamingJob::Table(_source, table, _table_job_type) => {
1214                // The source catalog should remain unchanged
1215
1216                let original_column_catalogs = get_table_columns(txn, original_job_id).await?;
1217
1218                index_item_rewriter = Some({
1219                    let original_columns = original_column_catalogs
1220                        .to_protobuf()
1221                        .into_iter()
1222                        .map(|c| c.column_desc.unwrap())
1223                        .collect_vec();
1224                    let new_columns = table
1225                        .columns
1226                        .iter()
1227                        .map(|c| c.column_desc.clone().unwrap())
1228                        .collect_vec();
1229
1230                    IndexItemRewriter {
1231                        original_columns,
1232                        new_columns,
1233                    }
1234                });
1235
1236                // For sinks created in earlier versions, we need to set the original_target_columns.
1237                for sink_id in updated_sink_catalogs {
1238                    sink::ActiveModel {
1239                        sink_id: Set(sink_id as _),
1240                        original_target_columns: Set(Some(original_column_catalogs.clone())),
1241                        ..Default::default()
1242                    }
1243                    .update(txn)
1244                    .await?;
1245                }
1246                // Update the table catalog with the new one. (column catalog is also updated here)
1247                let mut table = table::ActiveModel::from(table);
1248                if let Some(drop_table_connector_ctx) = drop_table_connector_ctx
1249                    && drop_table_connector_ctx.to_change_streaming_job_id == original_job_id
1250                {
1251                    // drop table connector, the rest logic is in `drop_table_associated_source`
1252                    table.optional_associated_source_id = Set(None);
1253                }
1254
1255                table.update(txn).await?;
1256            }
1257            StreamingJob::Source(source) => {
1258                // Update the source catalog with the new one.
1259                let source = source::ActiveModel::from(source);
1260                source.update(txn).await?;
1261            }
1262            StreamingJob::MaterializedView(table) => {
1263                // Update the table catalog with the new one.
1264                let table = table::ActiveModel::from(table);
1265                table.update(txn).await?;
1266            }
1267            _ => unreachable!(
1268                "invalid streaming job type: {:?}",
1269                streaming_job.job_type_str()
1270            ),
1271        }
1272
1273        async fn finish_fragments(
1274            txn: &DatabaseTransaction,
1275            tmp_id: ObjectId,
1276            original_job_id: ObjectId,
1277            replace_upstream: FragmentReplaceUpstream,
1278        ) -> MetaResult<()> {
1279            // 0. update internal tables
1280            // Fields including `fragment_id` were placeholder values before.
1281            // After table fragments are created, update them for all internal tables.
1282            let fragment_info: Vec<(FragmentId, I32Array)> = Fragment::find()
1283                .select_only()
1284                .columns([
1285                    fragment::Column::FragmentId,
1286                    fragment::Column::StateTableIds,
1287                ])
1288                .filter(fragment::Column::JobId.eq(tmp_id))
1289                .into_tuple()
1290                .all(txn)
1291                .await?;
1292            for (fragment_id, state_table_ids) in fragment_info {
1293                for state_table_id in state_table_ids.into_inner() {
1294                    table::ActiveModel {
1295                        table_id: Set(state_table_id as _),
1296                        fragment_id: Set(Some(fragment_id)),
1297                        // No need to update `vnode_count` because it must remain the same.
1298                        ..Default::default()
1299                    }
1300                    .update(txn)
1301                    .await?;
1302                }
1303            }
1304
1305            // 1. replace old fragments/actors with new ones.
1306            Fragment::delete_many()
1307                .filter(fragment::Column::JobId.eq(original_job_id))
1308                .exec(txn)
1309                .await?;
1310            Fragment::update_many()
1311                .col_expr(fragment::Column::JobId, SimpleExpr::from(original_job_id))
1312                .filter(fragment::Column::JobId.eq(tmp_id))
1313                .exec(txn)
1314                .await?;
1315
1316            // 2. update merges.
1317            // update downstream fragment's Merge node, and upstream_fragment_id
1318            for (fragment_id, fragment_replace_map) in replace_upstream {
1319                let (fragment_id, mut stream_node) =
1320                    Fragment::find_by_id(fragment_id as FragmentId)
1321                        .select_only()
1322                        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
1323                        .into_tuple::<(FragmentId, StreamNode)>()
1324                        .one(txn)
1325                        .await?
1326                        .map(|(id, node)| (id, node.to_protobuf()))
1327                        .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1328
1329                visit_stream_node_mut(&mut stream_node, |body| {
1330                    if let PbNodeBody::Merge(m) = body
1331                        && let Some(new_fragment_id) =
1332                            fragment_replace_map.get(&m.upstream_fragment_id)
1333                    {
1334                        m.upstream_fragment_id = *new_fragment_id;
1335                    }
1336                });
1337                fragment::ActiveModel {
1338                    fragment_id: Set(fragment_id),
1339                    stream_node: Set(StreamNode::from(&stream_node)),
1340                    ..Default::default()
1341                }
1342                .update(txn)
1343                .await?;
1344            }
1345
1346            // 3. remove dummy object.
1347            Object::delete_by_id(tmp_id).exec(txn).await?;
1348
1349            Ok(())
1350        }
1351
1352        finish_fragments(txn, tmp_id, original_job_id, replace_upstream).await?;
1353
1354        // 4. update catalogs and notify.
1355        let mut objects = vec![];
1356        match job_type {
1357            StreamingJobType::Table(_) | StreamingJobType::MaterializedView => {
1358                let (table, table_obj) = Table::find_by_id(original_job_id)
1359                    .find_also_related(Object)
1360                    .one(txn)
1361                    .await?
1362                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1363                objects.push(PbObject {
1364                    object_info: Some(PbObjectInfo::Table(
1365                        ObjectModel(table, table_obj.unwrap()).into(),
1366                    )),
1367                })
1368            }
1369            StreamingJobType::Source => {
1370                let (source, source_obj) = Source::find_by_id(original_job_id)
1371                    .find_also_related(Object)
1372                    .one(txn)
1373                    .await?
1374                    .ok_or_else(|| MetaError::catalog_id_not_found("object", original_job_id))?;
1375                objects.push(PbObject {
1376                    object_info: Some(PbObjectInfo::Source(
1377                        ObjectModel(source, source_obj.unwrap()).into(),
1378                    )),
1379                })
1380            }
1381            _ => unreachable!("invalid streaming job type for replace: {:?}", job_type),
1382        }
1383
1384        if let Some(expr_rewriter) = index_item_rewriter {
1385            let index_items: Vec<(IndexId, ExprNodeArray)> = Index::find()
1386                .select_only()
1387                .columns([index::Column::IndexId, index::Column::IndexItems])
1388                .filter(index::Column::PrimaryTableId.eq(original_job_id))
1389                .into_tuple()
1390                .all(txn)
1391                .await?;
1392            for (index_id, nodes) in index_items {
1393                let mut pb_nodes = nodes.to_protobuf();
1394                pb_nodes
1395                    .iter_mut()
1396                    .for_each(|x| expr_rewriter.rewrite_expr(x));
1397                let index = index::ActiveModel {
1398                    index_id: Set(index_id),
1399                    index_items: Set(pb_nodes.into()),
1400                    ..Default::default()
1401                }
1402                .update(txn)
1403                .await?;
1404                let index_obj = index
1405                    .find_related(Object)
1406                    .one(txn)
1407                    .await?
1408                    .ok_or_else(|| MetaError::catalog_id_not_found("object", index.index_id))?;
1409                objects.push(PbObject {
1410                    object_info: Some(PbObjectInfo::Index(ObjectModel(index, index_obj).into())),
1411                });
1412            }
1413        }
1414
1415        if let Some(sinks) = auto_refresh_schema_sinks {
1416            for finish_sink_context in sinks {
1417                finish_fragments(
1418                    txn,
1419                    finish_sink_context.tmp_sink_id,
1420                    finish_sink_context.original_sink_id,
1421                    Default::default(),
1422                )
1423                .await?;
1424                let (mut sink, sink_obj) = Sink::find_by_id(finish_sink_context.original_sink_id)
1425                    .find_also_related(Object)
1426                    .one(txn)
1427                    .await?
1428                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", original_job_id))?;
1429                let columns = ColumnCatalogArray::from(finish_sink_context.columns);
1430                Sink::update(sink::ActiveModel {
1431                    sink_id: Set(finish_sink_context.original_sink_id),
1432                    columns: Set(columns.clone()),
1433                    ..Default::default()
1434                })
1435                .exec(txn)
1436                .await?;
1437                sink.columns = columns;
1438                objects.push(PbObject {
1439                    object_info: Some(PbObjectInfo::Sink(
1440                        ObjectModel(sink, sink_obj.unwrap()).into(),
1441                    )),
1442                });
1443                if let Some((log_store_table_id, new_log_store_table_columns)) =
1444                    finish_sink_context.new_log_store_table
1445                {
1446                    let new_log_store_table_columns: ColumnCatalogArray =
1447                        new_log_store_table_columns.into();
1448                    let (mut table, table_obj) = Table::find_by_id(log_store_table_id)
1449                        .find_also_related(Object)
1450                        .one(txn)
1451                        .await?
1452                        .ok_or_else(|| MetaError::catalog_id_not_found("table", original_job_id))?;
1453                    Table::update(table::ActiveModel {
1454                        table_id: Set(log_store_table_id),
1455                        columns: Set(new_log_store_table_columns.clone()),
1456                        ..Default::default()
1457                    })
1458                    .exec(txn)
1459                    .await?;
1460                    table.columns = new_log_store_table_columns;
1461                    objects.push(PbObject {
1462                        object_info: Some(PbObjectInfo::Table(
1463                            ObjectModel(table, table_obj.unwrap()).into(),
1464                        )),
1465                    });
1466                }
1467            }
1468        }
1469
1470        let mut notification_objs: Option<(Vec<PbUserInfo>, Vec<PartialObject>)> = None;
1471        if let Some(drop_table_connector_ctx) = drop_table_connector_ctx {
1472            notification_objs =
1473                Some(Self::drop_table_associated_source(txn, drop_table_connector_ctx).await?);
1474        }
1475
1476        Ok((objects, notification_objs))
1477    }
1478
1479    /// Abort the replacing streaming job by deleting the temporary job object.
1480    pub async fn try_abort_replacing_streaming_job(
1481        &self,
1482        tmp_job_id: ObjectId,
1483        tmp_sink_ids: Option<Vec<ObjectId>>,
1484    ) -> MetaResult<()> {
1485        let inner = self.inner.write().await;
1486        let txn = inner.db.begin().await?;
1487        Object::delete_by_id(tmp_job_id).exec(&txn).await?;
1488        if let Some(tmp_sink_ids) = tmp_sink_ids {
1489            for tmp_sink_id in tmp_sink_ids {
1490                Object::delete_by_id(tmp_sink_id).exec(&txn).await?;
1491            }
1492        }
1493        txn.commit().await?;
1494        Ok(())
1495    }
1496
1497    // edit the `rate_limit` of the `Source` node in given `source_id`'s fragments
1498    // return the actor_ids to be applied
1499    pub async fn update_source_rate_limit_by_source_id(
1500        &self,
1501        source_id: SourceId,
1502        rate_limit: Option<u32>,
1503    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1504        let inner = self.inner.read().await;
1505        let txn = inner.db.begin().await?;
1506
1507        {
1508            let active_source = source::ActiveModel {
1509                source_id: Set(source_id),
1510                rate_limit: Set(rate_limit.map(|v| v as i32)),
1511                ..Default::default()
1512            };
1513            active_source.update(&txn).await?;
1514        }
1515
1516        let (source, obj) = Source::find_by_id(source_id)
1517            .find_also_related(Object)
1518            .one(&txn)
1519            .await?
1520            .ok_or_else(|| {
1521                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1522            })?;
1523
1524        let is_fs_source = source.with_properties.inner_ref().is_new_fs_connector();
1525        let streaming_job_ids: Vec<ObjectId> =
1526            if let Some(table_id) = source.optional_associated_table_id {
1527                vec![table_id]
1528            } else if let Some(source_info) = &source.source_info
1529                && source_info.to_protobuf().is_shared()
1530            {
1531                vec![source_id]
1532            } else {
1533                ObjectDependency::find()
1534                    .select_only()
1535                    .column(object_dependency::Column::UsedBy)
1536                    .filter(object_dependency::Column::Oid.eq(source_id))
1537                    .into_tuple()
1538                    .all(&txn)
1539                    .await?
1540            };
1541
1542        if streaming_job_ids.is_empty() {
1543            return Err(MetaError::invalid_parameter(format!(
1544                "source id {source_id} not used by any streaming job"
1545            )));
1546        }
1547
1548        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1549            .select_only()
1550            .columns([
1551                fragment::Column::FragmentId,
1552                fragment::Column::FragmentTypeMask,
1553                fragment::Column::StreamNode,
1554            ])
1555            .filter(fragment::Column::JobId.is_in(streaming_job_ids))
1556            .into_tuple()
1557            .all(&txn)
1558            .await?;
1559        let mut fragments = fragments
1560            .into_iter()
1561            .map(|(id, mask, stream_node)| {
1562                (
1563                    id,
1564                    FragmentTypeMask::from(mask as u32),
1565                    stream_node.to_protobuf(),
1566                )
1567            })
1568            .collect_vec();
1569
1570        fragments.retain_mut(|(_, fragment_type_mask, stream_node)| {
1571            let mut found = false;
1572            if fragment_type_mask.contains(FragmentTypeFlag::Source) {
1573                visit_stream_node_mut(stream_node, |node| {
1574                    if let PbNodeBody::Source(node) = node
1575                        && let Some(node_inner) = &mut node.source_inner
1576                        && node_inner.source_id == source_id as u32
1577                    {
1578                        node_inner.rate_limit = rate_limit;
1579                        found = true;
1580                    }
1581                });
1582            }
1583            if is_fs_source {
1584                // in older versions, there's no fragment type flag for `FsFetch` node,
1585                // so we just scan all fragments for StreamFsFetch node if using fs connector
1586                visit_stream_node_mut(stream_node, |node| {
1587                    if let PbNodeBody::StreamFsFetch(node) = node {
1588                        fragment_type_mask.add(FragmentTypeFlag::FsFetch);
1589                        if let Some(node_inner) = &mut node.node_inner
1590                            && node_inner.source_id == source_id as u32
1591                        {
1592                            node_inner.rate_limit = rate_limit;
1593                            found = true;
1594                        }
1595                    }
1596                });
1597            }
1598            found
1599        });
1600
1601        assert!(
1602            !fragments.is_empty(),
1603            "source id should be used by at least one fragment"
1604        );
1605        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1606        for (id, fragment_type_mask, stream_node) in fragments {
1607            fragment::ActiveModel {
1608                fragment_id: Set(id),
1609                fragment_type_mask: Set(fragment_type_mask.into()),
1610                stream_node: Set(StreamNode::from(&stream_node)),
1611                ..Default::default()
1612            }
1613            .update(&txn)
1614            .await?;
1615        }
1616        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1617
1618        txn.commit().await?;
1619
1620        let relation_info = PbObjectInfo::Source(ObjectModel(source, obj.unwrap()).into());
1621        let _version = self
1622            .notify_frontend(
1623                NotificationOperation::Update,
1624                NotificationInfo::ObjectGroup(PbObjectGroup {
1625                    objects: vec![PbObject {
1626                        object_info: Some(relation_info),
1627                    }],
1628                }),
1629            )
1630            .await;
1631
1632        Ok(fragment_actors)
1633    }
1634
1635    // edit the content of fragments in given `table_id`
1636    // return the actor_ids to be applied
1637    pub async fn mutate_fragments_by_job_id(
1638        &self,
1639        job_id: ObjectId,
1640        // returns true if the mutation is applied
1641        mut fragments_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> MetaResult<bool>,
1642        // error message when no relevant fragments is found
1643        err_msg: &'static str,
1644    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1645        let inner = self.inner.read().await;
1646        let txn = inner.db.begin().await?;
1647
1648        let fragments: Vec<(FragmentId, i32, StreamNode)> = Fragment::find()
1649            .select_only()
1650            .columns([
1651                fragment::Column::FragmentId,
1652                fragment::Column::FragmentTypeMask,
1653                fragment::Column::StreamNode,
1654            ])
1655            .filter(fragment::Column::JobId.eq(job_id))
1656            .into_tuple()
1657            .all(&txn)
1658            .await?;
1659        let mut fragments = fragments
1660            .into_iter()
1661            .map(|(id, mask, stream_node)| {
1662                (id, FragmentTypeMask::from(mask), stream_node.to_protobuf())
1663            })
1664            .collect_vec();
1665
1666        let fragments = fragments
1667            .iter_mut()
1668            .map(|(_, fragment_type_mask, stream_node)| {
1669                fragments_mutation_fn(*fragment_type_mask, stream_node)
1670            })
1671            .collect::<MetaResult<Vec<bool>>>()?
1672            .into_iter()
1673            .zip_eq_debug(std::mem::take(&mut fragments))
1674            .filter_map(|(keep, fragment)| if keep { Some(fragment) } else { None })
1675            .collect::<Vec<_>>();
1676
1677        if fragments.is_empty() {
1678            return Err(MetaError::invalid_parameter(format!(
1679                "job id {job_id}: {}",
1680                err_msg
1681            )));
1682        }
1683
1684        let fragment_ids = fragments.iter().map(|(id, _, _)| *id).collect_vec();
1685        for (id, _, stream_node) in fragments {
1686            fragment::ActiveModel {
1687                fragment_id: Set(id),
1688                stream_node: Set(StreamNode::from(&stream_node)),
1689                ..Default::default()
1690            }
1691            .update(&txn)
1692            .await?;
1693        }
1694        let fragment_actors = get_fragment_actor_ids(&txn, fragment_ids).await?;
1695
1696        txn.commit().await?;
1697
1698        Ok(fragment_actors)
1699    }
1700
1701    async fn mutate_fragment_by_fragment_id(
1702        &self,
1703        fragment_id: FragmentId,
1704        mut fragment_mutation_fn: impl FnMut(FragmentTypeMask, &mut PbStreamNode) -> bool,
1705        err_msg: &'static str,
1706    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1707        let inner = self.inner.read().await;
1708        let txn = inner.db.begin().await?;
1709
1710        let (fragment_type_mask, stream_node): (i32, StreamNode) =
1711            Fragment::find_by_id(fragment_id)
1712                .select_only()
1713                .columns([
1714                    fragment::Column::FragmentTypeMask,
1715                    fragment::Column::StreamNode,
1716                ])
1717                .into_tuple()
1718                .one(&txn)
1719                .await?
1720                .ok_or_else(|| MetaError::catalog_id_not_found("fragment", fragment_id))?;
1721        let mut pb_stream_node = stream_node.to_protobuf();
1722        let fragment_type_mask = FragmentTypeMask::from(fragment_type_mask);
1723
1724        if !fragment_mutation_fn(fragment_type_mask, &mut pb_stream_node) {
1725            return Err(MetaError::invalid_parameter(format!(
1726                "fragment id {fragment_id}: {}",
1727                err_msg
1728            )));
1729        }
1730
1731        fragment::ActiveModel {
1732            fragment_id: Set(fragment_id),
1733            stream_node: Set(stream_node),
1734            ..Default::default()
1735        }
1736        .update(&txn)
1737        .await?;
1738
1739        let fragment_actors = get_fragment_actor_ids(&txn, vec![fragment_id]).await?;
1740
1741        txn.commit().await?;
1742
1743        Ok(fragment_actors)
1744    }
1745
1746    // edit the `rate_limit` of the `Chain` node in given `table_id`'s fragments
1747    // return the actor_ids to be applied
1748    pub async fn update_backfill_rate_limit_by_job_id(
1749        &self,
1750        job_id: ObjectId,
1751        rate_limit: Option<u32>,
1752    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1753        let update_backfill_rate_limit =
1754            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1755                let mut found = false;
1756                if fragment_type_mask
1757                    .contains_any(FragmentTypeFlag::backfill_rate_limit_fragments())
1758                {
1759                    visit_stream_node_mut(stream_node, |node| match node {
1760                        PbNodeBody::StreamCdcScan(node) => {
1761                            node.rate_limit = rate_limit;
1762                            found = true;
1763                        }
1764                        PbNodeBody::StreamScan(node) => {
1765                            node.rate_limit = rate_limit;
1766                            found = true;
1767                        }
1768                        PbNodeBody::SourceBackfill(node) => {
1769                            node.rate_limit = rate_limit;
1770                            found = true;
1771                        }
1772                        PbNodeBody::Sink(node) => {
1773                            node.rate_limit = rate_limit;
1774                            found = true;
1775                        }
1776                        _ => {}
1777                    });
1778                }
1779                Ok(found)
1780            };
1781
1782        self.mutate_fragments_by_job_id(
1783            job_id,
1784            update_backfill_rate_limit,
1785            "stream scan node or source node not found",
1786        )
1787        .await
1788    }
1789
1790    // edit the `rate_limit` of the `Sink` node in given `table_id`'s fragments
1791    // return the actor_ids to be applied
1792    pub async fn update_sink_rate_limit_by_job_id(
1793        &self,
1794        job_id: ObjectId,
1795        rate_limit: Option<u32>,
1796    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1797        let update_sink_rate_limit =
1798            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1799                let mut found = Ok(false);
1800                if fragment_type_mask.contains_any(FragmentTypeFlag::sink_rate_limit_fragments()) {
1801                    visit_stream_node_mut(stream_node, |node| {
1802                        if let PbNodeBody::Sink(node) = node {
1803                            if node.log_store_type != PbSinkLogStoreType::KvLogStore as i32 {
1804                                found = Err(MetaError::invalid_parameter(
1805                                    "sink rate limit is only supported for kv log store, please SET sink_decouple = TRUE before CREATE SINK",
1806                                ));
1807                                return;
1808                            }
1809                            node.rate_limit = rate_limit;
1810                            found = Ok(true);
1811                        }
1812                    });
1813                }
1814                found
1815            };
1816
1817        self.mutate_fragments_by_job_id(job_id, update_sink_rate_limit, "sink node not found")
1818            .await
1819    }
1820
1821    pub async fn update_dml_rate_limit_by_job_id(
1822        &self,
1823        job_id: ObjectId,
1824        rate_limit: Option<u32>,
1825    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
1826        let update_dml_rate_limit =
1827            |fragment_type_mask: FragmentTypeMask, stream_node: &mut PbStreamNode| {
1828                let mut found = false;
1829                if fragment_type_mask.contains_any(FragmentTypeFlag::dml_rate_limit_fragments()) {
1830                    visit_stream_node_mut(stream_node, |node| {
1831                        if let PbNodeBody::Dml(node) = node {
1832                            node.rate_limit = rate_limit;
1833                            found = true;
1834                        }
1835                    });
1836                }
1837                Ok(found)
1838            };
1839
1840        self.mutate_fragments_by_job_id(job_id, update_dml_rate_limit, "dml node not found")
1841            .await
1842    }
1843
1844    pub async fn update_source_props_by_source_id(
1845        &self,
1846        source_id: SourceId,
1847        alter_props: BTreeMap<String, String>,
1848        alter_secret_refs: BTreeMap<String, PbSecretRef>,
1849    ) -> MetaResult<WithOptionsSecResolved> {
1850        let inner = self.inner.read().await;
1851        let txn = inner.db.begin().await?;
1852
1853        let (source, _obj) = Source::find_by_id(source_id)
1854            .find_also_related(Object)
1855            .one(&txn)
1856            .await?
1857            .ok_or_else(|| {
1858                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
1859            })?;
1860        let connector = source.with_properties.0.get_connector().unwrap();
1861        let is_shared_source = source.is_shared();
1862
1863        let mut dep_source_job_ids: Vec<ObjectId> = Vec::new();
1864        if !is_shared_source {
1865            // mv using non-shared source holds a copy of source in their fragments
1866            dep_source_job_ids = ObjectDependency::find()
1867                .select_only()
1868                .column(object_dependency::Column::UsedBy)
1869                .filter(object_dependency::Column::Oid.eq(source_id))
1870                .into_tuple()
1871                .all(&txn)
1872                .await?;
1873        }
1874
1875        // Use check_source_allow_alter_on_fly_fields to validate allowed properties
1876        let prop_keys: Vec<String> = alter_props
1877            .keys()
1878            .chain(alter_secret_refs.keys())
1879            .cloned()
1880            .collect();
1881        risingwave_connector::allow_alter_on_fly_fields::check_source_allow_alter_on_fly_fields(
1882            &connector, &prop_keys,
1883        )?;
1884
1885        let mut options_with_secret = WithOptionsSecResolved::new(
1886            source.with_properties.0.clone(),
1887            source
1888                .secret_ref
1889                .map(|secret_ref| secret_ref.to_protobuf())
1890                .unwrap_or_default(),
1891        );
1892        let (to_add_secret_dep, to_remove_secret_dep) =
1893            options_with_secret.handle_update(alter_props, alter_secret_refs)?;
1894
1895        tracing::info!(
1896            "applying new properties to source: source_id={}, options_with_secret={:?}",
1897            source_id,
1898            options_with_secret
1899        );
1900        // check if the alter-ed props are valid for each Connector
1901        let _ = ConnectorProperties::extract(options_with_secret.clone(), true)?;
1902        // todo: validate via source manager
1903
1904        let mut associate_table_id = None;
1905
1906        // can be source_id or table_id
1907        // if updating an associated source, the preferred_id is the table_id
1908        // otherwise, it is the source_id
1909        let mut preferred_id: i32 = source_id;
1910        let rewrite_sql = {
1911            let definition = source.definition.clone();
1912
1913            let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
1914                .map_err(|e| {
1915                    MetaError::from(MetaErrorInner::Connector(ConnectorError::from(
1916                        anyhow!(e).context("Failed to parse source definition SQL"),
1917                    )))
1918                })?
1919                .try_into()
1920                .unwrap();
1921
1922            /// Formats SQL options with secret values properly resolved
1923            ///
1924            /// This function processes configuration options that may contain sensitive data:
1925            /// - Plaintext options are directly converted to `SqlOption`
1926            /// - Secret options are retrieved from the database and formatted as "SECRET {name}"
1927            ///   without exposing the actual secret value
1928            ///
1929            /// # Arguments
1930            /// * `txn` - Database transaction for retrieving secrets
1931            /// * `options_with_secret` - Container of options with both plaintext and secret values
1932            ///
1933            /// # Returns
1934            /// * `MetaResult<Vec<SqlOption>>` - List of formatted SQL options or error
1935            async fn format_with_option_secret_resolved(
1936                txn: &DatabaseTransaction,
1937                options_with_secret: &WithOptionsSecResolved,
1938            ) -> MetaResult<Vec<SqlOption>> {
1939                let mut options = Vec::new();
1940                for (k, v) in options_with_secret.as_plaintext() {
1941                    let sql_option = SqlOption::try_from((k, &format!("'{}'", v)))
1942                        .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1943                    options.push(sql_option);
1944                }
1945                for (k, v) in options_with_secret.as_secret() {
1946                    if let Some(secret_model) =
1947                        Secret::find_by_id(v.secret_id as i32).one(txn).await?
1948                    {
1949                        let sql_option =
1950                            SqlOption::try_from((k, &format!("SECRET {}", secret_model.name)))
1951                                .map_err(|e| MetaError::invalid_parameter(e.to_report_string()))?;
1952                        options.push(sql_option);
1953                    } else {
1954                        return Err(MetaError::catalog_id_not_found("secret", v.secret_id));
1955                    }
1956                }
1957                Ok(options)
1958            }
1959
1960            match &mut stmt {
1961                Statement::CreateSource { stmt } => {
1962                    stmt.with_properties.0 =
1963                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1964                }
1965                Statement::CreateTable { with_options, .. } => {
1966                    *with_options =
1967                        format_with_option_secret_resolved(&txn, &options_with_secret).await?;
1968                    associate_table_id = source.optional_associated_table_id;
1969                    preferred_id = associate_table_id.unwrap();
1970                }
1971                _ => unreachable!(),
1972            }
1973
1974            stmt.to_string()
1975        };
1976
1977        {
1978            // update secret dependencies
1979            if !to_add_secret_dep.is_empty() {
1980                ObjectDependency::insert_many(to_add_secret_dep.into_iter().map(|secret_id| {
1981                    object_dependency::ActiveModel {
1982                        oid: Set(secret_id as _),
1983                        used_by: Set(preferred_id as _),
1984                        ..Default::default()
1985                    }
1986                }))
1987                .exec(&txn)
1988                .await?;
1989            }
1990            if !to_remove_secret_dep.is_empty() {
1991                // todo: fix the filter logic
1992                let _ = ObjectDependency::delete_many()
1993                    .filter(
1994                        object_dependency::Column::Oid
1995                            .is_in(to_remove_secret_dep)
1996                            .and(
1997                                object_dependency::Column::UsedBy.eq::<ObjectId>(preferred_id as _),
1998                            ),
1999                    )
2000                    .exec(&txn)
2001                    .await?;
2002            }
2003        }
2004
2005        let active_source_model = source::ActiveModel {
2006            source_id: Set(source_id),
2007            definition: Set(rewrite_sql.clone()),
2008            with_properties: Set(options_with_secret.as_plaintext().clone().into()),
2009            secret_ref: Set((!options_with_secret.as_secret().is_empty())
2010                .then(|| SecretRef::from(options_with_secret.as_secret().clone()))),
2011            ..Default::default()
2012        };
2013        active_source_model.update(&txn).await?;
2014
2015        if let Some(associate_table_id) = associate_table_id {
2016            // update the associated table statement accordly
2017            let active_table_model = table::ActiveModel {
2018                table_id: Set(associate_table_id),
2019                definition: Set(rewrite_sql),
2020                ..Default::default()
2021            };
2022            active_table_model.update(&txn).await?;
2023        }
2024
2025        let to_check_job_ids = vec![if let Some(associate_table_id) = associate_table_id {
2026            // if updating table with connector, the fragment_id is table id
2027            associate_table_id
2028        } else {
2029            source_id
2030        }]
2031        .into_iter()
2032        .chain(dep_source_job_ids.into_iter())
2033        .collect_vec();
2034
2035        // update fragments
2036        update_connector_props_fragments(
2037            &txn,
2038            to_check_job_ids,
2039            FragmentTypeFlag::Source,
2040            |node, found| {
2041                if let PbNodeBody::Source(node) = node
2042                    && let Some(source_inner) = &mut node.source_inner
2043                {
2044                    source_inner.with_properties = options_with_secret.as_plaintext().clone();
2045                    source_inner.secret_refs = options_with_secret.as_secret().clone();
2046                    *found = true;
2047                }
2048            },
2049            is_shared_source,
2050        )
2051        .await?;
2052
2053        let mut to_update_objs = Vec::with_capacity(2);
2054        let (source, obj) = Source::find_by_id(source_id)
2055            .find_also_related(Object)
2056            .one(&txn)
2057            .await?
2058            .ok_or_else(|| {
2059                MetaError::catalog_id_not_found(ObjectType::Source.as_str(), source_id)
2060            })?;
2061        to_update_objs.push(PbObject {
2062            object_info: Some(PbObjectInfo::Source(
2063                ObjectModel(source, obj.unwrap()).into(),
2064            )),
2065        });
2066
2067        if let Some(associate_table_id) = associate_table_id {
2068            let (table, obj) = Table::find_by_id(associate_table_id)
2069                .find_also_related(Object)
2070                .one(&txn)
2071                .await?
2072                .ok_or_else(|| MetaError::catalog_id_not_found("table", associate_table_id))?;
2073            to_update_objs.push(PbObject {
2074                object_info: Some(PbObjectInfo::Table(ObjectModel(table, obj.unwrap()).into())),
2075            });
2076        }
2077
2078        txn.commit().await?;
2079
2080        self.notify_frontend(
2081            NotificationOperation::Update,
2082            NotificationInfo::ObjectGroup(PbObjectGroup {
2083                objects: to_update_objs,
2084            }),
2085        )
2086        .await;
2087
2088        Ok(options_with_secret)
2089    }
2090
2091    pub async fn update_sink_props_by_sink_id(
2092        &self,
2093        sink_id: SinkId,
2094        props: BTreeMap<String, String>,
2095    ) -> MetaResult<HashMap<String, String>> {
2096        let inner = self.inner.read().await;
2097        let txn = inner.db.begin().await?;
2098
2099        let (sink, _obj) = Sink::find_by_id(sink_id)
2100            .find_also_related(Object)
2101            .one(&txn)
2102            .await?
2103            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2104
2105        // Validate that props can be altered
2106        match sink.properties.inner_ref().get(CONNECTOR_TYPE_KEY) {
2107            Some(connector) => {
2108                let connector_type = connector.to_lowercase();
2109                let field_names: Vec<String> = props.keys().cloned().collect();
2110                check_sink_allow_alter_on_fly_fields(&connector_type, &field_names)
2111                    .map_err(|e| SinkError::Config(anyhow!(e)))?;
2112
2113                match_sink_name_str!(
2114                    connector_type.as_str(),
2115                    SinkType,
2116                    {
2117                        let mut new_props = sink.properties.0.clone();
2118                        new_props.extend(props.clone());
2119                        SinkType::validate_alter_config(&new_props)
2120                    },
2121                    |sink: &str| Err(SinkError::Config(anyhow!("unsupported sink type {}", sink)))
2122                )?
2123            }
2124            None => {
2125                return Err(
2126                    SinkError::Config(anyhow!("connector not specified when alter sink")).into(),
2127                );
2128            }
2129        };
2130        let definition = sink.definition.clone();
2131        let [mut stmt]: [_; 1] = Parser::parse_sql(&definition)
2132            .map_err(|e| SinkError::Config(anyhow!(e)))?
2133            .try_into()
2134            .unwrap();
2135        if let Statement::CreateSink { stmt } = &mut stmt {
2136            let mut new_sql_options = stmt
2137                .with_properties
2138                .0
2139                .iter()
2140                .map(|sql_option| (&sql_option.name, sql_option))
2141                .collect::<IndexMap<_, _>>();
2142            let add_sql_options = props
2143                .iter()
2144                .map(|(k, v)| SqlOption::try_from((k, v)))
2145                .collect::<Result<Vec<SqlOption>, ParserError>>()
2146                .map_err(|e| SinkError::Config(anyhow!(e)))?;
2147            new_sql_options.extend(
2148                add_sql_options
2149                    .iter()
2150                    .map(|sql_option| (&sql_option.name, sql_option)),
2151            );
2152            stmt.with_properties.0 = new_sql_options.into_values().cloned().collect();
2153        } else {
2154            panic!("sink definition is not a create sink statement")
2155        }
2156        let mut new_config = sink.properties.clone().into_inner();
2157        new_config.extend(props);
2158
2159        let active_sink = sink::ActiveModel {
2160            sink_id: Set(sink_id),
2161            properties: Set(risingwave_meta_model::Property(new_config.clone())),
2162            definition: Set(stmt.to_string()),
2163            ..Default::default()
2164        };
2165        active_sink.update(&txn).await?;
2166
2167        let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2168            .select_only()
2169            .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2170            .filter(
2171                fragment::Column::JobId
2172                    .eq(sink_id)
2173                    .and(FragmentTypeMask::intersects(FragmentTypeFlag::Sink)),
2174            )
2175            .into_tuple()
2176            .all(&txn)
2177            .await?;
2178        let fragments = fragments
2179            .into_iter()
2180            .filter_map(|(id, stream_node)| {
2181                let mut stream_node = stream_node.to_protobuf();
2182                let mut found = false;
2183                visit_stream_node_mut(&mut stream_node, |node| {
2184                    if let PbNodeBody::Sink(node) = node
2185                        && let Some(sink_desc) = &mut node.sink_desc
2186                        && sink_desc.id == sink_id as u32
2187                    {
2188                        sink_desc.properties = new_config.clone();
2189                        found = true;
2190                    }
2191                });
2192                if found { Some((id, stream_node)) } else { None }
2193            })
2194            .collect_vec();
2195        assert!(
2196            !fragments.is_empty(),
2197            "sink id should be used by at least one fragment"
2198        );
2199        for (id, stream_node) in fragments {
2200            fragment::ActiveModel {
2201                fragment_id: Set(id),
2202                stream_node: Set(StreamNode::from(&stream_node)),
2203                ..Default::default()
2204            }
2205            .update(&txn)
2206            .await?;
2207        }
2208
2209        let (sink, obj) = Sink::find_by_id(sink_id)
2210            .find_also_related(Object)
2211            .one(&txn)
2212            .await?
2213            .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Sink.as_str(), sink_id))?;
2214
2215        txn.commit().await?;
2216
2217        let relation_info = PbObjectInfo::Sink(ObjectModel(sink, obj.unwrap()).into());
2218        let _version = self
2219            .notify_frontend(
2220                NotificationOperation::Update,
2221                NotificationInfo::ObjectGroup(PbObjectGroup {
2222                    objects: vec![PbObject {
2223                        object_info: Some(relation_info),
2224                    }],
2225                }),
2226            )
2227            .await;
2228
2229        Ok(new_config.into_iter().collect())
2230    }
2231
2232    pub async fn update_fragment_rate_limit_by_fragment_id(
2233        &self,
2234        fragment_id: FragmentId,
2235        rate_limit: Option<u32>,
2236    ) -> MetaResult<HashMap<FragmentId, Vec<ActorId>>> {
2237        let update_rate_limit = |fragment_type_mask: FragmentTypeMask,
2238                                 stream_node: &mut PbStreamNode| {
2239            let mut found = false;
2240            if fragment_type_mask.contains_any(
2241                FragmentTypeFlag::dml_rate_limit_fragments()
2242                    .chain(FragmentTypeFlag::sink_rate_limit_fragments())
2243                    .chain(FragmentTypeFlag::backfill_rate_limit_fragments())
2244                    .chain(FragmentTypeFlag::source_rate_limit_fragments()),
2245            ) {
2246                visit_stream_node_mut(stream_node, |node| {
2247                    if let PbNodeBody::Dml(node) = node {
2248                        node.rate_limit = rate_limit;
2249                        found = true;
2250                    }
2251                    if let PbNodeBody::Sink(node) = node {
2252                        node.rate_limit = rate_limit;
2253                        found = true;
2254                    }
2255                    if let PbNodeBody::StreamCdcScan(node) = node {
2256                        node.rate_limit = rate_limit;
2257                        found = true;
2258                    }
2259                    if let PbNodeBody::StreamScan(node) = node {
2260                        node.rate_limit = rate_limit;
2261                        found = true;
2262                    }
2263                    if let PbNodeBody::SourceBackfill(node) = node {
2264                        node.rate_limit = rate_limit;
2265                        found = true;
2266                    }
2267                });
2268            }
2269            found
2270        };
2271        self.mutate_fragment_by_fragment_id(fragment_id, update_rate_limit, "fragment not found")
2272            .await
2273    }
2274
2275    pub async fn post_apply_reschedules(
2276        &self,
2277        reschedules: HashMap<FragmentId, Reschedule>,
2278        post_updates: &JobReschedulePostUpdates,
2279    ) -> MetaResult<()> {
2280        let new_created_actors: HashSet<_> = reschedules
2281            .values()
2282            .flat_map(|reschedule| {
2283                reschedule
2284                    .added_actors
2285                    .values()
2286                    .flatten()
2287                    .map(|actor_id| *actor_id as ActorId)
2288            })
2289            .collect();
2290
2291        let inner = self.inner.write().await;
2292
2293        let txn = inner.db.begin().await?;
2294
2295        for Reschedule {
2296            removed_actors,
2297            vnode_bitmap_updates,
2298            actor_splits,
2299            newly_created_actors,
2300            ..
2301        } in reschedules.into_values()
2302        {
2303            // drop removed actors
2304            Actor::delete_many()
2305                .filter(
2306                    actor::Column::ActorId
2307                        .is_in(removed_actors.iter().map(|id| *id as ActorId).collect_vec()),
2308                )
2309                .exec(&txn)
2310                .await?;
2311
2312            // add new actors
2313            for (
2314                (
2315                    StreamActor {
2316                        actor_id,
2317                        fragment_id,
2318                        vnode_bitmap,
2319                        expr_context,
2320                        ..
2321                    },
2322                    _,
2323                ),
2324                worker_id,
2325            ) in newly_created_actors.into_values()
2326            {
2327                let splits = actor_splits
2328                    .get(&actor_id)
2329                    .map(|splits| splits.iter().map(PbConnectorSplit::from).collect_vec());
2330
2331                Actor::insert(actor::ActiveModel {
2332                    actor_id: Set(actor_id as _),
2333                    fragment_id: Set(fragment_id as _),
2334                    status: Set(ActorStatus::Running),
2335                    splits: Set(splits.map(|splits| (&PbConnectorSplits { splits }).into())),
2336                    worker_id: Set(worker_id),
2337                    upstream_actor_ids: Set(Default::default()),
2338                    vnode_bitmap: Set(vnode_bitmap
2339                        .as_ref()
2340                        .map(|bitmap| (&bitmap.to_protobuf()).into())),
2341                    expr_context: Set(expr_context.as_ref().unwrap().into()),
2342                })
2343                .exec(&txn)
2344                .await?;
2345            }
2346
2347            // actor update
2348            for (actor_id, bitmap) in vnode_bitmap_updates {
2349                let actor = Actor::find_by_id(actor_id as ActorId)
2350                    .one(&txn)
2351                    .await?
2352                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2353
2354                let mut actor = actor.into_active_model();
2355                actor.vnode_bitmap = Set(Some((&bitmap.to_protobuf()).into()));
2356                actor.update(&txn).await?;
2357            }
2358
2359            // Update actor_splits for existing actors
2360            for (actor_id, splits) in actor_splits {
2361                if new_created_actors.contains(&(actor_id as ActorId)) {
2362                    continue;
2363                }
2364
2365                let actor = Actor::find_by_id(actor_id as ActorId)
2366                    .one(&txn)
2367                    .await?
2368                    .ok_or_else(|| MetaError::catalog_id_not_found("actor", actor_id))?;
2369
2370                let mut actor = actor.into_active_model();
2371                let splits = splits.iter().map(PbConnectorSplit::from).collect_vec();
2372                actor.splits = Set(Some((&PbConnectorSplits { splits }).into()));
2373                actor.update(&txn).await?;
2374            }
2375        }
2376
2377        let JobReschedulePostUpdates {
2378            parallelism_updates,
2379            resource_group_updates,
2380        } = post_updates;
2381
2382        for (table_id, parallelism) in parallelism_updates {
2383            let mut streaming_job = StreamingJobModel::find_by_id(table_id.table_id() as ObjectId)
2384                .one(&txn)
2385                .await?
2386                .ok_or_else(|| MetaError::catalog_id_not_found("table", table_id))?
2387                .into_active_model();
2388
2389            streaming_job.parallelism = Set(match parallelism {
2390                TableParallelism::Adaptive => StreamingParallelism::Adaptive,
2391                TableParallelism::Fixed(n) => StreamingParallelism::Fixed(*n as _),
2392                TableParallelism::Custom => StreamingParallelism::Custom,
2393            });
2394
2395            if let Some(resource_group) =
2396                resource_group_updates.get(&(table_id.table_id() as ObjectId))
2397            {
2398                streaming_job.specific_resource_group = Set(resource_group.to_owned());
2399            }
2400
2401            streaming_job.update(&txn).await?;
2402        }
2403
2404        txn.commit().await?;
2405
2406        Ok(())
2407    }
2408
2409    /// Note: `FsFetch` created in old versions are not included.
2410    /// Since this is only used for debugging, it should be fine.
2411    pub async fn list_rate_limits(&self) -> MetaResult<Vec<RateLimitInfo>> {
2412        let inner = self.inner.read().await;
2413        let txn = inner.db.begin().await?;
2414
2415        let fragments: Vec<(FragmentId, ObjectId, i32, StreamNode)> = Fragment::find()
2416            .select_only()
2417            .columns([
2418                fragment::Column::FragmentId,
2419                fragment::Column::JobId,
2420                fragment::Column::FragmentTypeMask,
2421                fragment::Column::StreamNode,
2422            ])
2423            .filter(FragmentTypeMask::intersects_any(
2424                FragmentTypeFlag::rate_limit_fragments(),
2425            ))
2426            .into_tuple()
2427            .all(&txn)
2428            .await?;
2429
2430        let mut rate_limits = Vec::new();
2431        for (fragment_id, job_id, fragment_type_mask, stream_node) in fragments {
2432            let stream_node = stream_node.to_protobuf();
2433            visit_stream_node_body(&stream_node, |node| {
2434                let mut rate_limit = None;
2435                let mut node_name = None;
2436
2437                match node {
2438                    // source rate limit
2439                    PbNodeBody::Source(node) => {
2440                        if let Some(node_inner) = &node.source_inner {
2441                            rate_limit = node_inner.rate_limit;
2442                            node_name = Some("SOURCE");
2443                        }
2444                    }
2445                    PbNodeBody::StreamFsFetch(node) => {
2446                        if let Some(node_inner) = &node.node_inner {
2447                            rate_limit = node_inner.rate_limit;
2448                            node_name = Some("FS_FETCH");
2449                        }
2450                    }
2451                    // backfill rate limit
2452                    PbNodeBody::SourceBackfill(node) => {
2453                        rate_limit = node.rate_limit;
2454                        node_name = Some("SOURCE_BACKFILL");
2455                    }
2456                    PbNodeBody::StreamScan(node) => {
2457                        rate_limit = node.rate_limit;
2458                        node_name = Some("STREAM_SCAN");
2459                    }
2460                    PbNodeBody::StreamCdcScan(node) => {
2461                        rate_limit = node.rate_limit;
2462                        node_name = Some("STREAM_CDC_SCAN");
2463                    }
2464                    PbNodeBody::Sink(node) => {
2465                        rate_limit = node.rate_limit;
2466                        node_name = Some("SINK");
2467                    }
2468                    _ => {}
2469                }
2470
2471                if let Some(rate_limit) = rate_limit {
2472                    rate_limits.push(RateLimitInfo {
2473                        fragment_id: fragment_id as u32,
2474                        job_id: job_id as u32,
2475                        fragment_type_mask: fragment_type_mask as u32,
2476                        rate_limit,
2477                        node_name: node_name.unwrap().to_owned(),
2478                    });
2479                }
2480            });
2481        }
2482
2483        Ok(rate_limits)
2484    }
2485}
2486
/// Sink-related context for an operation on a table.
///
/// NOTE(review): presumably handed to the streaming-job controller when a table that is
/// the target of sinks gets replaced — confirm against the callers.
pub struct SinkIntoTableContext {
    /// When altering a table (e.g., adding a column), the ids of the existing sinks;
    /// empty otherwise.
    pub updated_sink_catalogs: Vec<SinkId>,
}
2492
/// Data needed to finish an auto-refresh-schema operation for a sink.
///
/// NOTE(review): the field descriptions below are inferred from the field names and
/// types only — confirm against the code that builds and consumes this context.
pub struct FinishAutoRefreshSchemaSinkContext {
    /// Object id of the temporary sink (presumably the one created for the refresh).
    pub tmp_sink_id: ObjectId,
    /// Object id of the original sink (presumably the one being refreshed).
    pub original_sink_id: ObjectId,
    /// Column catalogs (presumably the sink's columns after the schema refresh).
    pub columns: Vec<PbColumnCatalog>,
    /// Optional pair of a table's object id and its column catalogs (presumably the
    /// sink's log store table with its new columns); `None` when there is none.
    pub new_log_store_table: Option<(ObjectId, Vec<PbColumnCatalog>)>,
}
2499
2500async fn update_connector_props_fragments<F>(
2501    txn: &DatabaseTransaction,
2502    job_ids: Vec<i32>,
2503    expect_flag: FragmentTypeFlag,
2504    mut alter_stream_node_fn: F,
2505    is_shared_source: bool,
2506) -> MetaResult<()>
2507where
2508    F: FnMut(&mut PbNodeBody, &mut bool),
2509{
2510    let fragments: Vec<(FragmentId, StreamNode)> = Fragment::find()
2511        .select_only()
2512        .columns([fragment::Column::FragmentId, fragment::Column::StreamNode])
2513        .filter(
2514            fragment::Column::JobId
2515                .is_in(job_ids.clone())
2516                .and(FragmentTypeMask::intersects(expect_flag)),
2517        )
2518        .into_tuple()
2519        .all(txn)
2520        .await?;
2521    let fragments = fragments
2522        .into_iter()
2523        .filter_map(|(id, stream_node)| {
2524            let mut stream_node = stream_node.to_protobuf();
2525            let mut found = false;
2526            visit_stream_node_mut(&mut stream_node, |node| {
2527                alter_stream_node_fn(node, &mut found);
2528            });
2529            if found { Some((id, stream_node)) } else { None }
2530        })
2531        .collect_vec();
2532    if is_shared_source || job_ids.len() > 1 {
2533        // the first element is the source_id or associated table_id
2534        // if the source is non-shared, there is no updated fragments
2535        // job_ids.len() > 1 means the source is used by other streaming jobs, so there should be at least one fragment updated
2536        assert!(
2537            !fragments.is_empty(),
2538            "job ids {:?} (type: {:?}) should be used by at least one fragment",
2539            job_ids,
2540            expect_flag
2541        );
2542    }
2543
2544    for (id, stream_node) in fragments {
2545        fragment::ActiveModel {
2546            fragment_id: Set(id),
2547            stream_node: Set(StreamNode::from(&stream_node)),
2548            ..Default::default()
2549        }
2550        .update(txn)
2551        .await?;
2552    }
2553
2554    Ok(())
2555}