risingwave_meta/rpc/ddl_controller.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::{InstrumentAwait, span};
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::AlterDatabaseParam;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    FragmentTypeFlag, MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

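/// ID of a streaming job, tagged with its job type. For `Table`, the optional `SourceId` is the
/// ID of the table's associated source, if any.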
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

/// Describes the streaming job that needs to be replaced, together with its new fragment graph.
/// Used when replacing a table and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

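/// A DDL command forwarded from the frontend and executed by [`DdlController::run_command`].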
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        create_type: CreateType,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>, // specific resource group
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(*id),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(*id),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(*id),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id) => Right(*id),
            DdlCommand::CreateView(view) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(*id),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id) => Right(*id),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(*id),
            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(*id),
            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
        }
    }

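    /// Whether this command can be executed while the cluster is still recovering.
    /// Commands for which this returns `false` are rejected in `run_command` unless the barrier
    /// manager reports a running status.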
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

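/// Handles DDL commands on the meta node: it updates the catalog through the [`MetadataManager`]
/// and drives streaming job creation, replacement and dropping through the stream manager and
/// barrier manager.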
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // The semaphore limits the number of streaming jobs that can be created concurrently.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

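/// Semaphore-based permits that bound how many streaming jobs may be created concurrently.
/// The permit count follows the `max_concurrent_creating_streaming_jobs` system parameter and is
/// adjusted at runtime when the parameter changes (zero means effectively unlimited).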
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
    pub fn next_seq(&self) -> u64 {
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    /// `run_command` spawns a Tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, the request is cancelled by tonic. Since there is a lot of
    /// logic for revert, status management, notification and so on, ensuring consistency would be
    /// very hard without spawning a separate task here.
    ///
    /// Although the return type is `Option`, it is always `Some`, to simplify the handling logic.
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view) => ctrl.create_view(view).await,
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    create_type: _,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob {
                    job_id,
                    drop_mode,
                    target_replace_info,
                } => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_database(database)
            .await
    }

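    /// Reschedules a streaming job, i.e. alters its parallelism. If the cluster is currently
    /// recovering, the reschedule is forced into deferred mode.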
    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    /// This replaces the source in the catalog.
    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

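    /// Creates a subscription: the subscription catalog is created first, then the stream manager
    /// sets up the subscription on its upstream table, and finally the frontend is notified.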
    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        self.stream_manager
            .create_subscription(&subscription)
            .await
            .inspect_err(|e| {
                tracing::debug!(error = %e.as_report(), "cancel create subscription");
            })?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the CDC scan node is wrapped in a Project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

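    /// Returns `true` if the node is a `StreamCdcScan` whose CDC table properties pass validation
    /// (i.e. a split enumerator can be created from them), and `false` for any other node.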
    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    // Modify the union node of the downstream table using the `TableFragments` of the sink that is being created upstream.
    // The merge inside the union has already been set up by the frontend; this function fills in its concrete upstream.
    // The dispatcher corresponding to the merge's upstream is also added to the replace-table context here.
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // check if the union fragment is fully assigned.
        for fragment in stream_job_fragments.fragments.values() {
            {
                {
                    visit_stream_node(&fragment.nodes, |node| {
                        if let NodeBody::Merge(merge_node) = node {
                            let upstream_fragment_id = merge_node.upstream_fragment_id;
                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                                .upstream_fragment_downstreams
                                .get(&upstream_fragment_id)
                            {
                                let mut upstream_fragment_downstreams =
                                    external_upstream_fragment_downstreams.iter().filter(
                                        |downstream| {
                                            downstream.downstream_fragment_id
                                                == fragment.fragment_id
                                        },
                                    );
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            } else {
                                let mut upstream_fragment_downstreams = stream_job_fragments
                                    .downstreams
                                    .get(&upstream_fragment_id)
                                    .into_iter()
                                    .flatten()
                                    .filter(|downstream| {
                                        downstream.downstream_fragment_id == fragment.fragment_id
                                    });
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            }
                        }
                    });
                }
            }
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

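    /// Rewrites the union fragment of the target table so that the merger corresponding to the
    /// given sink reads from the sink fragment, and registers the union fragment as a
    /// hash-dispatched downstream of the sink fragment.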
    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        {
            sink_fragment_downstreams.push(DownstreamFragmentRelation {
                downstream_fragment_id: union_fragment.fragment_id,
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: dist_key_indices.clone(),
                output_mapping: PbDispatchOutputMapping::simple(output_indices),
            });
        }

        let upstream_fragment_id = sink_fragment.fragment_id;

        {
            {
                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
                        for input_project_node in &mut node.input {
                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
                                    input_project_node.input.iter_mut().exactly_one().unwrap();

                                // Only rewrite the project node whose identity matches this sink.
                                if input_project_node.identity.as_str()
                                    != unique_identity
                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                                {
                                    continue;
                                }

                                if let Some(NodeBody::Merge(merge_node)) =
                                    &mut merge_stream_node.node_body
                                {
                                    {
                                        merge_stream_node.identity =
                                            format!("MergeExecutor(from sink {})", sink_id);

                                        input_project_node.identity =
                                            format!("ProjectExecutor(from sink {})", sink_id);
                                    }

                                    **merge_node = {
                                        #[expect(deprecated)]
                                        MergeNode {
                                            upstream_actor_id: vec![],
                                            upstream_fragment_id,
                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                            fields: sink_fields.to_vec(),
                                        }
                                    };

                                    merge_stream_node.fields = sink_fields.to_vec();

                                    return false;
                                }
                            }
                        }
                    }
                    true
                });
            }
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    return self
                        .metadata_manager
                        .wait_streaming_job_finished(database_id.into(), *job_id)
                        .await;
                } else {
                    return Ok(IGNORED_NOTIFICATION_VERSION);
                }
            } else {
                return Err(meta_err);
            }
        }
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

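    /// Builds the fragment graph and the optional replace-table graph, validates external sources
    /// and sinks, persists the fragment and actor catalogs, and then hands the job over to the
    /// stream manager. Background jobs (except sinks) are spawned into a separate task, while
    /// foreground jobs are awaited here.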
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                // Ensure the max parallelism is unchanged before replacing the table.
                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        // create fragment and actor catalogs.
        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        // create streaming jobs.
        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            // FIXME(kwannoel): Unify background stream's creation path with MV below.
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self
                    .stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                let await_tree_key = format!("Background DDL Worker ({})", streaming_job.id());
                let await_tree_span =
                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());

                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                        });
                };

                let fut = (self.env.await_tree_reg())
                    .register(await_tree_key, await_tree_span)
                    .instrument(fut);
                tokio::spawn(fut);

                rx.instrument_await("wait_background_streaming_job_creation_started")
                    .await
                    .map_err(|_| {
                        anyhow!(
                            "failed to receive create streaming job result of job: {}",
                            stream_job_id
                        )
                    })??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }

    /// `target_replace_info`: when dropping a sink into a table, the target table needs to be replaced.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

1334            // Ensure the max parallelism stays unchanged before replacing the table.
1335            let original_max_parallelism = self
1336                .metadata_manager
1337                .get_job_max_parallelism(streaming_job.id().into())
1338                .await?;
1339            let fragment_graph = PbStreamFragmentGraph {
1340                max_parallelism: original_max_parallelism as _,
1341                ..fragment_graph
1342            };
1343
1344            let fragment_graph =
1345                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1346            streaming_job.set_info_from_graph(&fragment_graph);
1347            let streaming_job = streaming_job;
1348
1349            streaming_job.table().expect("should be table job");
1350
1351            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
1352            let tmp_id = self
1353                .metadata_manager
1354                .catalog_controller
1355                .create_job_catalog_for_replace(
1356                    &streaming_job,
1357                    &stream_ctx,
1358                    &fragment_graph.specified_parallelism(),
1359                    fragment_graph.max_parallelism(),
1360                )
1361                .await? as u32;
1362
1363            let (ctx, stream_job_fragments) = self
1364                .inject_replace_table_job_for_table_sink(
1365                    tmp_id,
1366                    &self.metadata_manager,
1367                    stream_ctx,
1368                    None,
1369                    None,
1370                    Some(sink_id),
1371                    &streaming_job,
1372                    fragment_graph,
1373                )
1374                .await?;
1375
1376            let result: MetaResult<_> = try {
1377                let replace_upstream = ctx.replace_upstream.clone();
1378
1379                self.metadata_manager
1380                    .catalog_controller
1381                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1382                    .await?;
1383
1384                self.stream_manager
1385                    .replace_stream_job(stream_job_fragments, ctx)
1386                    .await?;
1387
1388                replace_upstream
1389            };
1390
1391            version = match result {
1392                Ok(replace_upstream) => {
1393                    let version = self
1394                        .metadata_manager
1395                        .catalog_controller
1396                        .finish_replace_streaming_job(
1397                            tmp_id as _,
1398                            streaming_job,
1399                            replace_upstream,
1400                            SinkIntoTableContext {
1401                                creating_sink_id: None,
1402                                dropping_sink_id: Some(sink_id),
1403                                updated_sink_catalogs: vec![],
1404                            },
1405                            None, // no source is dropped when dropping sink into table
1406                        )
1407                        .await?;
1408                    Ok(version)
1409                }
1410                Err(err) => {
1411                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
1412                    let _ = self.metadata_manager
1413                        .catalog_controller
1414                        .try_abort_replacing_streaming_job(tmp_id as _)
1415                        .await
1416                        .inspect_err(|err| {
1417                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
1418                        });
1419                    Err(err)
1420                }
1421            }?;
1422        }
1423
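        // Clean up everything released by the catalog drop: stop the affected streaming jobs,
        // drop the related sources, and remove the secrets from the local secret manager.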
1424        let ReleaseContext {
1425            database_id,
1426            removed_streaming_job_ids,
1427            removed_state_table_ids,
1428            removed_source_ids,
1429            removed_secret_ids: secret_ids,
1430            removed_source_fragments,
1431            removed_actors,
1432            removed_fragments,
1433        } = release_ctx;
1434
1435        let _guard = self.source_manager.pause_tick().await;
1436        self.stream_manager
1437            .drop_streaming_jobs(
1438                risingwave_common::catalog::DatabaseId::new(database_id as _),
1439                removed_actors.iter().map(|id| *id as _).collect(),
1440                removed_streaming_job_ids,
1441                removed_state_table_ids,
1442                removed_fragments.iter().map(|id| *id as _).collect(),
1443            )
1444            .await;
1445
1446        // Clean up sources after dropping streaming jobs.
1447        // Otherwise, e.g., Kafka consumer groups might be recreated right after being deleted.
1448        self.source_manager
1449            .apply_source_change(SourceChange::DropSource {
1450                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1451            })
1452            .await;
1453
1454        // Unregister fragments and actors from the source manager.
1455        // FIXME: source backfill fragments also need to be unregistered.
1456        let dropped_source_fragments = removed_source_fragments
1457            .into_iter()
1458            .map(|(source_id, fragments)| {
1459                (
1460                    source_id,
1461                    fragments.into_iter().map(|id| id as u32).collect(),
1462                )
1463            })
1464            .collect();
1465        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1466        self.source_manager
1467            .apply_source_change(SourceChange::DropMv {
1468                dropped_source_fragments,
1469                dropped_actors,
1470            })
1471            .await;
1472
1473        // remove secrets.
1474        for secret in secret_ids {
1475            LocalSecretManager::global().remove_secret(secret as _);
1476        }
1477        Ok(version)
1478    }
1479
1480    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
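    ///
    /// At a high level (as implemented below): the new fragment graph is built under a
    /// temporary job id, the new fragments are prepared and swapped in via the stream manager,
    /// and the catalog change is finally committed, or aborted on failure.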
1481    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1482    pub async fn replace_job(
1483        &self,
1484        mut streaming_job: StreamingJob,
1485        fragment_graph: StreamFragmentGraphProto,
1486    ) -> MetaResult<NotificationVersion> {
1487        match &mut streaming_job {
1488            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1489            StreamingJob::MaterializedView(..)
1490            | StreamingJob::Sink(..)
1491            | StreamingJob::Index(..) => {
1492                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1493            }
1494        }
1495
1496        let job_id = streaming_job.id();
1497
1498        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1499        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1500
1501        // Ensure the max parallelism stays unchanged before replacing the table.
1502        let original_max_parallelism = self
1503            .metadata_manager
1504            .get_job_max_parallelism(streaming_job.id().into())
1505            .await?;
1506        let fragment_graph = PbStreamFragmentGraph {
1507            max_parallelism: original_max_parallelism as _,
1508            ..fragment_graph
1509        };
1510
1511        // 1. build fragment graph.
1512        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1513        streaming_job.set_info_from_graph(&fragment_graph);
1514
1515        // make it immutable
1516        let streaming_job = streaming_job;
1517
1518        let tmp_id = self
1519            .metadata_manager
1520            .catalog_controller
1521            .create_job_catalog_for_replace(
1522                &streaming_job,
1523                &ctx,
1524                &fragment_graph.specified_parallelism(),
1525                fragment_graph.max_parallelism(),
1526            )
1527            .await?;
1528
1529        tracing::debug!(id = job_id, "building replace streaming job");
1530        let mut updated_sink_catalogs = vec![];
1531
1532        let mut drop_table_connector_ctx = None;
1533        let result: MetaResult<_> = try {
1534            let (mut ctx, mut stream_job_fragments) = self
1535                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
1536                .await?;
1537            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1538
1539            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1540                let catalogs = self
1541                    .metadata_manager
1542                    .get_sink_catalog_by_ids(&table.incoming_sinks)
1543                    .await?;
1544
1545                for sink in catalogs {
1546                    let sink_id = &sink.id;
1547
1548                    let sink_table_fragments = self
1549                        .metadata_manager
1550                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1551                            *sink_id,
1552                        ))
1553                        .await?;
1554
1555                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1556
1557                    Self::inject_replace_table_plan_for_sink(
1558                        *sink_id,
1559                        &sink_fragment,
1560                        table,
1561                        &mut ctx,
1562                        stream_job_fragments.inner.union_fragment_for_table(),
1563                        Some(&sink.unique_identity()),
1564                    );
1565
1566                    if sink.original_target_columns.is_empty() {
1567                        updated_sink_catalogs.push(sink.id as _);
1568                    }
1569                }
1570            }
1571
1572            let replace_upstream = ctx.replace_upstream.clone();
1573
1574            self.metadata_manager
1575                .catalog_controller
1576                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1577                .await?;
1578
1579            self.stream_manager
1580                .replace_stream_job(stream_job_fragments, ctx)
1581                .await?;
1582            replace_upstream
1583        };
1584
1585        match result {
1586            Ok(replace_upstream) => {
1587                let version = self
1588                    .metadata_manager
1589                    .catalog_controller
1590                    .finish_replace_streaming_job(
1591                        tmp_id,
1592                        streaming_job,
1593                        replace_upstream,
1594                        SinkIntoTableContext {
1595                            creating_sink_id: None,
1596                            dropping_sink_id: None,
1597                            updated_sink_catalogs,
1598                        },
1599                        drop_table_connector_ctx.as_ref(),
1600                    )
1601                    .await?;
1602                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1603                    self.source_manager
1604                        .apply_source_change(SourceChange::DropSource {
1605                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1606                        })
1607                        .await;
1608                }
1609                Ok(version)
1610            }
1611            Err(err) => {
1612                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1613                let _ = self.metadata_manager
1614                    .catalog_controller
1615                    .try_abort_replacing_streaming_job(tmp_id)
1616                    .await.inspect_err(|err| {
1617                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1618                });
1619                Err(err)
1620            }
1621        }
1622    }
1623
1624    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" })]
1625    async fn drop_streaming_job(
1626        &self,
1627        job_id: StreamingJobId,
1628        drop_mode: DropMode,
1629        target_replace_info: Option<ReplaceStreamJobInfo>,
1630    ) -> MetaResult<NotificationVersion> {
1631        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1632
1633        let (object_id, object_type) = match job_id {
1634            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1635            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1636            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1637            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1638        };
1639
1640        let version = self
1641            .drop_object(object_type, object_id, drop_mode, target_replace_info)
1642            .await?;
1643        #[cfg(not(madsim))]
1644        if let StreamingJobId::Sink(sink_id) = job_id {
1645            // Delete rows from the system table used by the exactly-once Iceberg sink.
1646            // todo(wcy-fdu): optimize the logic so that it only runs for Iceberg sinks.
1647            let db = self.env.meta_store_ref().conn.clone();
1648            clean_all_rows_by_sink_id(&db, sink_id).await?;
1649        }
1650        Ok(version)
1651    }
1652
1653    /// Resolve the parallelism of the stream job based on the given information.
1654    ///
1655    /// Returns error if user specifies a parallelism that cannot be satisfied.
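    ///
    /// Resolution rules (summarizing the logic below):
    /// - A user-specified parallelism must not exceed the job's max parallelism or the
    ///   available slots of the resource group.
    /// - Otherwise, the configured default parallelism is used (all available slots for
    ///   `DefaultParallelism::Full`), capped at the job's max parallelism.
    ///
    /// For example (illustrative numbers): with 8 available slots and a max parallelism of 4,
    /// `DefaultParallelism::Full` resolves to 4, while `DefaultParallelism::Default(16)` is
    /// rejected because it exceeds the available slots.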
1656    fn resolve_stream_parallelism(
1657        &self,
1658        specified: Option<NonZeroUsize>,
1659        max: NonZeroUsize,
1660        cluster_info: &StreamingClusterInfo,
1661        resource_group: String,
1662    ) -> MetaResult<NonZeroUsize> {
1663        let available = cluster_info.parallelism(&resource_group);
1664        let Some(available) = NonZeroUsize::new(available) else {
1665            bail_unavailable!(
1666                "no available slots to schedule in resource group \"{}\", \
1667                 have you allocated any compute nodes within this resource group?",
1668                resource_group
1669            );
1670        };
1671
1672        if let Some(specified) = specified {
1673            if specified > max {
1674                bail_invalid_parameter!(
1675                    "specified parallelism {} should not exceed max parallelism {}",
1676                    specified,
1677                    max,
1678                );
1679            }
1680            if specified > available {
1681                bail_unavailable!(
1682                    "insufficient parallelism to schedule in resource group \"{}\", \
1683                     required: {}, available: {}",
1684                    resource_group,
1685                    specified,
1686                    available,
1687                );
1688            }
1689            Ok(specified)
1690        } else {
1691            // No parallelism specified by the user: fall back to the parallelism configured for the cluster.
1692            let default_parallelism = match self.env.opts.default_parallelism {
1693                DefaultParallelism::Full => available,
1694                DefaultParallelism::Default(num) => {
1695                    if num > available {
1696                        bail_unavailable!(
1697                            "insufficient parallelism to schedule in resource group \"{}\", \
1698                            required: {}, available: {}",
1699                            resource_group,
1700                            num,
1701                            available,
1702                        );
1703                    }
1704                    num
1705                }
1706            };
1707
1708            if default_parallelism > max {
1709                tracing::warn!(
1710                    max_parallelism = max.get(),
1711                    resource_group,
1712                    "default parallelism exceeds the job's max parallelism, falling back to max parallelism",
1713                );
1714            }
1715            Ok(default_parallelism.min(max))
1716        }
1717    }
1718
1719    /// Builds the actor graph:
1720    /// - Add the upstream fragments to the fragment graph
1721    /// - Schedule the fragments based on their distribution
1722    /// - Expand each fragment into one or several actors
1723    /// - Construct the fragment level backfill order control.
1724    #[await_tree::instrument]
1725    pub(crate) async fn build_stream_job(
1726        &self,
1727        stream_ctx: StreamContext,
1728        mut stream_job: StreamingJob,
1729        fragment_graph: StreamFragmentGraph,
1730        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1731        specific_resource_group: Option<String>,
1732    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1733        let id = stream_job.id();
1734        let specified_parallelism = fragment_graph.specified_parallelism();
1735        let expr_context = stream_ctx.to_expr_context();
1736        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1737
1738        // 1. Build the fragment-level backfill ordering graph.
1739        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1740
1741        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1742        // contains all information needed for building the actor graph.
1743
1744        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1745            fragment_graph.collect_snapshot_backfill_info()?;
1746        assert!(
1747            snapshot_backfill_info
1748                .iter()
1749                .chain([&cross_db_snapshot_backfill_info])
1750                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1751                .all(|backfill_epoch| backfill_epoch.is_none()),
1752            "the backfill epoch should not be set when initially building the job: {:?} {:?}",
1753            snapshot_backfill_info,
1754            cross_db_snapshot_backfill_info
1755        );
1756
1757        // check if log store exists for all cross-db upstreams
1758        self.metadata_manager
1759            .catalog_controller
1760            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1761            .await?;
1762
1763        let upstream_table_ids = fragment_graph
1764            .dependent_table_ids()
1765            .iter()
1766            .filter(|id| {
1767                !cross_db_snapshot_backfill_info
1768                    .upstream_mv_table_id_to_backfill_epoch
1769                    .contains_key(id)
1770            })
1771            .cloned()
1772            .collect();
1773
1774        let (upstream_root_fragments, existing_actor_location) = self
1775            .metadata_manager
1776            .get_upstream_root_fragments(&upstream_table_ids)
1777            .await?;
1778
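        // Snapshot backfill is only supported for materialized views, sinks and indexes.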
1779        if snapshot_backfill_info.is_some() {
1780            match stream_job {
1781                StreamingJob::MaterializedView(_)
1782                | StreamingJob::Sink(_, _)
1783                | StreamingJob::Index(_, _) => {}
1784                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1785                    return Err(
1786                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1787                    );
1788                }
1789            }
1790        }
1791
1792        let upstream_actors = upstream_root_fragments
1793            .values()
1794            .map(|fragment| {
1795                (
1796                    fragment.fragment_id,
1797                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1798                )
1799            })
1800            .collect();
1801
1802        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1803            fragment_graph,
1804            upstream_root_fragments,
1805            existing_actor_location,
1806            (&stream_job).into(),
1807        )?;
1808
1809        let resource_group = match specific_resource_group {
1810            None => {
1811                self.metadata_manager
1812                    .get_database_resource_group(stream_job.database_id() as ObjectId)
1813                    .await?
1814            }
1815            Some(resource_group) => resource_group,
1816        };
1817
1818        // 3. Build the actor graph.
1819        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1820
1821        let parallelism = self.resolve_stream_parallelism(
1822            specified_parallelism,
1823            max_parallelism,
1824            &cluster_info,
1825            resource_group.clone(),
1826        )?;
1827
1828        let parallelism = self
1829            .env
1830            .system_params_reader()
1831            .await
1832            .adaptive_parallelism_strategy()
1833            .compute_target_parallelism(parallelism.get());
1834
1835        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1836        let actor_graph_builder = ActorGraphBuilder::new(
1837            id,
1838            resource_group,
1839            complete_graph,
1840            cluster_info,
1841            parallelism,
1842        )?;
1843
1844        let ActorGraphBuildResult {
1845            graph,
1846            downstream_fragment_relations,
1847            building_locations,
1848            existing_locations,
1849            upstream_fragment_downstreams,
1850            new_no_shuffle,
1851            replace_upstream,
1852            ..
1853        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1854        assert!(replace_upstream.is_empty());
1855
1856        // 4. Build the table fragments structure that will be persisted in the stream manager,
1857        // and the context that contains all information needed for building the
1858        // actors on the compute nodes.
1859
1860        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, use ADAPTIVE parallelism.
1861        // Otherwise, fall back to FIXED parallelism with the value resolved above.
1862        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1863            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1864            _ => TableParallelism::Fixed(parallelism.get()),
1865        };
1866
1867        let stream_job_fragments = StreamJobFragments::new(
1868            id.into(),
1869            graph,
1870            &building_locations.actor_locations,
1871            stream_ctx.clone(),
1872            table_parallelism,
1873            max_parallelism.get(),
1874        );
1875        let internal_tables = stream_job_fragments.internal_tables();
1876
1877        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1878            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1879        }
1880
1881        let replace_table_job_info = match affected_table_replace_info {
1882            Some((table_stream_job, fragment_graph)) => {
1883                if snapshot_backfill_info.is_some() {
1884                    return Err(anyhow!(
1885                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
1886                    )
1887                    .into());
1888                }
1889                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
1890                    bail!("additional replace table event only occurs when sinking into table");
1891                };
1892
1893                table_stream_job.table().expect("should be table job");
1894                let tmp_id = self
1895                    .metadata_manager
1896                    .catalog_controller
1897                    .create_job_catalog_for_replace(
1898                        &table_stream_job,
1899                        &stream_ctx,
1900                        &fragment_graph.specified_parallelism(),
1901                        fragment_graph.max_parallelism(),
1902                    )
1903                    .await? as u32;
1904
1905                let (context, table_fragments) = self
1906                    .inject_replace_table_job_for_table_sink(
1907                        tmp_id,
1908                        &self.metadata_manager,
1909                        stream_ctx,
1910                        Some(sink),
1911                        Some(&stream_job_fragments),
1912                        None,
1913                        &table_stream_job,
1914                        fragment_graph,
1915                    )
1916                    .await?;
1917                // When sinking into a table, some fields of the target table may be modified,
1918                // such as `fragment_id` being altered by `prepare_replace_table`.
1919                // At this point, the table info carried with the sink must be updated accordingly.
1920                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
1921                    // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table
1922                    *target_table = Some((table.clone(), source.clone()));
1923                });
1924
1925                Some((table_stream_job, context, table_fragments))
1926            }
1927            None => None,
1928        };
1929
1930        let ctx = CreateStreamingJobContext {
1931            upstream_fragment_downstreams,
1932            new_no_shuffle,
1933            upstream_actors,
1934            internal_tables,
1935            building_locations,
1936            existing_locations,
1937            definition: stream_job.definition(),
1938            mv_table_id: stream_job.mv_table(),
1939            create_type: stream_job.create_type(),
1940            job_type: (&stream_job).into(),
1941            streaming_job: stream_job,
1942            replace_table_job_info,
1943            option: CreateStreamingJobOption {},
1944            snapshot_backfill_info,
1945            cross_db_snapshot_backfill_info,
1946            fragment_backfill_ordering,
1947        };
1948
1949        Ok((
1950            ctx,
1951            StreamJobFragmentsToCreate {
1952                inner: stream_job_fragments,
1953                downstreams: downstream_fragment_relations,
1954            },
1955        ))
1956    }
1957
1958    /// `build_replace_job` builds a job replacement and returns the context and new job
1959    /// fragments.
1960    ///
1961    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1962    /// replacement is finished.
1963    pub(crate) async fn build_replace_job(
1964        &self,
1965        stream_ctx: StreamContext,
1966        stream_job: &StreamingJob,
1967        mut fragment_graph: StreamFragmentGraph,
1968        tmp_job_id: TableId,
1969    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1970        match &stream_job {
1971            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1972            StreamingJob::MaterializedView(..)
1973            | StreamingJob::Sink(..)
1974            | StreamingJob::Index(..) => {
1975                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1976            }
1977        }
1978
1979        let id = stream_job.id();
1980        let expr_context = stream_ctx.to_expr_context();
1981
1982        // Check whether this replacement drops the table's associated source connector.
1983        let mut drop_table_associated_source_id = None;
1984        if let StreamingJob::Table(None, _, _) = &stream_job {
1985            drop_table_associated_source_id = self
1986                .metadata_manager
1987                .get_table_associated_source_id(id as _)
1988                .await?;
1989        }
1990
1991        let old_fragments = self
1992            .metadata_manager
1993            .get_job_fragments_by_id(&id.into())
1994            .await?;
1995        let old_internal_table_ids = old_fragments.internal_table_ids();
1996
1997        // handle drop table's associated source
1998        let mut drop_table_connector_ctx = None;
1999        if drop_table_associated_source_id.is_some() {
2000            // drop table's associated source means the fragment containing the table has just one internal table (associated source's state table)
2001            debug_assert!(old_internal_table_ids.len() == 1);
2002
2003            drop_table_connector_ctx = Some(DropTableConnectorContext {
2004                // We do not remove the original table catalog as it's still needed for the streaming job;
2005                // we only need to remove the reference to the source's state table.
2006                to_change_streaming_job_id: id as i32,
2007                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
2008                to_remove_source_id: drop_table_associated_source_id.unwrap(),
2009            });
2010        } else {
2011            let old_internal_tables = self
2012                .metadata_manager
2013                .get_table_catalog_by_ids(old_internal_table_ids)
2014                .await?;
2015            fragment_graph.fit_internal_table_ids(old_internal_tables)?;
2016        }
2017
2018        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2019        // graph that contains all information needed for building the actor graph.
2020        let original_root_fragment = old_fragments
2021            .root_fragment()
2022            .expect("root fragment not found");
2023
2024        let job_type = StreamingJobType::from(stream_job);
2025
2026        // Fetch the downstream fragments of the job being replaced from the metadata manager.
2027        let (downstream_fragments, downstream_actor_location) =
2028            self.metadata_manager.get_downstream_fragments(id).await?;
2029
2030        // build complete graph based on the table job type
2031        let complete_graph = match &job_type {
2032            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2033                CompleteStreamFragmentGraph::with_downstreams(
2034                    fragment_graph,
2035                    original_root_fragment.fragment_id,
2036                    downstream_fragments,
2037                    downstream_actor_location,
2038                    job_type,
2039                )?
2040            }
2041            StreamingJobType::Table(TableJobType::SharedCdcSource) => {
2042                // Get the upstream fragment, which should be the CDC source.
2043                let (upstream_root_fragments, upstream_actor_location) = self
2044                    .metadata_manager
2045                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2046                    .await?;
2047
2048                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2049                    fragment_graph,
2050                    upstream_root_fragments,
2051                    upstream_actor_location,
2052                    original_root_fragment.fragment_id,
2053                    downstream_fragments,
2054                    downstream_actor_location,
2055                    job_type,
2056                )?
2057            }
2058            _ => unreachable!(),
2059        };
2060
2061        let resource_group = self
2062            .metadata_manager
2063            .get_existing_job_resource_group(id as ObjectId)
2064            .await?;
2065
2066        // 2. Build the actor graph.
2067        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2068
2069        // XXX: what is this parallelism?
2070        // Is it "assigned parallelism"?
2071        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2072            .expect("The number of actors in the original table fragment should be greater than 0");
2073
2074        let actor_graph_builder = ActorGraphBuilder::new(
2075            id,
2076            resource_group,
2077            complete_graph,
2078            cluster_info,
2079            parallelism,
2080        )?;
2081
2082        let ActorGraphBuildResult {
2083            graph,
2084            downstream_fragment_relations,
2085            building_locations,
2086            existing_locations,
2087            upstream_fragment_downstreams,
2088            replace_upstream,
2089            new_no_shuffle,
2090            ..
2091        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2092
2093        // A general table or source has no upstream job, so the dispatchers should be empty.
2094        if matches!(
2095            job_type,
2096            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2097        ) {
2098            assert!(upstream_fragment_downstreams.is_empty());
2099        }
2100
2101        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2102        // the context that contains all information needed for building the actors on the compute
2103        // nodes.
2104        let stream_job_fragments = StreamJobFragments::new(
2105            (tmp_job_id as u32).into(),
2106            graph,
2107            &building_locations.actor_locations,
2108            stream_ctx,
2109            old_fragments.assigned_parallelism,
2110            old_fragments.max_parallelism,
2111        );
2112
2113        // Note: no need to set `vnode_count` as it's already set by the frontend.
2114        // See `get_replace_table_plan`.
2115
2116        let ctx = ReplaceStreamJobContext {
2117            old_fragments,
2118            replace_upstream,
2119            new_no_shuffle,
2120            upstream_fragment_downstreams,
2121            building_locations,
2122            existing_locations,
2123            streaming_job: stream_job.clone(),
2124            tmp_id: tmp_job_id as _,
2125            drop_table_connector_ctx,
2126        };
2127
2128        Ok((
2129            ctx,
2130            StreamJobFragmentsToCreate {
2131                inner: stream_job_fragments,
2132                downstreams: downstream_fragment_relations,
2133            },
2134        ))
2135    }
2136
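    /// Renames the given catalog object (table, view, index, sink, source, schema, database,
    /// or subscription).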
2137    async fn alter_name(
2138        &self,
2139        relation: alter_name_request::Object,
2140        new_name: &str,
2141    ) -> MetaResult<NotificationVersion> {
2142        let (obj_type, id) = match relation {
2143            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2144            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2145            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2146            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2147            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2148            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2149            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2150            alter_name_request::Object::SubscriptionId(id) => {
2151                (ObjectType::Subscription, id as ObjectId)
2152            }
2153        };
2154        self.metadata_manager
2155            .catalog_controller
2156            .alter_name(obj_type, id, new_name)
2157            .await
2158    }
2159
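    /// Swaps the names of two catalog objects of the same type (tables, views, sources, sinks,
    /// or subscriptions). Schema swapping is not implemented yet.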
2160    async fn alter_swap_rename(
2161        &self,
2162        object: alter_swap_rename_request::Object,
2163    ) -> MetaResult<NotificationVersion> {
2164        let (obj_type, src_id, dst_id) = match object {
2165            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2166            alter_swap_rename_request::Object::Table(objs) => {
2167                let (src_id, dst_id) = (
2168                    objs.src_object_id as ObjectId,
2169                    objs.dst_object_id as ObjectId,
2170                );
2171                (ObjectType::Table, src_id, dst_id)
2172            }
2173            alter_swap_rename_request::Object::View(objs) => {
2174                let (src_id, dst_id) = (
2175                    objs.src_object_id as ObjectId,
2176                    objs.dst_object_id as ObjectId,
2177                );
2178                (ObjectType::View, src_id, dst_id)
2179            }
2180            alter_swap_rename_request::Object::Source(objs) => {
2181                let (src_id, dst_id) = (
2182                    objs.src_object_id as ObjectId,
2183                    objs.dst_object_id as ObjectId,
2184                );
2185                (ObjectType::Source, src_id, dst_id)
2186            }
2187            alter_swap_rename_request::Object::Sink(objs) => {
2188                let (src_id, dst_id) = (
2189                    objs.src_object_id as ObjectId,
2190                    objs.dst_object_id as ObjectId,
2191                );
2192                (ObjectType::Sink, src_id, dst_id)
2193            }
2194            alter_swap_rename_request::Object::Subscription(objs) => {
2195                let (src_id, dst_id) = (
2196                    objs.src_object_id as ObjectId,
2197                    objs.dst_object_id as ObjectId,
2198                );
2199                (ObjectType::Subscription, src_id, dst_id)
2200            }
2201        };
2202
2203        self.metadata_manager
2204            .catalog_controller
2205            .alter_swap_rename(obj_type, src_id, dst_id)
2206            .await
2207    }
2208
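    /// Changes the owner of the given catalog object to the specified user.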
2209    async fn alter_owner(
2210        &self,
2211        object: Object,
2212        owner_id: UserId,
2213    ) -> MetaResult<NotificationVersion> {
2214        let (obj_type, id) = match object {
2215            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2216            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2217            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2218            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2219            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2220            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2221            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2222            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2223        };
2224        self.metadata_manager
2225            .catalog_controller
2226            .alter_owner(obj_type, id, owner_id as _)
2227            .await
2228    }
2229
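    /// Moves the given catalog object into another schema.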
2230    async fn alter_set_schema(
2231        &self,
2232        object: alter_set_schema_request::Object,
2233        new_schema_id: SchemaId,
2234    ) -> MetaResult<NotificationVersion> {
2235        let (obj_type, id) = match object {
2236            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2237            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2238            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2239            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2240            alter_set_schema_request::Object::FunctionId(id) => {
2241                (ObjectType::Function, id as ObjectId)
2242            }
2243            alter_set_schema_request::Object::ConnectionId(id) => {
2244                (ObjectType::Connection, id as ObjectId)
2245            }
2246            alter_set_schema_request::Object::SubscriptionId(id) => {
2247                (ObjectType::Subscription, id as ObjectId)
2248            }
2249        };
2250        self.metadata_manager
2251            .catalog_controller
2252            .alter_schema(obj_type, id, new_schema_id as _)
2253            .await
2254    }
2255
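    /// Waits until there are no background-creating materialized views left, polling about
    /// once per millisecond and timing out after 30 minutes.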
2256    pub async fn wait(&self) -> MetaResult<()> {
2257        let timeout_ms = 30 * 60 * 1000;
2258        for _ in 0..timeout_ms {
2259            if self
2260                .metadata_manager
2261                .catalog_controller
2262                .list_background_creating_mviews(true)
2263                .await?
2264                .is_empty()
2265            {
2266                return Ok(());
2267            }
2268
2269            sleep(Duration::from_millis(1)).await;
2270        }
2271        Err(MetaError::cancelled(format!(
2272            "timeout after {timeout_ms}ms"
2273        )))
2274    }
2275
2276    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2277        self.metadata_manager
2278            .catalog_controller
2279            .comment_on(comment)
2280            .await
2281    }
2282}
2283
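/// Reports a `CreateStreamJob` telemetry event for the given catalog object.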
2284fn report_create_object(
2285    catalog_id: u32,
2286    event_name: &str,
2287    obj_type: PbTelemetryDatabaseObject,
2288    connector_name: Option<String>,
2289    attr_info: Option<jsonbb::Value>,
2290) {
2291    report_event(
2292        PbTelemetryEventStage::CreateStreamJob,
2293        event_name,
2294        catalog_id.into(),
2295        connector_name,
2296        Some(obj_type),
2297        attr_info,
2298    );
2299}
2300
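/// Deletes all rows belonging to the given sink from the exactly-once Iceberg sink system table.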
2301async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2302    match Entity::delete_many()
2303        .filter(Column::SinkId.eq(sink_id))
2304        .exec(db)
2305        .await
2306    {
2307        Ok(result) => {
2308            let deleted_count = result.rows_affected;
2309
2310            tracing::info!(
2311                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2312                deleted_count,
2313                sink_id
2314            );
2315            Ok(())
2316        }
2317        Err(e) => {
2318            tracing::error!(
2319                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2320                sink_id,
2321                e.as_report()
2322            );
2323            Err(e.into())
2324        }
2325    }
2326}