risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::HashSet;
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::{InstrumentAwait, span};
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{AlterDatabaseParam, FragmentTypeFlag};
27use risingwave_common::config::DefaultParallelism;
28use risingwave_common::hash::VnodeCountCompat;
29use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
30use risingwave_common::system_param::reader::SystemParamsRead;
31use risingwave_common::util::stream_graph_visitor::{
32    visit_stream_node, visit_stream_node_body, visit_stream_node_cont_mut,
33};
34use risingwave_common::{bail, bail_not_implemented, must_match};
35use risingwave_connector::WithOptionsSecResolved;
36use risingwave_connector::connector_common::validate_connection;
37use risingwave_connector::source::{
38    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
39};
40use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
41use risingwave_meta_model::object::ObjectType;
42use risingwave_meta_model::{
43    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
44    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
45};
46use risingwave_pb::catalog::{
47    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
48    Subscription, Table, View,
49};
50use risingwave_pb::ddl_service::alter_owner_request::Object;
51use risingwave_pb::ddl_service::{
52    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
53    alter_swap_rename_request,
54};
55use risingwave_pb::stream_plan::stream_node::NodeBody;
56use risingwave_pb::stream_plan::{
57    MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
58    StreamFragmentGraph as StreamFragmentGraphProto,
59};
60use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
61use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
62use strum::Display;
63use thiserror_ext::AsReport;
64use tokio::sync::{Semaphore, oneshot};
65use tokio::time::sleep;
66use tracing::Instrument;
67
68use crate::barrier::BarrierManagerRef;
69use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
70use crate::controller::cluster::StreamingClusterInfo;
71use crate::controller::streaming_job::SinkIntoTableContext;
72use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
73use crate::manager::{
74    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
75    NotificationVersion, StreamingJob, StreamingJobType,
76};
77use crate::model::{
78    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
79    StreamJobFragmentsToCreate, TableParallelism,
80};
81use crate::stream::{
82    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
83    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
84    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
85    StreamFragmentGraph, create_source_worker, state_match, validate_sink,
86};
87use crate::telemetry::report_event;
88use crate::{MetaError, MetaResult};
89
90#[derive(PartialEq)]
91pub enum DropMode {
92    Restrict,
93    Cascade,
94}
95
96impl DropMode {
97    pub fn from_request_setting(cascade: bool) -> DropMode {
98        if cascade {
99            DropMode::Cascade
100        } else {
101            DropMode::Restrict
102        }
103    }
104}
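
// Illustrative sketch (added for clarity, not part of the original logic): the CASCADE flag of a
// drop request maps one-to-one onto `DropMode`, which is exactly what `from_request_setting` encodes.
#[cfg(test)]
mod drop_mode_example {
    use super::DropMode;

    #[test]
    fn cascade_flag_selects_drop_mode() {
        assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
        assert!(DropMode::from_request_setting(false) == DropMode::Restrict);
    }
}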
105
106#[derive(strum::AsRefStr)]
107pub enum StreamingJobId {
108    MaterializedView(TableId),
109    Sink(SinkId),
110    Table(Option<SourceId>, TableId),
111    Index(IndexId),
112}
113
114impl std::fmt::Display for StreamingJobId {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        write!(f, "{}", self.as_ref())?;
117        write!(f, "({})", self.id())
118    }
119}
120
121impl StreamingJobId {
122    #[allow(dead_code)]
123    fn id(&self) -> TableId {
124        match self {
125            StreamingJobId::MaterializedView(id)
126            | StreamingJobId::Sink(id)
127            | StreamingJobId::Table(_, id)
128            | StreamingJobId::Index(id) => *id,
129        }
130    }
131}
132
133/// Describes the job that needs to be replaced.
134/// It is used when replacing a table and when creating a sink into a table.
135pub struct ReplaceStreamJobInfo {
136    pub streaming_job: StreamingJob,
137    pub fragment_graph: StreamFragmentGraphProto,
138}
139
140#[derive(Display)]
141pub enum DdlCommand {
142    CreateDatabase(Database),
143    DropDatabase(DatabaseId),
144    CreateSchema(Schema),
145    DropSchema(SchemaId, DropMode),
146    CreateNonSharedSource(Source),
147    DropSource(SourceId, DropMode),
148    CreateFunction(Function),
149    DropFunction(FunctionId),
150    CreateView(View, HashSet<ObjectId>),
151    DropView(ViewId, DropMode),
152    CreateStreamingJob {
153        stream_job: StreamingJob,
154        fragment_graph: StreamFragmentGraphProto,
155        create_type: CreateType,
156        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
157        dependencies: HashSet<ObjectId>,
158        specific_resource_group: Option<String>, // specific resource group
159        if_not_exists: bool,
160    },
161    DropStreamingJob {
162        job_id: StreamingJobId,
163        drop_mode: DropMode,
164        target_replace_info: Option<ReplaceStreamJobInfo>,
165    },
166    AlterName(alter_name_request::Object, String),
167    AlterSwapRename(alter_swap_rename_request::Object),
168    ReplaceStreamJob(ReplaceStreamJobInfo),
169    AlterNonSharedSource(Source),
170    AlterObjectOwner(Object, UserId),
171    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
172    CreateConnection(Connection),
173    DropConnection(ConnectionId),
174    CreateSecret(Secret),
175    AlterSecret(Secret),
176    DropSecret(SecretId),
177    CommentOn(Comment),
178    CreateSubscription(Subscription),
179    DropSubscription(SubscriptionId, DropMode),
180    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
181}
182
183impl DdlCommand {
184    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
185    fn object(&self) -> Either<String, ObjectId> {
186        use Either::*;
187        match self {
188            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
189            DdlCommand::DropDatabase(id) => Right(*id),
190            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
191            DdlCommand::DropSchema(id, _) => Right(*id),
192            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
193            DdlCommand::DropSource(id, _) => Right(*id),
194            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
195            DdlCommand::DropFunction(id) => Right(*id),
196            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
197            DdlCommand::DropView(id, _) => Right(*id),
198            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
199            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
200            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
201            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
202            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
203            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
204            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
205            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
206            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
207            DdlCommand::DropConnection(id) => Right(*id),
208            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
209            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
210            DdlCommand::DropSecret(id) => Right(*id),
211            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
212            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
213            DdlCommand::DropSubscription(id, _) => Right(*id),
214            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
215        }
216    }
217
218    fn allow_in_recovery(&self) -> bool {
219        match self {
220            DdlCommand::DropDatabase(_)
221            | DdlCommand::DropSchema(_, _)
222            | DdlCommand::DropSource(_, _)
223            | DdlCommand::DropFunction(_)
224            | DdlCommand::DropView(_, _)
225            | DdlCommand::DropStreamingJob { .. }
226            | DdlCommand::DropConnection(_)
227            | DdlCommand::DropSecret(_)
228            | DdlCommand::DropSubscription(_, _)
229            | DdlCommand::AlterName(_, _)
230            | DdlCommand::AlterObjectOwner(_, _)
231            | DdlCommand::AlterSetSchema(_, _)
232            | DdlCommand::CreateDatabase(_)
233            | DdlCommand::CreateSchema(_)
234            | DdlCommand::CreateFunction(_)
235            | DdlCommand::CreateView(_, _)
236            | DdlCommand::CreateConnection(_)
237            | DdlCommand::CommentOn(_)
238            | DdlCommand::CreateSecret(_)
239            | DdlCommand::AlterSecret(_)
240            | DdlCommand::AlterSwapRename(_)
241            | DdlCommand::AlterDatabaseParam(_, _) => true,
242            DdlCommand::CreateStreamingJob { .. }
243            | DdlCommand::CreateNonSharedSource(_)
244            | DdlCommand::ReplaceStreamJob(_)
245            | DdlCommand::AlterNonSharedSource(_)
246            | DdlCommand::CreateSubscription(_) => false,
247        }
248    }
249}
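
// Illustrative sketch (added for clarity): commands that only drop or alter existing objects are
// still accepted while the cluster is recovering, whereas commands that have to build a new
// streaming graph are rejected up front by `allow_in_recovery`.
#[cfg(test)]
mod recovery_gate_example {
    use super::*;

    #[test]
    fn drops_allowed_in_recovery_creates_are_not() {
        assert!(DdlCommand::DropFunction(1).allow_in_recovery());
        assert!(!DdlCommand::CreateNonSharedSource(Source::default()).allow_in_recovery());
    }
}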
250
251#[derive(Clone)]
252pub struct DdlController {
253    pub(crate) env: MetaSrvEnv,
254
255    pub(crate) metadata_manager: MetadataManager,
256    pub(crate) stream_manager: GlobalStreamManagerRef,
257    pub(crate) source_manager: SourceManagerRef,
258    barrier_manager: BarrierManagerRef,
259
260    // The semaphore is used to limit the number of concurrent streaming job creations.
261    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
262
263    /// Sequence number for DDL commands, used for observability and debugging.
264    seq: Arc<AtomicU64>,
265}
266
267#[derive(Clone)]
268pub struct CreatingStreamingJobPermit {
269    pub(crate) semaphore: Arc<Semaphore>,
270}
271
272impl CreatingStreamingJobPermit {
273    async fn new(env: &MetaSrvEnv) -> Self {
274        let mut permits = env
275            .system_params_reader()
276            .await
277            .max_concurrent_creating_streaming_jobs() as usize;
278        if permits == 0 {
279            // if the system parameter is set to zero, use the max permitted value.
280            permits = Semaphore::MAX_PERMITS;
281        }
282        let semaphore = Arc::new(Semaphore::new(permits));
283
284        let (local_notification_tx, mut local_notification_rx) =
285            tokio::sync::mpsc::unbounded_channel();
286        env.notification_manager()
287            .insert_local_sender(local_notification_tx)
288            .await;
289        let semaphore_clone = semaphore.clone();
290        tokio::spawn(async move {
291            while let Some(notification) = local_notification_rx.recv().await {
292                let LocalNotification::SystemParamsChange(p) = &notification else {
293                    continue;
294                };
295                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
296                if new_permits == 0 {
297                    new_permits = Semaphore::MAX_PERMITS;
298                }
299                match permits.cmp(&new_permits) {
300                    Ordering::Less => {
301                        semaphore_clone.add_permits(new_permits - permits);
302                    }
303                    Ordering::Equal => continue,
304                    Ordering::Greater => {
305                        semaphore_clone
306                            .acquire_many((permits - new_permits) as u32)
307                            .await
308                            .unwrap()
309                            .forget();
310                    }
311                }
312                tracing::info!(
313                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
314                    permits,
315                    new_permits
316                );
317                permits = new_permits;
318            }
319        });
320
321        Self { semaphore }
322    }
323}
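
// Illustrative sketch (added for clarity, names are ours): the listener above resizes the
// semaphore in place. Growing is `add_permits(diff)`; shrinking acquires `diff` permits and
// `forget()`s them so they are never returned. The real code uses the async `acquire_many`,
// so a shrink waits until enough in-flight creations have finished.
#[cfg(test)]
mod permit_resize_example {
    use tokio::sync::Semaphore;

    #[test]
    fn resize_semaphore_like_a_system_param_change() {
        let semaphore = Semaphore::new(4);

        // `max_concurrent_creating_streaming_jobs` raised from 4 to 6.
        semaphore.add_permits(2);
        assert_eq!(semaphore.available_permits(), 6);

        // ... then lowered from 6 to 3: remove 3 permits permanently.
        semaphore
            .try_acquire_many(3)
            .expect("no permits are held in this example")
            .forget();
        assert_eq!(semaphore.available_permits(), 3);
    }
}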
324
325impl DdlController {
326    pub async fn new(
327        env: MetaSrvEnv,
328        metadata_manager: MetadataManager,
329        stream_manager: GlobalStreamManagerRef,
330        source_manager: SourceManagerRef,
331        barrier_manager: BarrierManagerRef,
332    ) -> Self {
333        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
334        Self {
335            env,
336            metadata_manager,
337            stream_manager,
338            source_manager,
339            barrier_manager,
340            creating_streaming_job_permits,
341            seq: Arc::new(AtomicU64::new(0)),
342        }
343    }
344
345    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
346    pub fn next_seq(&self) -> u64 {
347        // This is a simple atomic increment operation.
348        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
349    }
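
    // Illustrative usage (added for clarity; `ctrl: &DdlController` is a hypothetical handle):
    //
    //     let first = ctrl.next_seq();  // e.g. 0
    //     let second = ctrl.next_seq(); // first + 1
    //
    // The counter is process-local and restarts from 0 when the meta node restarts, so it only
    // disambiguates concurrent DDL commands in logs and await-tree; it is not a globally unique id.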
350
351    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
352    /// interrupted while the command is executing, the request future is cancelled by tonic. Since we
353    /// have a lot of logic for revert, status management, notification and so on, ensuring consistency
354    /// would be a huge hassle if we didn't spawn here.
355    ///
356    /// Although the return type is `Option`, it is always `Some`, to simplify the handling logic.
357    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
358        if !command.allow_in_recovery() {
359            self.barrier_manager.check_status_running()?;
360        }
361
362        let await_tree_key = format!("DDL Command {}", self.next_seq());
363        let await_tree_span = await_tree::span!("{command}({})", command.object());
364
365        let ctrl = self.clone();
366        let fut = async move {
367            match command {
368                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
369                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
370                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
371                DdlCommand::DropSchema(schema_id, drop_mode) => {
372                    ctrl.drop_schema(schema_id, drop_mode).await
373                }
374                DdlCommand::CreateNonSharedSource(source) => {
375                    ctrl.create_non_shared_source(source).await
376                }
377                DdlCommand::DropSource(source_id, drop_mode) => {
378                    ctrl.drop_source(source_id, drop_mode).await
379                }
380                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
381                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
382                DdlCommand::CreateView(view, dependencies) => {
383                    ctrl.create_view(view, dependencies).await
384                }
385                DdlCommand::DropView(view_id, drop_mode) => {
386                    ctrl.drop_view(view_id, drop_mode).await
387                }
388                DdlCommand::CreateStreamingJob {
389                    stream_job,
390                    fragment_graph,
391                    create_type: _,
392                    affected_table_replace_info,
393                    dependencies,
394                    specific_resource_group,
395                    if_not_exists,
396                } => {
397                    ctrl.create_streaming_job(
398                        stream_job,
399                        fragment_graph,
400                        affected_table_replace_info,
401                        dependencies,
402                        specific_resource_group,
403                        if_not_exists,
404                    )
405                    .await
406                }
407                DdlCommand::DropStreamingJob {
408                    job_id,
409                    drop_mode,
410                    target_replace_info,
411                } => {
412                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
413                        .await
414                }
415                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
416                    streaming_job,
417                    fragment_graph,
418                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
419                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
420                DdlCommand::AlterObjectOwner(object, owner_id) => {
421                    ctrl.alter_owner(object, owner_id).await
422                }
423                DdlCommand::AlterSetSchema(object, new_schema_id) => {
424                    ctrl.alter_set_schema(object, new_schema_id).await
425                }
426                DdlCommand::CreateConnection(connection) => {
427                    ctrl.create_connection(connection).await
428                }
429                DdlCommand::DropConnection(connection_id) => {
430                    ctrl.drop_connection(connection_id).await
431                }
432                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
433                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
434                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
435                DdlCommand::AlterNonSharedSource(source) => {
436                    ctrl.alter_non_shared_source(source).await
437                }
438                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
439                DdlCommand::CreateSubscription(subscription) => {
440                    ctrl.create_subscription(subscription).await
441                }
442                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
443                    ctrl.drop_subscription(subscription_id, drop_mode).await
444                }
445                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
446                DdlCommand::AlterDatabaseParam(database_id, param) => {
447                    ctrl.alter_database_param(database_id, param).await
448                }
449            }
450        }
451        .in_current_span();
452        let fut = (self.env.await_tree_reg())
453            .register(await_tree_key, await_tree_span)
454            .instrument(Box::pin(fut));
455        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
456        Ok(Some(WaitVersion {
457            catalog_version: notification_version,
458            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
459        }))
460    }
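
    // Illustrative sketch (added for clarity; `do_ddl_work` is hypothetical) of the
    // cancellation-safety pattern used above: the actual work runs in a spawned task, so even if
    // the caller's future is dropped (e.g. tonic cancels the RPC when the client disconnects),
    // the DDL command keeps running to completion and its revert/notification logic stays intact.
    //
    //     let handle = tokio::spawn(async move { do_ddl_work().await });
    //     // Dropping `handle` (or the future awaiting it) does NOT abort the spawned task;
    //     // only an explicit `handle.abort()` would. Awaiting it just observes the result.
    //     let result = handle.await;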
461
462    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
463        self.barrier_manager.get_ddl_progress().await
464    }
465
466    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
467        let (version, updated_db) = self
468            .metadata_manager
469            .catalog_controller
470            .create_database(database)
471            .await?;
472        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
473        self.barrier_manager
474            .update_database_barrier(
475                updated_db.database_id,
476                updated_db.barrier_interval_ms.map(|v| v as u32),
477                updated_db.checkpoint_frequency.map(|v| v as u64),
478            )
479            .await?;
480        Ok(version)
481    }
482
483    #[tracing::instrument(skip(self), level = "debug")]
484    pub async fn reschedule_streaming_job(
485        &self,
486        job_id: u32,
487        target: JobRescheduleTarget,
488        mut deferred: bool,
489    ) -> MetaResult<()> {
490        tracing::info!("alter parallelism");
491        if self.barrier_manager.check_status_running().is_err() {
492            tracing::info!(
493                "alter parallelism is set to deferred mode because the system is in recovery state"
494            );
495            deferred = true;
496        }
497
498        self.stream_manager
499            .reschedule_streaming_job(job_id, target, deferred)
500            .await
501    }
502
503    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
504        self.drop_object(
505            ObjectType::Database,
506            database_id as _,
507            DropMode::Cascade,
508            None,
509        )
510        .await
511    }
512
513    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
514        self.metadata_manager
515            .catalog_controller
516            .create_schema(schema)
517            .await
518    }
519
520    async fn drop_schema(
521        &self,
522        schema_id: SchemaId,
523        drop_mode: DropMode,
524    ) -> MetaResult<NotificationVersion> {
525        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
526            .await
527    }
528
529    /// Shared sources are handled in [`Self::create_streaming_job`].
530    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
531        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
532            .await
533            .context("failed to create source worker")?;
534
535        let (source_id, version) = self
536            .metadata_manager
537            .catalog_controller
538            .create_source(source)
539            .await?;
540        self.source_manager
541            .register_source_with_handle(source_id, handle)
542            .await;
543        Ok(version)
544    }
545
546    async fn drop_source(
547        &self,
548        source_id: SourceId,
549        drop_mode: DropMode,
550    ) -> MetaResult<NotificationVersion> {
551        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
552            .await
553    }
554
555    /// This replaces the source in the catalog.
556    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
557    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
558        self.metadata_manager
559            .catalog_controller
560            .alter_non_shared_source(source)
561            .await
562    }
563
564    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
565        self.metadata_manager
566            .catalog_controller
567            .create_function(function)
568            .await
569    }
570
571    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
572        self.drop_object(
573            ObjectType::Function,
574            function_id as _,
575            DropMode::Restrict,
576            None,
577        )
578        .await
579    }
580
581    async fn create_view(
582        &self,
583        view: View,
584        dependencies: HashSet<ObjectId>,
585    ) -> MetaResult<NotificationVersion> {
586        self.metadata_manager
587            .catalog_controller
588            .create_view(view, dependencies)
589            .await
590    }
591
592    async fn drop_view(
593        &self,
594        view_id: ViewId,
595        drop_mode: DropMode,
596    ) -> MetaResult<NotificationVersion> {
597        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
598            .await
599    }
600
601    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
602        validate_connection(&connection).await?;
603        self.metadata_manager
604            .catalog_controller
605            .create_connection(connection)
606            .await
607    }
608
609    async fn drop_connection(
610        &self,
611        connection_id: ConnectionId,
612    ) -> MetaResult<NotificationVersion> {
613        self.drop_object(
614            ObjectType::Connection,
615            connection_id as _,
616            DropMode::Restrict,
617            None,
618        )
619        .await
620    }
621
622    async fn alter_database_param(
623        &self,
624        database_id: DatabaseId,
625        param: AlterDatabaseParam,
626    ) -> MetaResult<NotificationVersion> {
627        let (version, updated_db) = self
628            .metadata_manager
629            .catalog_controller
630            .alter_database_param(database_id, param)
631            .await?;
632        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
633        self.barrier_manager
634            .update_database_barrier(
635                database_id,
636                updated_db.barrier_interval_ms.map(|v| v as u32),
637                updated_db.checkpoint_frequency.map(|v| v as u64),
638            )
639            .await?;
640        Ok(version)
641    }
642
643    // The 'secret' part of the request we receive from the frontend is in plaintext;
644    // here, we need to encrypt it before storing it in the catalog.
645    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
646        let secret_store_private_key = self
647            .env
648            .opts
649            .secret_store_private_key
650            .clone()
651            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
652
653        let encrypted_payload = SecretEncryption::encrypt(
654            secret_store_private_key.as_slice(),
655            secret.get_value().as_slice(),
656        )
657        .context(format!("failed to encrypt secret {}", secret.name))?;
658        Ok(encrypted_payload
659            .serialize()
660            .context(format!("failed to serialize secret {}", secret.name))?)
661    }
662
663    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
664        // The 'secret' part of the request we receive from the frontend is in plaintext;
665        // here, we need to encrypt it before storing it in the catalog.
666        let secret_plain_payload = secret.value.clone();
667        let encrypted_payload = self.get_encrypted_payload(&secret)?;
668        secret.value = encrypted_payload;
669
670        self.metadata_manager
671            .catalog_controller
672            .create_secret(secret, secret_plain_payload)
673            .await
674    }
675
676    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
677        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
678            .await
679    }
680
681    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
682        let secret_plain_payload = secret.value.clone();
683        let encrypted_payload = self.get_encrypted_payload(&secret)?;
684        secret.value = encrypted_payload;
685        self.metadata_manager
686            .catalog_controller
687            .alter_secret(secret, secret_plain_payload)
688            .await
689    }
690
691    async fn create_subscription(
692        &self,
693        mut subscription: Subscription,
694    ) -> MetaResult<NotificationVersion> {
695        tracing::debug!("create subscription");
696        let _permit = self
697            .creating_streaming_job_permits
698            .semaphore
699            .acquire()
700            .await
701            .unwrap();
702        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
703        self.metadata_manager
704            .catalog_controller
705            .create_subscription_catalog(&mut subscription)
706            .await?;
707        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
708            tracing::debug!(error = %err.as_report(), "failed to create subscription");
709            let _ = self
710                .metadata_manager
711                .catalog_controller
712                .try_abort_creating_subscription(subscription.id as _)
713                .await
714                .inspect_err(|e| {
715                    tracing::error!(
716                        error = %e.as_report(),
717                        "failed to abort create subscription after failure"
718                    );
719                });
720            return Err(err);
721        }
722
723        let version = self
724            .metadata_manager
725            .catalog_controller
726            .notify_create_subscription(subscription.id)
727            .await?;
728        tracing::debug!("finish create subscription");
729        Ok(version)
730    }
731
732    async fn drop_subscription(
733        &self,
734        subscription_id: SubscriptionId,
735        drop_mode: DropMode,
736    ) -> MetaResult<NotificationVersion> {
737        tracing::debug!("preparing drop subscription");
738        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
739        let subscription = self
740            .metadata_manager
741            .catalog_controller
742            .get_subscription_by_id(subscription_id)
743            .await?;
744        let table_id = subscription.dependent_table_id;
745        let database_id = subscription.database_id.into();
746        let (_, version) = self
747            .metadata_manager
748            .catalog_controller
749            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
750            .await?;
751        self.stream_manager
752            .drop_subscription(database_id, subscription_id as _, table_id)
753            .await;
754        tracing::debug!("finish drop subscription");
755        Ok(version)
756    }
757
758    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
759    #[await_tree::instrument]
760    pub(crate) async fn validate_cdc_table(
761        table: &Table,
762        table_fragments: &StreamJobFragments,
763    ) -> MetaResult<()> {
764        let stream_scan_fragment = table_fragments
765            .fragments
766            .values()
767            .filter(|f| f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan))
768            .exactly_one()
769            .ok()
770            .with_context(|| {
771                format!(
772                    "expect exactly one stream scan fragment, got: {:?}",
773                    table_fragments.fragments
774                )
775            })?;
776
777        assert_eq!(
778            stream_scan_fragment.actors.len(),
779            1,
780            "Stream scan fragment should have only one actor"
781        );
782        let mut found_cdc_scan = false;
783        match &stream_scan_fragment.nodes.node_body {
784            Some(NodeBody::StreamCdcScan(_)) => {
785                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
786                    .await?
787                {
788                    found_cdc_scan = true;
789                }
790            }
791            // When there are generated columns, the cdc scan node is wrapped in a project node.
792            Some(NodeBody::Project(_)) => {
793                for input in &stream_scan_fragment.nodes.input {
794                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
795                        found_cdc_scan = true;
796                    }
797                }
798            }
799            _ => {
800                bail!("Unexpected node body for stream cdc scan");
801            }
802        };
803        if !found_cdc_scan {
804            bail!("No stream cdc scan node found in stream scan fragment");
805        }
806        Ok(())
807    }
808
809    async fn validate_cdc_table_inner(
810        node_body: &Option<NodeBody>,
811        table_id: u32,
812    ) -> MetaResult<bool> {
813        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
814            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
815        {
816            let options_with_secret = WithOptionsSecResolved::new(
817                cdc_table_desc.connect_properties.clone(),
818                cdc_table_desc.secret_refs.clone(),
819            );
820
821            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
822            props.init_from_pb_cdc_table_desc(cdc_table_desc);
823
824            // Try creating a split enumerator to validate
825            let _enumerator = props
826                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
827                .await?;
828
829            tracing::debug!(?table_id, "validate cdc table success");
830            Ok(true)
831        } else {
832            Ok(false)
833        }
834    }
835
836    // Here we modify the union node of the downstream table according to the `TableFragments` of the to-be-created upstream sink.
837    // The merge nodes under the union have already been set up in the frontend; this function fills them with the concrete upstream actors.
838    // Meanwhile, the dispatchers corresponding to the upstream side of those merges are also added to the replace-table context here.
839    pub(crate) async fn inject_replace_table_job_for_table_sink(
840        &self,
841        tmp_id: u32,
842        mgr: &MetadataManager,
843        stream_ctx: StreamContext,
844        sink: Option<&Sink>,
845        creating_sink_table_fragments: Option<&StreamJobFragments>,
846        dropping_sink_id: Option<SinkId>,
847        streaming_job: &StreamingJob,
848        fragment_graph: StreamFragmentGraph,
849    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
850        let (mut replace_table_ctx, mut stream_job_fragments) = self
851            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
852            .await?;
853
854        let target_table = streaming_job.table().unwrap();
855
856        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
857            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
858            let sink = sink.expect("sink not found");
859            Self::inject_replace_table_plan_for_sink(
860                sink.id,
861                &sink_fragment,
862                target_table,
863                &mut replace_table_ctx,
864                stream_job_fragments.inner.union_fragment_for_table(),
865                None,
866            );
867        }
868
869        let [table_catalog]: [_; 1] = mgr
870            .get_table_catalog_by_ids(vec![target_table.id])
871            .await?
872            .try_into()
873            .expect("Target table should exist in sink into table");
874
875        {
876            let catalogs = mgr
877                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
878                .await?;
879
880            for sink in catalogs {
881                let sink_id = sink.id;
882
883                if let Some(dropping_sink_id) = dropping_sink_id
884                    && sink_id == (dropping_sink_id as u32)
885                {
886                    continue;
887                };
888
889                let sink_table_fragments = mgr
890                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
891                    .await?;
892
893                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
894
895                Self::inject_replace_table_plan_for_sink(
896                    sink_id,
897                    &sink_fragment,
898                    target_table,
899                    &mut replace_table_ctx,
900                    stream_job_fragments.inner.union_fragment_for_table(),
901                    Some(&sink.unique_identity()),
902                );
903            }
904        }
905
906        // Check that the union fragment has been fully assigned.
907        for fragment in stream_job_fragments.fragments.values() {
908            {
909                {
910                    visit_stream_node_body(&fragment.nodes, |node| {
911                        if let NodeBody::Merge(merge_node) = node {
912                            let upstream_fragment_id = merge_node.upstream_fragment_id;
913                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
914                                .upstream_fragment_downstreams
915                                .get(&upstream_fragment_id)
916                            {
917                                let mut upstream_fragment_downstreams =
918                                    external_upstream_fragment_downstreams.iter().filter(
919                                        |downstream| {
920                                            downstream.downstream_fragment_id
921                                                == fragment.fragment_id
922                                        },
923                                    );
924                                assert!(
925                                    upstream_fragment_downstreams.next().is_some(),
926                                    "All the mergers for the union should have been fully assigned beforehand."
927                                );
928                            } else {
929                                let mut upstream_fragment_downstreams = stream_job_fragments
930                                    .downstreams
931                                    .get(&upstream_fragment_id)
932                                    .into_iter()
933                                    .flatten()
934                                    .filter(|downstream| {
935                                        downstream.downstream_fragment_id == fragment.fragment_id
936                                    });
937                                assert!(
938                                    upstream_fragment_downstreams.next().is_some(),
939                                    "All the mergers for the union should have been fully assigned beforehand."
940                                );
941                            }
942                        }
943                    });
944                }
945            }
946        }
947
948        Ok((replace_table_ctx, stream_job_fragments))
949    }
950
951    pub(crate) fn inject_replace_table_plan_for_sink(
952        sink_id: u32,
953        sink_fragment: &Fragment,
954        table: &Table,
955        replace_table_ctx: &mut ReplaceStreamJobContext,
956        union_fragment: &mut Fragment,
957        unique_identity: Option<&str>,
958    ) {
959        let sink_fields = sink_fragment.nodes.fields.clone();
960
961        let output_indices = sink_fields
962            .iter()
963            .enumerate()
964            .map(|(idx, _)| idx as _)
965            .collect_vec();
966
967        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();
968
969        let sink_fragment_downstreams = replace_table_ctx
970            .upstream_fragment_downstreams
971            .entry(sink_fragment.fragment_id)
972            .or_default();
973
974        {
975            sink_fragment_downstreams.push(DownstreamFragmentRelation {
976                downstream_fragment_id: union_fragment.fragment_id,
977                dispatcher_type: DispatcherType::Hash,
978                dist_key_indices: dist_key_indices.clone(),
979                output_mapping: PbDispatchOutputMapping::simple(output_indices),
980            });
981        }
982
983        let upstream_fragment_id = sink_fragment.fragment_id;
984
985        let mut max_operator_id = 0;
986
987        visit_stream_node(&union_fragment.nodes, |node| {
988            max_operator_id = max_operator_id.max(node.operator_id);
989        });
990
991        {
992            {
993                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
994                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
995                        for input_project_node in &mut node.input {
996                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
997                                let merge_stream_node =
998                                    input_project_node.input.iter_mut().exactly_one().unwrap();
999
1000                                // Only rewire the input that belongs to this sink: match it by the unique identity assigned in the frontend.
1001                                if input_project_node.identity.as_str()
1002                                    != unique_identity
1003                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
1004                                {
1005                                    continue;
1006                                }
1007
1008                                if let Some(NodeBody::Merge(merge_node)) =
1009                                    &mut merge_stream_node.node_body
1010                                {
1011                                    {
1012                                        merge_stream_node.identity =
1013                                            format!("MergeExecutor(from sink {})", sink_id);
1014                                        merge_stream_node.operator_id = max_operator_id + 1;
1015
1016                                        input_project_node.identity =
1017                                            format!("ProjectExecutor(from sink {})", sink_id);
1018                                        input_project_node.operator_id = max_operator_id + 2;
1019                                    }
1020
1021                                    **merge_node = {
1022                                        #[expect(deprecated)]
1023                                        MergeNode {
1024                                            upstream_actor_id: vec![],
1025                                            upstream_fragment_id,
1026                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
1027                                            fields: sink_fields.to_vec(),
1028                                        }
1029                                    };
1030
1031                                    merge_stream_node.fields = sink_fields.to_vec();
1032
1033                                    return false;
1034                                }
1035                            }
1036                        }
1037                    }
1038                    true
1039                });
1040            }
1041        }
1042    }
1043
1044    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
1045    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
1046    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
1047    pub async fn create_streaming_job(
1048        &self,
1049        mut streaming_job: StreamingJob,
1050        fragment_graph: StreamFragmentGraphProto,
1051        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1052        dependencies: HashSet<ObjectId>,
1053        specific_resource_group: Option<String>,
1054        if_not_exists: bool,
1055    ) -> MetaResult<NotificationVersion> {
1056        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1057        let check_ret = self
1058            .metadata_manager
1059            .catalog_controller
1060            .create_job_catalog(
1061                &mut streaming_job,
1062                &ctx,
1063                &fragment_graph.parallelism,
1064                fragment_graph.max_parallelism as _,
1065                dependencies,
1066                specific_resource_group.clone(),
1067            )
1068            .await;
1069        if let Err(meta_err) = check_ret {
1070            if !if_not_exists {
1071                return Err(meta_err);
1072            }
1073            if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
1074                if streaming_job.create_type() == CreateType::Foreground {
1075                    let database_id = streaming_job.database_id();
1076                    return self
1077                        .metadata_manager
1078                        .wait_streaming_job_finished(database_id.into(), *job_id)
1079                        .await;
1080                } else {
1081                    return Ok(IGNORED_NOTIFICATION_VERSION);
1082                }
1083            } else {
1084                return Err(meta_err);
1085            }
1086        }
1087        let job_id = streaming_job.id();
1088
1089        tracing::debug!(
1090            id = job_id,
1091            definition = streaming_job.definition(),
1092            create_type = streaming_job.create_type().as_str_name(),
1093            job_type = ?streaming_job.job_type(),
1094            "starting streaming job",
1095        );
1096        let _permit = self
1097            .creating_streaming_job_permits
1098            .semaphore
1099            .acquire()
1100            .instrument_await("acquire_creating_streaming_job_permit")
1101            .await
1102            .unwrap();
1103        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1104
1105        let name = streaming_job.name();
1106        let definition = streaming_job.definition();
1107        let source_id = match &streaming_job {
1108            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1109            _ => None,
1110        };
1111
1112        // create streaming job.
1113        match self
1114            .create_streaming_job_inner(
1115                ctx,
1116                streaming_job,
1117                fragment_graph,
1118                affected_table_replace_info,
1119                specific_resource_group,
1120            )
1121            .await
1122        {
1123            Ok(version) => Ok(version),
1124            Err(err) => {
1125                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
1126                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1127                    id: job_id,
1128                    name,
1129                    definition,
1130                    error: err.as_report().to_string(),
1131                };
1132                self.env.event_log_manager_ref().add_event_logs(vec![
1133                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1134                ]);
1135                let (aborted, _) = self
1136                    .metadata_manager
1137                    .catalog_controller
1138                    .try_abort_creating_streaming_job(job_id as _, false)
1139                    .await?;
1140                if aborted {
1141                    tracing::warn!(id = job_id, "aborted streaming job");
1142                    // FIXME: might also need other cleanup here
1143                    if let Some(source_id) = source_id {
1144                        self.source_manager
1145                            .apply_source_change(SourceChange::DropSource {
1146                                dropped_source_ids: vec![source_id as SourceId],
1147                            })
1148                            .await;
1149                    }
1150                }
1151                Err(err)
1152            }
1153        }
1154    }
1155
1156    #[await_tree::instrument(boxed)]
1157    async fn create_streaming_job_inner(
1158        &self,
1159        ctx: StreamContext,
1160        mut streaming_job: StreamingJob,
1161        fragment_graph: StreamFragmentGraphProto,
1162        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
1163        specific_resource_group: Option<String>,
1164    ) -> MetaResult<NotificationVersion> {
1165        let mut fragment_graph =
1166            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1167        streaming_job.set_info_from_graph(&fragment_graph);
1168
1169        // Create internal table catalogs and refill the table ids.
1170        let incomplete_internal_tables = fragment_graph
1171            .incomplete_internal_tables()
1172            .into_values()
1173            .collect_vec();
1174        let table_id_map = self
1175            .metadata_manager
1176            .catalog_controller
1177            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1178            .await?;
1179        fragment_graph.refill_internal_table_ids(table_id_map);
1180
1181        let affected_table_replace_info = match affected_table_replace_info {
1182            Some(replace_table_info) => {
1183                assert!(
1184                    specific_resource_group.is_none(),
1185                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
1186                );
1187
1188                let ReplaceStreamJobInfo {
1189                    mut streaming_job,
1190                    fragment_graph,
1191                    ..
1192                } = replace_table_info;
1193
1194                // Ensure the max parallelism is unchanged before replacing the table.
1195                let original_max_parallelism = self
1196                    .metadata_manager
1197                    .get_job_max_parallelism(streaming_job.id().into())
1198                    .await?;
1199                let fragment_graph = PbStreamFragmentGraph {
1200                    max_parallelism: original_max_parallelism as _,
1201                    ..fragment_graph
1202                };
1203
1204                let fragment_graph =
1205                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1206                streaming_job.set_info_from_graph(&fragment_graph);
1207                let streaming_job = streaming_job;
1208
1209                Some((streaming_job, fragment_graph))
1210            }
1211            None => None,
1212        };
1213
1214        // create fragment and actor catalogs.
1215        tracing::debug!(id = streaming_job.id(), "building streaming job");
1216        let (ctx, stream_job_fragments) = self
1217            .build_stream_job(
1218                ctx,
1219                streaming_job,
1220                fragment_graph,
1221                affected_table_replace_info,
1222                specific_resource_group,
1223            )
1224            .await?;
1225
1226        let streaming_job = &ctx.streaming_job;
1227
1228        match streaming_job {
1229            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1230                Self::validate_cdc_table(table, &stream_job_fragments).await?;
1231            }
1232            StreamingJob::Table(Some(source), ..) => {
1233                // Register the source on the connector node.
1234                self.source_manager.register_source(source).await?;
1235                let connector_name = source
1236                    .get_with_properties()
1237                    .get(UPSTREAM_SOURCE_KEY)
1238                    .cloned();
1239                let attr = source.info.as_ref().map(|source_info| {
1240                    jsonbb::json!({
1241                            "format": source_info.format().as_str_name(),
1242                            "encode": source_info.row_encode().as_str_name(),
1243                    })
1244                });
1245                report_create_object(
1246                    streaming_job.id(),
1247                    "source",
1248                    PbTelemetryDatabaseObject::Source,
1249                    connector_name,
1250                    attr,
1251                );
1252            }
1253            StreamingJob::Sink(sink, _) => {
1254                // Validate the sink on the connector node.
1255                validate_sink(sink).await?;
1256                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1257                let attr = sink.format_desc.as_ref().map(|sink_info| {
1258                    jsonbb::json!({
1259                        "format": sink_info.format().as_str_name(),
1260                        "encode": sink_info.encode().as_str_name(),
1261                    })
1262                });
1263                report_create_object(
1264                    streaming_job.id(),
1265                    "sink",
1266                    PbTelemetryDatabaseObject::Sink,
1267                    connector_name,
1268                    attr,
1269                );
1270            }
1271            StreamingJob::Source(source) => {
1272                // Register the source on the connector node.
1273                self.source_manager.register_source(source).await?;
1274                let connector_name = source
1275                    .get_with_properties()
1276                    .get(UPSTREAM_SOURCE_KEY)
1277                    .cloned();
1278                let attr = source.info.as_ref().map(|source_info| {
1279                    jsonbb::json!({
1280                            "format": source_info.format().as_str_name(),
1281                            "encode": source_info.row_encode().as_str_name(),
1282                    })
1283                });
1284                report_create_object(
1285                    streaming_job.id(),
1286                    "source",
1287                    PbTelemetryDatabaseObject::Source,
1288                    connector_name,
1289                    attr,
1290                );
1291            }
1292            _ => {}
1293        }
1294
1295        self.metadata_manager
1296            .catalog_controller
1297            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
1298            .await?;
1299
1300        // create streaming jobs.
1301        let stream_job_id = streaming_job.id();
1302        match (streaming_job.create_type(), &streaming_job) {
1303            // FIXME(kwannoel): Unify background stream's creation path with MV below.
1304            (CreateType::Unspecified, _)
1305            | (CreateType::Foreground, _)
1306            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
1307                let version = self
1308                    .stream_manager
1309                    .create_streaming_job(stream_job_fragments, ctx, None)
1310                    .await?;
1311                Ok(version)
1312            }
1313            (CreateType::Background, _) => {
1314                let await_tree_key = format!("Background DDL Worker ({})", streaming_job.id());
1315                let await_tree_span =
1316                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());
1317
1318                let ctrl = self.clone();
1319                let (tx, rx) = oneshot::channel();
1320                let fut = async move {
1321                    let _ = ctrl
1322                        .stream_manager
1323                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
1324                        .await
1325                        .inspect_err(|err| {
1326                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
1327                        });
1328                };
1329
1330                let fut = (self.env.await_tree_reg())
1331                    .register(await_tree_key, await_tree_span)
1332                    .instrument(fut);
1333                tokio::spawn(fut);
1334
1335                rx.instrument_await("wait_background_streaming_job_creation_started")
1336                    .await
1337                    .map_err(|_| {
1338                        anyhow!(
1339                            "failed to receive create streaming job result of job: {}",
1340                            stream_job_id
1341                        )
1342                    })??;
1343                Ok(IGNORED_NOTIFICATION_VERSION)
1344            }
1345        }
1346    }
1347
1348    /// `target_replace_info`: when dropping a sink that sinks into a table, we need to replace the target table.
1349    pub async fn drop_object(
1350        &self,
1351        object_type: ObjectType,
1352        object_id: ObjectId,
1353        drop_mode: DropMode,
1354        target_replace_info: Option<ReplaceStreamJobInfo>,
1355    ) -> MetaResult<NotificationVersion> {
1356        let (release_ctx, mut version) = self
1357            .metadata_manager
1358            .catalog_controller
1359            .drop_object(object_type, object_id, drop_mode)
1360            .await?;
1361
1362        if let Some(replace_table_info) = target_replace_info {
1363            let stream_ctx =
1364                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());
1365
1366            let ReplaceStreamJobInfo {
1367                mut streaming_job,
1368                fragment_graph,
1369                ..
1370            } = replace_table_info;
1371
1372            let sink_id = if let ObjectType::Sink = object_type {
1373                object_id as _
1374            } else {
1375                panic!("additional replace table event only occurs when dropping sink into table")
1376            };
1377
1378            // Ensure the max parallelism is unchanged before replacing the table.
1379            let original_max_parallelism = self
1380                .metadata_manager
1381                .get_job_max_parallelism(streaming_job.id().into())
1382                .await?;
1383            let fragment_graph = PbStreamFragmentGraph {
1384                max_parallelism: original_max_parallelism as _,
1385                ..fragment_graph
1386            };
1387
1388            let fragment_graph =
1389                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1390            streaming_job.set_info_from_graph(&fragment_graph);
1391            let streaming_job = streaming_job;
1392
1393            streaming_job.table().expect("should be table job");
1394
1395            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
1396            let tmp_id = self
1397                .metadata_manager
1398                .catalog_controller
1399                .create_job_catalog_for_replace(
1400                    &streaming_job,
1401                    &stream_ctx,
1402                    &fragment_graph.specified_parallelism(),
1403                    fragment_graph.max_parallelism(),
1404                )
1405                .await? as u32;
1406
1407            let (ctx, stream_job_fragments) = self
1408                .inject_replace_table_job_for_table_sink(
1409                    tmp_id,
1410                    &self.metadata_manager,
1411                    stream_ctx,
1412                    None,
1413                    None,
1414                    Some(sink_id),
1415                    &streaming_job,
1416                    fragment_graph,
1417                )
1418                .await?;
1419
1420            let result: MetaResult<_> = try {
1421                let replace_upstream = ctx.replace_upstream.clone();
1422
1423                self.metadata_manager
1424                    .catalog_controller
1425                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1426                    .await?;
1427
1428                self.stream_manager
1429                    .replace_stream_job(stream_job_fragments, ctx)
1430                    .await?;
1431
1432                replace_upstream
1433            };
1434
1435            version = match result {
1436                Ok(replace_upstream) => {
1437                    let version = self
1438                        .metadata_manager
1439                        .catalog_controller
1440                        .finish_replace_streaming_job(
1441                            tmp_id as _,
1442                            streaming_job,
1443                            replace_upstream,
1444                            SinkIntoTableContext {
1445                                creating_sink_id: None,
1446                                dropping_sink_id: Some(sink_id),
1447                                updated_sink_catalogs: vec![],
1448                            },
1449                            None, // no source is dropped when dropping sink into table
1450                        )
1451                        .await?;
1452                    Ok(version)
1453                }
1454                Err(err) => {
1455                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
1456                    let _ = self.metadata_manager
1457                        .catalog_controller
1458                        .try_abort_replacing_streaming_job(tmp_id as _)
1459                        .await
1460                        .inspect_err(|err| {
1461                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
1462                        });
1463                    Err(err)
1464                }
1465            }?;
1466        }
1467
1468        let ReleaseContext {
1469            database_id,
1470            removed_streaming_job_ids,
1471            removed_state_table_ids,
1472            removed_source_ids,
1473            removed_secret_ids: secret_ids,
1474            removed_source_fragments,
1475            removed_actors,
1476            removed_fragments,
1477        } = release_ctx;
1478
1479        let _guard = self.source_manager.pause_tick().await;
1480        self.stream_manager
1481            .drop_streaming_jobs(
1482                risingwave_common::catalog::DatabaseId::new(database_id as _),
1483                removed_actors.iter().map(|id| *id as _).collect(),
1484                removed_streaming_job_ids,
1485                removed_state_table_ids,
1486                removed_fragments.iter().map(|id| *id as _).collect(),
1487            )
1488            .await;
1489
1490        // Clean up sources after dropping streaming jobs.
1491        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
1492        self.source_manager
1493            .apply_source_change(SourceChange::DropSource {
1494                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1495            })
1496            .await;
1497
1498        // Unregister fragments and actors from the source manager.
1499        // FIXME: we also need to unregister source backfill fragments.
1500        let dropped_source_fragments = removed_source_fragments
1501            .into_iter()
1502            .map(|(source_id, fragments)| {
1503                (
1504                    source_id,
1505                    fragments.into_iter().map(|id| id as u32).collect(),
1506                )
1507            })
1508            .collect();
1509        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1510        self.source_manager
1511            .apply_source_change(SourceChange::DropMv {
1512                dropped_source_fragments,
1513                dropped_actors,
1514            })
1515            .await;
1516
1517        // remove secrets.
1518        for secret in secret_ids {
1519            LocalSecretManager::global().remove_secret(secret as _);
1520        }
1521        Ok(version)
1522    }
1523
1524    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1525    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1526    pub async fn replace_job(
1527        &self,
1528        mut streaming_job: StreamingJob,
1529        fragment_graph: StreamFragmentGraphProto,
1530    ) -> MetaResult<NotificationVersion> {
1531        match &mut streaming_job {
1532            StreamingJob::Table(..)
1533            | StreamingJob::Source(..)
1534            | StreamingJob::MaterializedView(..) => {}
1535            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1536                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1537            }
1538        }
1539
1540        let job_id = streaming_job.id();
1541
1542        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1543        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1544
1545        // Ensure the max parallelism is unchanged before replacing the table.
1546        let original_max_parallelism = self
1547            .metadata_manager
1548            .get_job_max_parallelism(streaming_job.id().into())
1549            .await?;
1550        let fragment_graph = PbStreamFragmentGraph {
1551            max_parallelism: original_max_parallelism as _,
1552            ..fragment_graph
1553        };
1554
1555        // 1. build fragment graph.
1556        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1557        streaming_job.set_info_from_graph(&fragment_graph);
1558
1559        // make it immutable
1560        let streaming_job = streaming_job;
1561
1562        let tmp_id = self
1563            .metadata_manager
1564            .catalog_controller
1565            .create_job_catalog_for_replace(
1566                &streaming_job,
1567                &ctx,
1568                &fragment_graph.specified_parallelism(),
1569                fragment_graph.max_parallelism(),
1570            )
1571            .await?;
1572
1573        tracing::debug!(id = job_id, "building replace streaming job");
1574        let mut updated_sink_catalogs = vec![];
1575
1576        let mut drop_table_connector_ctx = None;
1577        let result: MetaResult<_> = try {
1578            let (mut ctx, mut stream_job_fragments) = self
1579                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
1580                .await?;
1581            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1582
1583            // Handle sinks that sink into the table.
1584            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1585                let catalogs = self
1586                    .metadata_manager
1587                    .get_sink_catalog_by_ids(&table.incoming_sinks)
1588                    .await?;
1589
1590                for sink in catalogs {
1591                    let sink_id = &sink.id;
1592
1593                    let sink_table_fragments = self
1594                        .metadata_manager
1595                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1596                            *sink_id,
1597                        ))
1598                        .await?;
1599
1600                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1601
1602                    Self::inject_replace_table_plan_for_sink(
1603                        *sink_id,
1604                        &sink_fragment,
1605                        table,
1606                        &mut ctx,
1607                        stream_job_fragments.inner.union_fragment_for_table(),
1608                        Some(&sink.unique_identity()),
1609                    );
1610
1611                    if sink.original_target_columns.is_empty() {
1612                        updated_sink_catalogs.push(sink.id as _);
1613                    }
1614                }
1615            }
1616
1617            let replace_upstream = ctx.replace_upstream.clone();
1618
1619            self.metadata_manager
1620                .catalog_controller
1621                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1622                .await?;
1623
1624            self.stream_manager
1625                .replace_stream_job(stream_job_fragments, ctx)
1626                .await?;
1627            replace_upstream
1628        };
1629
1630        match result {
1631            Ok(replace_upstream) => {
1632                let version = self
1633                    .metadata_manager
1634                    .catalog_controller
1635                    .finish_replace_streaming_job(
1636                        tmp_id,
1637                        streaming_job,
1638                        replace_upstream,
1639                        SinkIntoTableContext {
1640                            creating_sink_id: None,
1641                            dropping_sink_id: None,
1642                            updated_sink_catalogs,
1643                        },
1644                        drop_table_connector_ctx.as_ref(),
1645                    )
1646                    .await?;
1647                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1648                    self.source_manager
1649                        .apply_source_change(SourceChange::DropSource {
1650                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1651                        })
1652                        .await;
1653                }
1654                Ok(version)
1655            }
1656            Err(err) => {
1657                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1658                let _ = self.metadata_manager
1659                    .catalog_controller
1660                    .try_abort_replacing_streaming_job(tmp_id)
1661                    .await.inspect_err(|err| {
1662                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1663                });
1664                Err(err)
1665            }
1666        }
1667    }
1668
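    /// Drops a streaming job by mapping its id to the corresponding catalog object and
    /// delegating to `drop_object`. For sinks, the exactly-once Iceberg sink system table
    /// is cleaned up afterwards.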
1669    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
1670    async fn drop_streaming_job(
1671        &self,
1672        job_id: StreamingJobId,
1673        drop_mode: DropMode,
1674        target_replace_info: Option<ReplaceStreamJobInfo>,
1675    ) -> MetaResult<NotificationVersion> {
1676        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1677
1678        let (object_id, object_type) = match job_id {
1679            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1680            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1681            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1682            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1683        };
1684
1685        let version = self
1686            .drop_object(object_type, object_id, drop_mode, target_replace_info)
1687            .await?;
1688        #[cfg(not(madsim))]
1689        if let StreamingJobId::Sink(sink_id) = job_id {
1690            // Delete rows from the system table used by the exactly-once Iceberg sink.
1691            // TODO(wcy-fdu): optimize the logic so that it only applies to Iceberg sinks.
1692            let db = self.env.meta_store_ref().conn.clone();
1693            clean_all_rows_by_sink_id(&db, sink_id).await?;
1694        }
1695        Ok(version)
1696    }
1697
1698    /// Resolve the parallelism of the stream job based on the given information.
1699    ///
1700    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
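    ///
    /// A minimal sketch of the resolution rules (illustrative only; the real method also
    /// consults the resource group and the configured default parallelism):
    ///
    /// ```ignore
    /// fn resolve(specified: Option<usize>, max: usize, available: usize) -> Result<usize, String> {
    ///     match specified {
    ///         Some(n) if n > max => Err(format!("{n} exceeds max parallelism {max}")),
    ///         Some(n) if n > available => Err(format!("only {available} slots are available")),
    ///         Some(n) => Ok(n),
    ///         // Without an explicit parallelism, use what is available, capped by `max`.
    ///         None => Ok(available.min(max)),
    ///     }
    /// }
    /// ```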
1701    fn resolve_stream_parallelism(
1702        &self,
1703        specified: Option<NonZeroUsize>,
1704        max: NonZeroUsize,
1705        cluster_info: &StreamingClusterInfo,
1706        resource_group: String,
1707    ) -> MetaResult<NonZeroUsize> {
1708        let available = cluster_info.parallelism(&resource_group);
1709        let Some(available) = NonZeroUsize::new(available) else {
1710            bail_unavailable!(
1711                "no available slots to schedule in resource group \"{}\", \
1712                 have you allocated any compute nodes within this resource group?",
1713                resource_group
1714            );
1715        };
1716
1717        if let Some(specified) = specified {
1718            if specified > max {
1719                bail_invalid_parameter!(
1720                    "specified parallelism {} should not exceed max parallelism {}",
1721                    specified,
1722                    max,
1723                );
1724            }
1725            if specified > available {
1726                bail_unavailable!(
1727                    "insufficient parallelism to schedule in resource group \"{}\", \
1728                     required: {}, available: {}",
1729                    resource_group,
1730                    specified,
1731                    available,
1732                );
1733            }
1734            Ok(specified)
1735        } else {
1736            // Fall back to the configured default parallelism when the user does not specify one.
1737            let default_parallelism = match self.env.opts.default_parallelism {
1738                DefaultParallelism::Full => available,
1739                DefaultParallelism::Default(num) => {
1740                    if num > available {
1741                        bail_unavailable!(
1742                            "insufficient parallelism to schedule in resource group \"{}\", \
1743                            required: {}, available: {}",
1744                            resource_group,
1745                            num,
1746                            available,
1747                        );
1748                    }
1749                    num
1750                }
1751            };
1752
1753            if default_parallelism > max {
1754                tracing::warn!(
1755                    max_parallelism = max.get(),
1756                    resource_group,
1757                    "default parallelism exceeds the max parallelism, using max parallelism instead",
1758                );
1759            }
1760            Ok(default_parallelism.min(max))
1761        }
1762    }
1763
1764    /// Builds the actor graph:
1765    /// - Add the upstream fragments to the fragment graph
1766    /// - Schedule the fragments based on their distribution
1767    /// - Expand each fragment into one or several actors
1768    /// - Construct the fragment-level backfill order control.
1769    #[await_tree::instrument]
1770    pub(crate) async fn build_stream_job(
1771        &self,
1772        stream_ctx: StreamContext,
1773        mut stream_job: StreamingJob,
1774        fragment_graph: StreamFragmentGraph,
1775        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1776        specific_resource_group: Option<String>,
1777    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1778        let id = stream_job.id();
1779        let specified_parallelism = fragment_graph.specified_parallelism();
1780        let expr_context = stream_ctx.to_expr_context();
1781        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1782
1783        // 1. Build the fragment-level backfill ordering.
1784        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1785
1786        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1787        // contains all information needed for building the actor graph.
1788
1789        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1790            fragment_graph.collect_snapshot_backfill_info()?;
1791        assert!(
1792            snapshot_backfill_info
1793                .iter()
1794                .chain([&cross_db_snapshot_backfill_info])
1795                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1796                .all(|backfill_epoch| backfill_epoch.is_none()),
1797            "should not set backfill epoch when initially building the job: {:?} {:?}",
1798            snapshot_backfill_info,
1799            cross_db_snapshot_backfill_info
1800        );
1801
1802        // check if log store exists for all cross-db upstreams
1803        self.metadata_manager
1804            .catalog_controller
1805            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1806            .await?;
1807
1808        let upstream_table_ids = fragment_graph
1809            .dependent_table_ids()
1810            .iter()
1811            .filter(|id| {
1812                !cross_db_snapshot_backfill_info
1813                    .upstream_mv_table_id_to_backfill_epoch
1814                    .contains_key(id)
1815            })
1816            .cloned()
1817            .collect();
1818
1819        let (upstream_root_fragments, existing_actor_location) = self
1820            .metadata_manager
1821            .get_upstream_root_fragments(&upstream_table_ids)
1822            .await?;
1823
1824        if snapshot_backfill_info.is_some() {
1825            match stream_job {
1826                StreamingJob::MaterializedView(_)
1827                | StreamingJob::Sink(_, _)
1828                | StreamingJob::Index(_, _) => {}
1829                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1830                    return Err(
1831                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1832                    );
1833                }
1834            }
1835        }
1836
1837        let upstream_actors = upstream_root_fragments
1838            .values()
1839            .map(|fragment| {
1840                (
1841                    fragment.fragment_id,
1842                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1843                )
1844            })
1845            .collect();
1846
1847        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1848            fragment_graph,
1849            upstream_root_fragments,
1850            existing_actor_location,
1851            (&stream_job).into(),
1852        )?;
1853
1854        let resource_group = match specific_resource_group {
1855            None => {
1856                self.metadata_manager
1857                    .get_database_resource_group(stream_job.database_id() as ObjectId)
1858                    .await?
1859            }
1860            Some(resource_group) => resource_group,
1861        };
1862
1863        // 3. Build the actor graph.
1864        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1865
1866        let parallelism = self.resolve_stream_parallelism(
1867            specified_parallelism,
1868            max_parallelism,
1869            &cluster_info,
1870            resource_group.clone(),
1871        )?;
1872
1873        let parallelism = self
1874            .env
1875            .system_params_reader()
1876            .await
1877            .adaptive_parallelism_strategy()
1878            .compute_target_parallelism(parallelism.get());
1879
1880        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1881        let actor_graph_builder = ActorGraphBuilder::new(
1882            id,
1883            resource_group,
1884            complete_graph,
1885            cluster_info,
1886            parallelism,
1887        )?;
1888
1889        let ActorGraphBuildResult {
1890            graph,
1891            downstream_fragment_relations,
1892            building_locations,
1893            existing_locations,
1894            upstream_fragment_downstreams,
1895            new_no_shuffle,
1896            replace_upstream,
1897            ..
1898        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1899        assert!(replace_upstream.is_empty());
1900
1901        // 4. Build the table fragments structure that will be persisted in the stream manager,
1902        // and the context that contains all information needed for building the
1903        // actors on the compute nodes.
1904
1905        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, then set it to ADAPTIVE.
1906        // Otherwise, it defaults to FIXED with the deduced parallelism.
1907        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1908            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1909            _ => TableParallelism::Fixed(parallelism.get()),
1910        };
1911
1912        let stream_job_fragments = StreamJobFragments::new(
1913            id.into(),
1914            graph,
1915            &building_locations.actor_locations,
1916            stream_ctx.clone(),
1917            table_parallelism,
1918            max_parallelism.get(),
1919        );
1920        let internal_tables = stream_job_fragments.internal_tables();
1921
1922        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1923            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1924        }
1925
1926        let replace_table_job_info = match affected_table_replace_info {
1927            Some((table_stream_job, fragment_graph)) => {
1928                if snapshot_backfill_info.is_some() {
1929                    return Err(anyhow!(
1930                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
1931                    )
1932                    .into());
1933                }
1934                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
1935                    bail!("additional replace table event only occurs when sinking into table");
1936                };
1937
1938                table_stream_job.table().expect("should be table job");
1939                let tmp_id = self
1940                    .metadata_manager
1941                    .catalog_controller
1942                    .create_job_catalog_for_replace(
1943                        &table_stream_job,
1944                        &stream_ctx,
1945                        &fragment_graph.specified_parallelism(),
1946                        fragment_graph.max_parallelism(),
1947                    )
1948                    .await? as u32;
1949
1950                let (context, table_fragments) = self
1951                    .inject_replace_table_job_for_table_sink(
1952                        tmp_id,
1953                        &self.metadata_manager,
1954                        stream_ctx,
1955                        Some(sink),
1956                        Some(&stream_job_fragments),
1957                        None,
1958                        &table_stream_job,
1959                        fragment_graph,
1960                    )
1961                    .await?;
1962                // When sinking into a table, some fields of the target table may be modified,
1963                // such as `fragment_id` being altered by `prepare_replace_table`.
1964                // At this point, the table info carried with the sink must be updated.
1965                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
1966                    // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table
1967                    *target_table = Some((table.clone(), source.clone()));
1968                });
1969
1970                Some((table_stream_job, context, table_fragments))
1971            }
1972            None => None,
1973        };
1974
1975        let ctx = CreateStreamingJobContext {
1976            upstream_fragment_downstreams,
1977            new_no_shuffle,
1978            upstream_actors,
1979            internal_tables,
1980            building_locations,
1981            existing_locations,
1982            definition: stream_job.definition(),
1983            mv_table_id: stream_job.mv_table(),
1984            create_type: stream_job.create_type(),
1985            job_type: (&stream_job).into(),
1986            streaming_job: stream_job,
1987            replace_table_job_info,
1988            option: CreateStreamingJobOption {},
1989            snapshot_backfill_info,
1990            cross_db_snapshot_backfill_info,
1991            fragment_backfill_ordering,
1992        };
1993
1994        Ok((
1995            ctx,
1996            StreamJobFragmentsToCreate {
1997                inner: stream_job_fragments,
1998                downstreams: downstream_fragment_relations,
1999            },
2000        ))
2001    }
2002
2003    /// `build_replace_job` builds a job replacement and returns the context and new job
2004    /// fragments.
2005    ///
2006    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
2007    /// replacement is finished.
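    ///
    /// A minimal sketch of the "stage under a temporary id, then commit or abort" pattern
    /// followed by the callers of this method (`allocate_tmp_job_id`, `build_and_swap`,
    /// `commit_replacement`, and `abort_replacement` are hypothetical stand-ins):
    ///
    /// ```ignore
    /// async fn replace_under_tmp_id() -> anyhow::Result<()> {
    ///     let tmp_id = allocate_tmp_job_id().await?;
    ///     match build_and_swap(tmp_id).await {
    ///         // Promote the temporary id to the real job on success.
    ///         Ok(()) => commit_replacement(tmp_id).await,
    ///         Err(err) => {
    ///             // Best-effort rollback; the original job stays intact.
    ///             let _ = abort_replacement(tmp_id).await;
    ///             Err(err)
    ///         }
    ///     }
    /// }
    /// ```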
2008    pub(crate) async fn build_replace_job(
2009        &self,
2010        stream_ctx: StreamContext,
2011        stream_job: &StreamingJob,
2012        mut fragment_graph: StreamFragmentGraph,
2013        tmp_job_id: TableId,
2014    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2015        match &stream_job {
2016            StreamingJob::Table(..)
2017            | StreamingJob::Source(..)
2018            | StreamingJob::MaterializedView(..) => {}
2019            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2020                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2021            }
2022        }
2023
2024        let id = stream_job.id();
2025        let expr_context = stream_ctx.to_expr_context();
2026
2027        // Check whether we are dropping the table's associated source connector.
2028        let mut drop_table_associated_source_id = None;
2029        if let StreamingJob::Table(None, _, _) = &stream_job {
2030            drop_table_associated_source_id = self
2031                .metadata_manager
2032                .get_table_associated_source_id(id as _)
2033                .await?;
2034        }
2035
2036        let old_fragments = self
2037            .metadata_manager
2038            .get_job_fragments_by_id(&id.into())
2039            .await?;
2040        let old_internal_table_ids = old_fragments.internal_table_ids();
2041
2042        // Handle dropping the table's associated source.
2043        let mut drop_table_connector_ctx = None;
2044        if let Some(to_remove_source_id) = drop_table_associated_source_id {
2045            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
2046            debug_assert!(old_internal_table_ids.len() == 1);
2047
2048            drop_table_connector_ctx = Some(DropTableConnectorContext {
2049                // We do not remove the original table catalog as it's still needed for the streaming job;
2050                // we only need to remove the reference to the state table.
2051                to_change_streaming_job_id: id as i32,
2052                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
2053                to_remove_source_id,
2054            });
2055        } else if stream_job.is_materialized_view() {
2056            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
2057            // but more robust.
2058            let old_fragments_upstreams = self
2059                .metadata_manager
2060                .catalog_controller
2061                .upstream_fragments(old_fragments.fragment_ids())
2062                .await?;
2063
2064            let old_state_graph =
2065                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2066            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2067            let mapping =
2068                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
2069                    .context("incompatible altering on the streaming job states")?;
2070
2071            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
2072        } else {
2073            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2074            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2075            let old_internal_tables = self
2076                .metadata_manager
2077                .get_table_catalog_by_ids(old_internal_table_ids)
2078                .await?;
2079            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2080        }
2081
2082        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2083        // graph that contains all information needed for building the actor graph.
2084        let original_root_fragment = old_fragments
2085            .root_fragment()
2086            .expect("root fragment not found");
2087
2088        let job_type = StreamingJobType::from(stream_job);
2089
2090        // Fetch the downstream fragments of the job from the metadata manager.
2091        let (downstream_fragments, downstream_actor_location) =
2092            self.metadata_manager.get_downstream_fragments(id).await?;
2093
2094        // Build the complete graph based on the job type.
2095        let complete_graph = match &job_type {
2096            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2097                CompleteStreamFragmentGraph::with_downstreams(
2098                    fragment_graph,
2099                    original_root_fragment.fragment_id,
2100                    downstream_fragments,
2101                    downstream_actor_location,
2102                    job_type,
2103                )?
2104            }
2105            StreamingJobType::Table(TableJobType::SharedCdcSource)
2106            | StreamingJobType::MaterializedView => {
2107                // CDC tables or materialized views can have upstream jobs as well.
2108                let (upstream_root_fragments, upstream_actor_location) = self
2109                    .metadata_manager
2110                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2111                    .await?;
2112
2113                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2114                    fragment_graph,
2115                    upstream_root_fragments,
2116                    upstream_actor_location,
2117                    original_root_fragment.fragment_id,
2118                    downstream_fragments,
2119                    downstream_actor_location,
2120                    job_type,
2121                )?
2122            }
2123            _ => unreachable!(),
2124        };
2125
2126        let resource_group = self
2127            .metadata_manager
2128            .get_existing_job_resource_group(id as ObjectId)
2129            .await?;
2130
2131        // 2. Build the actor graph.
2132        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2133
2134        // XXX: what is this parallelism?
2135        // Is it "assigned parallelism"?
2136        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2137            .expect("The number of actors in the original table fragment should be greater than 0");
2138
2139        let actor_graph_builder = ActorGraphBuilder::new(
2140            id,
2141            resource_group,
2142            complete_graph,
2143            cluster_info,
2144            parallelism,
2145        )?;
2146
2147        let ActorGraphBuildResult {
2148            graph,
2149            downstream_fragment_relations,
2150            building_locations,
2151            existing_locations,
2152            upstream_fragment_downstreams,
2153            replace_upstream,
2154            new_no_shuffle,
2155            ..
2156        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2157
2158        // A general table or source does not have upstream jobs, so the dispatchers should be empty.
2159        if matches!(
2160            job_type,
2161            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2162        ) {
2163            assert!(upstream_fragment_downstreams.is_empty());
2164        }
2165
2166        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2167        // the context that contains all information needed for building the actors on the compute
2168        // nodes.
2169        let stream_job_fragments = StreamJobFragments::new(
2170            (tmp_job_id as u32).into(),
2171            graph,
2172            &building_locations.actor_locations,
2173            stream_ctx,
2174            old_fragments.assigned_parallelism,
2175            old_fragments.max_parallelism,
2176        );
2177
2178        // Note: no need to set `vnode_count` as it's already set by the frontend.
2179        // See `get_replace_table_plan`.
2180
2181        let ctx = ReplaceStreamJobContext {
2182            old_fragments,
2183            replace_upstream,
2184            new_no_shuffle,
2185            upstream_fragment_downstreams,
2186            building_locations,
2187            existing_locations,
2188            streaming_job: stream_job.clone(),
2189            tmp_id: tmp_job_id as _,
2190            drop_table_connector_ctx,
2191        };
2192
2193        Ok((
2194            ctx,
2195            StreamJobFragmentsToCreate {
2196                inner: stream_job_fragments,
2197                downstreams: downstream_fragment_relations,
2198            },
2199        ))
2200    }
2201
2202    async fn alter_name(
2203        &self,
2204        relation: alter_name_request::Object,
2205        new_name: &str,
2206    ) -> MetaResult<NotificationVersion> {
2207        let (obj_type, id) = match relation {
2208            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2209            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2210            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2211            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2212            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2213            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2214            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2215            alter_name_request::Object::SubscriptionId(id) => {
2216                (ObjectType::Subscription, id as ObjectId)
2217            }
2218        };
2219        self.metadata_manager
2220            .catalog_controller
2221            .alter_name(obj_type, id, new_name)
2222            .await
2223    }
2224
2225    async fn alter_swap_rename(
2226        &self,
2227        object: alter_swap_rename_request::Object,
2228    ) -> MetaResult<NotificationVersion> {
2229        let (obj_type, src_id, dst_id) = match object {
2230            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2231            alter_swap_rename_request::Object::Table(objs) => {
2232                let (src_id, dst_id) = (
2233                    objs.src_object_id as ObjectId,
2234                    objs.dst_object_id as ObjectId,
2235                );
2236                (ObjectType::Table, src_id, dst_id)
2237            }
2238            alter_swap_rename_request::Object::View(objs) => {
2239                let (src_id, dst_id) = (
2240                    objs.src_object_id as ObjectId,
2241                    objs.dst_object_id as ObjectId,
2242                );
2243                (ObjectType::View, src_id, dst_id)
2244            }
2245            alter_swap_rename_request::Object::Source(objs) => {
2246                let (src_id, dst_id) = (
2247                    objs.src_object_id as ObjectId,
2248                    objs.dst_object_id as ObjectId,
2249                );
2250                (ObjectType::Source, src_id, dst_id)
2251            }
2252            alter_swap_rename_request::Object::Sink(objs) => {
2253                let (src_id, dst_id) = (
2254                    objs.src_object_id as ObjectId,
2255                    objs.dst_object_id as ObjectId,
2256                );
2257                (ObjectType::Sink, src_id, dst_id)
2258            }
2259            alter_swap_rename_request::Object::Subscription(objs) => {
2260                let (src_id, dst_id) = (
2261                    objs.src_object_id as ObjectId,
2262                    objs.dst_object_id as ObjectId,
2263                );
2264                (ObjectType::Subscription, src_id, dst_id)
2265            }
2266        };
2267
2268        self.metadata_manager
2269            .catalog_controller
2270            .alter_swap_rename(obj_type, src_id, dst_id)
2271            .await
2272    }
2273
2274    async fn alter_owner(
2275        &self,
2276        object: Object,
2277        owner_id: UserId,
2278    ) -> MetaResult<NotificationVersion> {
2279        let (obj_type, id) = match object {
2280            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2281            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2282            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2283            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2284            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2285            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2286            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2287            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2288        };
2289        self.metadata_manager
2290            .catalog_controller
2291            .alter_owner(obj_type, id, owner_id as _)
2292            .await
2293    }
2294
2295    async fn alter_set_schema(
2296        &self,
2297        object: alter_set_schema_request::Object,
2298        new_schema_id: SchemaId,
2299    ) -> MetaResult<NotificationVersion> {
2300        let (obj_type, id) = match object {
2301            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2302            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2303            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2304            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2305            alter_set_schema_request::Object::FunctionId(id) => {
2306                (ObjectType::Function, id as ObjectId)
2307            }
2308            alter_set_schema_request::Object::ConnectionId(id) => {
2309                (ObjectType::Connection, id as ObjectId)
2310            }
2311            alter_set_schema_request::Object::SubscriptionId(id) => {
2312                (ObjectType::Subscription, id as ObjectId)
2313            }
2314        };
2315        self.metadata_manager
2316            .catalog_controller
2317            .alter_schema(obj_type, id, new_schema_id as _)
2318            .await
2319    }
2320
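    /// Waits until no background-creating materialized views remain, polling the catalog
    /// with a short sleep between checks and giving up after roughly 30 minutes.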
2321    pub async fn wait(&self) -> MetaResult<()> {
2322        let timeout_ms = 30 * 60 * 1000;
2323        for _ in 0..timeout_ms {
2324            if self
2325                .metadata_manager
2326                .catalog_controller
2327                .list_background_creating_mviews(true)
2328                .await?
2329                .is_empty()
2330            {
2331                return Ok(());
2332            }
2333
2334            sleep(Duration::from_millis(1)).await;
2335        }
2336        Err(MetaError::cancelled(format!(
2337            "timeout after {timeout_ms}ms"
2338        )))
2339    }
2340
2341    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2342        self.metadata_manager
2343            .catalog_controller
2344            .comment_on(comment)
2345            .await
2346    }
2347}
2348
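/// Reports a `CreateStreamJob` telemetry event for a newly created catalog object, along
/// with its connector name and format/encode attributes when available.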
2349fn report_create_object(
2350    catalog_id: u32,
2351    event_name: &str,
2352    obj_type: PbTelemetryDatabaseObject,
2353    connector_name: Option<String>,
2354    attr_info: Option<jsonbb::Value>,
2355) {
2356    report_event(
2357        PbTelemetryEventStage::CreateStreamJob,
2358        event_name,
2359        catalog_id.into(),
2360        connector_name,
2361        Some(obj_type),
2362        attr_info,
2363    );
2364}
2365
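/// Deletes all rows belonging to the given `sink_id` from the exactly-once Iceberg sink
/// system table, logging the number of affected rows.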
2366async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2367    match Entity::delete_many()
2368        .filter(Column::SinkId.eq(sink_id))
2369        .exec(db)
2370        .await
2371    {
2372        Ok(result) => {
2373            let deleted_count = result.rows_affected;
2374
2375            tracing::info!(
2376                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2377                deleted_count,
2378                sink_id
2379            );
2380            Ok(())
2381        }
2382        Err(e) => {
2383            tracing::error!(
2384                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2385                sink_id,
2386                e.as_report()
2387            );
2388            Err(e.into())
2389        }
2390    }
2391}