risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::stream_plan::stream_node::NodeBody;
61use risingwave_pb::stream_plan::{
62    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
63    StreamFragmentGraph as StreamFragmentGraphProto,
64};
65use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::BarrierManagerRef;
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::sink_coordination::SinkCoordinatorManager;
79use crate::manager::{
80    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
81    NotificationVersion, StreamingJob, StreamingJobType,
82};
83use crate::model::{
84    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
85    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
86    TableParallelism,
87};
88use crate::stream::cdc::{
89    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
96    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
97    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
102#[derive(PartialEq)]
103pub enum DropMode {
104    Restrict,
105    Cascade,
106}
107
108impl DropMode {
109    pub fn from_request_setting(cascade: bool) -> DropMode {
110        if cascade {
111            DropMode::Cascade
112        } else {
113            DropMode::Restrict
114        }
115    }
116}
117
118#[derive(strum::AsRefStr)]
119pub enum StreamingJobId {
120    MaterializedView(TableId),
121    Sink(SinkId),
122    Table(Option<SourceId>, TableId),
123    Index(IndexId),
124}
125
126impl std::fmt::Display for StreamingJobId {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "{}", self.as_ref())?;
129        write!(f, "({})", self.id())
130    }
131}
132
133impl StreamingJobId {
134    fn id(&self) -> JobId {
135        match self {
136            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137            StreamingJobId::Index(id) => id.as_job_id(),
138            StreamingJobId::Sink(id) => id.as_job_id(),
139        }
140    }
141}
142
143/// Describes the information of the job that needs to be replaced.
144/// It is used when replacing a table and when creating a sink into a table.
145pub struct ReplaceStreamJobInfo {
146    pub streaming_job: StreamingJob,
147    pub fragment_graph: StreamFragmentGraphProto,
148}
149
150#[derive(Display)]
151pub enum DdlCommand {
152    CreateDatabase(Database),
153    DropDatabase(DatabaseId),
154    CreateSchema(Schema),
155    DropSchema(SchemaId, DropMode),
156    CreateNonSharedSource(Source),
157    DropSource(SourceId, DropMode),
158    CreateFunction(Function),
159    DropFunction(FunctionId, DropMode),
160    CreateView(View, HashSet<ObjectId>),
161    DropView(ViewId, DropMode),
162    CreateStreamingJob {
163        stream_job: StreamingJob,
164        fragment_graph: StreamFragmentGraphProto,
165        dependencies: HashSet<ObjectId>,
166        specific_resource_group: Option<String>, // specific resource group
167        if_not_exists: bool,
168    },
169    DropStreamingJob {
170        job_id: StreamingJobId,
171        drop_mode: DropMode,
172    },
173    AlterName(alter_name_request::Object, String),
174    AlterSwapRename(alter_swap_rename_request::Object),
175    ReplaceStreamJob(ReplaceStreamJobInfo),
176    AlterNonSharedSource(Source),
177    AlterObjectOwner(Object, UserId),
178    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
179    CreateConnection(Connection),
180    DropConnection(ConnectionId, DropMode),
181    CreateSecret(Secret),
182    AlterSecret(Secret),
183    DropSecret(SecretId),
184    CommentOn(Comment),
185    CreateSubscription(Subscription),
186    DropSubscription(SubscriptionId, DropMode),
187    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
188    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
189}
190
191impl DdlCommand {
192    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
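    ///
    /// For example, `CreateDatabase` is identified by the database name (`Left`), while
    /// `DropDatabase` is identified by the database ID (`Right`).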
193    fn object(&self) -> Either<String, ObjectId> {
194        use Either::*;
195        match self {
196            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
197            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
198            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
199            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
200            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
201            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
202            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
203            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
204            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
205            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
206            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
207            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
208            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
209            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
210            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
211            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
212            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
213            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
214            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
215            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
216            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
217            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
218            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
219            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
220            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
221            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
222            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
223            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
224        }
225    }
226
227    fn allow_in_recovery(&self) -> bool {
228        match self {
229            DdlCommand::DropDatabase(_)
230            | DdlCommand::DropSchema(_, _)
231            | DdlCommand::DropSource(_, _)
232            | DdlCommand::DropFunction(_, _)
233            | DdlCommand::DropView(_, _)
234            | DdlCommand::DropStreamingJob { .. }
235            | DdlCommand::DropConnection(_, _)
236            | DdlCommand::DropSecret(_)
237            | DdlCommand::DropSubscription(_, _)
238            | DdlCommand::AlterName(_, _)
239            | DdlCommand::AlterObjectOwner(_, _)
240            | DdlCommand::AlterSetSchema(_, _)
241            | DdlCommand::CreateDatabase(_)
242            | DdlCommand::CreateSchema(_)
243            | DdlCommand::CreateFunction(_)
244            | DdlCommand::CreateView(_, _)
245            | DdlCommand::CreateConnection(_)
246            | DdlCommand::CommentOn(_)
247            | DdlCommand::CreateSecret(_)
248            | DdlCommand::AlterSecret(_)
249            | DdlCommand::AlterSwapRename(_)
250            | DdlCommand::AlterDatabaseParam(_, _)
251            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
252            DdlCommand::CreateStreamingJob { .. }
253            | DdlCommand::CreateNonSharedSource(_)
254            | DdlCommand::ReplaceStreamJob(_)
255            | DdlCommand::AlterNonSharedSource(_)
256            | DdlCommand::CreateSubscription(_) => false,
257        }
258    }
259}
260
261#[derive(Clone)]
262pub struct DdlController {
263    pub(crate) env: MetaSrvEnv,
264
265    pub(crate) metadata_manager: MetadataManager,
266    pub(crate) stream_manager: GlobalStreamManagerRef,
267    pub(crate) source_manager: SourceManagerRef,
268    barrier_manager: BarrierManagerRef,
269    sink_manager: SinkCoordinatorManager,
270
271    // The semaphore is used to limit the number of concurrent streaming job creations.
272    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
273
274    /// Sequence number for DDL commands, used for observability and debugging.
275    seq: Arc<AtomicU64>,
276}
277
278#[derive(Clone)]
279pub struct CreatingStreamingJobPermit {
280    pub(crate) semaphore: Arc<Semaphore>,
281}
282
283impl CreatingStreamingJobPermit {
284    async fn new(env: &MetaSrvEnv) -> Self {
285        let mut permits = env
286            .system_params_reader()
287            .await
288            .max_concurrent_creating_streaming_jobs() as usize;
289        if permits == 0 {
290            // if the system parameter is set to zero, use the max permitted value.
291            permits = Semaphore::MAX_PERMITS;
292        }
293        let semaphore = Arc::new(Semaphore::new(permits));
294
295        let (local_notification_tx, mut local_notification_rx) =
296            tokio::sync::mpsc::unbounded_channel();
297        env.notification_manager()
298            .insert_local_sender(local_notification_tx);
299        let semaphore_clone = semaphore.clone();
300        tokio::spawn(async move {
301            while let Some(notification) = local_notification_rx.recv().await {
302                let LocalNotification::SystemParamsChange(p) = &notification else {
303                    continue;
304                };
305                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
306                if new_permits == 0 {
307                    new_permits = Semaphore::MAX_PERMITS;
308                }
309                match permits.cmp(&new_permits) {
310                    Ordering::Less => {
311                        semaphore_clone.add_permits(new_permits - permits);
312                    }
313                    Ordering::Equal => continue,
314                    Ordering::Greater => {
315                        let to_release = permits - new_permits;
316                        let reduced = semaphore_clone.forget_permits(to_release);
317                        // TODO: implement a dynamic semaphore with limits ourselves.
318                        if reduced != to_release {
319                            tracing::warn!(
320                                "not enough permits to release, expected {}, but reduced {}",
321                                to_release,
322                                reduced
323                            );
324                        }
325                    }
326                }
327                tracing::info!(
328                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
329                    permits,
330                    new_permits
331                );
332                permits = new_permits;
333            }
334        });
335
336        Self { semaphore }
337    }
338}
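
// A minimal, self-contained sketch of the permit-resizing behaviour relied on above: growing
// uses `Semaphore::add_permits`, while shrinking uses `Semaphore::forget_permits`, which can
// only forget currently available permits and reports how many it actually removed. The module
// and test names below are illustrative.
#[cfg(test)]
mod creating_streaming_job_permit_resize_example {
    use tokio::sync::Semaphore;

    #[test]
    fn resize_semaphore_like_max_concurrent_creating_streaming_jobs() {
        let semaphore = Semaphore::new(4);

        // Grow from 4 to 6 permits.
        semaphore.add_permits(2);
        assert_eq!(semaphore.available_permits(), 6);

        // Shrink from 6 to 1: all 5 excess permits are currently available, so all are forgotten.
        let reduced = semaphore.forget_permits(5);
        assert_eq!(reduced, 5);
        assert_eq!(semaphore.available_permits(), 1);
    }
}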
339
340impl DdlController {
341    pub async fn new(
342        env: MetaSrvEnv,
343        metadata_manager: MetadataManager,
344        stream_manager: GlobalStreamManagerRef,
345        source_manager: SourceManagerRef,
346        barrier_manager: BarrierManagerRef,
347        sink_manager: SinkCoordinatorManager,
348    ) -> Self {
349        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
350        Self {
351            env,
352            metadata_manager,
353            stream_manager,
354            source_manager,
355            barrier_manager,
356            sink_manager,
357            creating_streaming_job_permits,
358            seq: Arc::new(AtomicU64::new(0)),
359        }
360    }
361
362    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
363    pub fn next_seq(&self) -> u64 {
364        // This is a simple atomic increment operation.
365        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
366    }
367
368    /// `run_command` spawns a tokio task to execute the target DDL command. If the client
369    /// is interrupted during execution, the request will be cancelled by tonic. Since we have
370    /// a lot of logic for reverting, status management, notification and so on, ensuring
371    /// consistency would be a huge hassle and pain if we didn't spawn here.
372    ///
373    /// Though the return type is `Option`, it is always `Some`, to simplify the handling logic.
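    ///
    /// A hypothetical call site (the `ddl_controller` handle and `database` value are assumed
    /// here for illustration):
    ///
    /// ```ignore
    /// let wait_version = ddl_controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("`run_command` always returns `Some`");
    /// ```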
374    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
375        if !command.allow_in_recovery() {
376            self.barrier_manager.check_status_running()?;
377        }
378
379        let await_tree_key = format!("DDL Command {}", self.next_seq());
380        let await_tree_span = await_tree::span!("{command}({})", command.object());
381
382        let ctrl = self.clone();
383        let fut = async move {
384            match command {
385                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
386                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
387                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
388                DdlCommand::DropSchema(schema_id, drop_mode) => {
389                    ctrl.drop_schema(schema_id, drop_mode).await
390                }
391                DdlCommand::CreateNonSharedSource(source) => {
392                    ctrl.create_non_shared_source(source).await
393                }
394                DdlCommand::DropSource(source_id, drop_mode) => {
395                    ctrl.drop_source(source_id, drop_mode).await
396                }
397                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
398                DdlCommand::DropFunction(function_id, drop_mode) => {
399                    ctrl.drop_function(function_id, drop_mode).await
400                }
401                DdlCommand::CreateView(view, dependencies) => {
402                    ctrl.create_view(view, dependencies).await
403                }
404                DdlCommand::DropView(view_id, drop_mode) => {
405                    ctrl.drop_view(view_id, drop_mode).await
406                }
407                DdlCommand::CreateStreamingJob {
408                    stream_job,
409                    fragment_graph,
410                    dependencies,
411                    specific_resource_group,
412                    if_not_exists,
413                } => {
414                    ctrl.create_streaming_job(
415                        stream_job,
416                        fragment_graph,
417                        dependencies,
418                        specific_resource_group,
419                        if_not_exists,
420                    )
421                    .await
422                }
423                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
424                    ctrl.drop_streaming_job(job_id, drop_mode).await
425                }
426                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
427                    streaming_job,
428                    fragment_graph,
429                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
430                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
431                DdlCommand::AlterObjectOwner(object, owner_id) => {
432                    ctrl.alter_owner(object, owner_id).await
433                }
434                DdlCommand::AlterSetSchema(object, new_schema_id) => {
435                    ctrl.alter_set_schema(object, new_schema_id).await
436                }
437                DdlCommand::CreateConnection(connection) => {
438                    ctrl.create_connection(connection).await
439                }
440                DdlCommand::DropConnection(connection_id, drop_mode) => {
441                    ctrl.drop_connection(connection_id, drop_mode).await
442                }
443                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
444                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
445                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
446                DdlCommand::AlterNonSharedSource(source) => {
447                    ctrl.alter_non_shared_source(source).await
448                }
449                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
450                DdlCommand::CreateSubscription(subscription) => {
451                    ctrl.create_subscription(subscription).await
452                }
453                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
454                    ctrl.drop_subscription(subscription_id, drop_mode).await
455                }
456                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
457                DdlCommand::AlterDatabaseParam(database_id, param) => {
458                    ctrl.alter_database_param(database_id, param).await
459                }
460                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
461                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
462                        .await
463                }
464            }
465        }
466        .in_current_span();
467        let fut = (self.env.await_tree_reg())
468            .register(await_tree_key, await_tree_span)
469            .instrument(Box::pin(fut));
470        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
471        Ok(Some(WaitVersion {
472            catalog_version: notification_version,
473            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
474        }))
475    }
476
477    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
478        self.barrier_manager.get_ddl_progress().await
479    }
480
481    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
482        let (version, updated_db) = self
483            .metadata_manager
484            .catalog_controller
485            .create_database(database)
486            .await?;
487        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
488        self.barrier_manager
489            .update_database_barrier(
490                updated_db.database_id,
491                updated_db.barrier_interval_ms.map(|v| v as u32),
492                updated_db.checkpoint_frequency.map(|v| v as u64),
493            )
494            .await?;
495        Ok(version)
496    }
497
498    #[tracing::instrument(skip(self), level = "debug")]
499    pub async fn reschedule_streaming_job(
500        &self,
501        job_id: JobId,
502        target: ReschedulePolicy,
503        mut deferred: bool,
504    ) -> MetaResult<()> {
505        tracing::info!("altering parallelism for job {}", job_id);
506        if self.barrier_manager.check_status_running().is_err() {
507            tracing::info!(
508                "alter parallelism is set to deferred mode because the system is in recovery state"
509            );
510            deferred = true;
511        }
512
513        self.stream_manager
514            .reschedule_streaming_job(job_id, target, deferred)
515            .await
516    }
517
518    pub async fn reschedule_cdc_table_backfill(
519        &self,
520        job_id: JobId,
521        target: ReschedulePolicy,
522    ) -> MetaResult<()> {
523        tracing::info!("alter CDC table backfill parallelism");
524        if self.barrier_manager.check_status_running().is_err() {
525            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
526        }
527        self.stream_manager
528            .reschedule_cdc_table_backfill(job_id, target)
529            .await
530    }
531
532    pub async fn reschedule_fragments(
533        &self,
534        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
535    ) -> MetaResult<()> {
536        tracing::info!(
537            "altering parallelism for fragments {:?}",
538            fragment_targets.keys()
539        );
540        let fragment_targets = fragment_targets
541            .into_iter()
542            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
543            .collect();
544
545        self.stream_manager
546            .reschedule_fragments(fragment_targets)
547            .await
548    }
549
550    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
551        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
552            .await
553    }
554
555    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
556        self.metadata_manager
557            .catalog_controller
558            .create_schema(schema)
559            .await
560    }
561
562    async fn drop_schema(
563        &self,
564        schema_id: SchemaId,
565        drop_mode: DropMode,
566    ) -> MetaResult<NotificationVersion> {
567        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
568            .await
569    }
570
571    /// Shared source is handled in [`Self::create_streaming_job`]
572    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
573        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
574            .await
575            .context("failed to create source worker")?;
576
577        let (source_id, version) = self
578            .metadata_manager
579            .catalog_controller
580            .create_source(source)
581            .await?;
582        self.source_manager
583            .register_source_with_handle(source_id, handle)
584            .await;
585        Ok(version)
586    }
587
588    async fn drop_source(
589        &self,
590        source_id: SourceId,
591        drop_mode: DropMode,
592    ) -> MetaResult<NotificationVersion> {
593        self.drop_object(ObjectType::Source, source_id, drop_mode)
594            .await
595    }
596
597    /// This replaces the source in the catalog.
598    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
599    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
600        self.metadata_manager
601            .catalog_controller
602            .alter_non_shared_source(source)
603            .await
604    }
605
606    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
607        self.metadata_manager
608            .catalog_controller
609            .create_function(function)
610            .await
611    }
612
613    async fn drop_function(
614        &self,
615        function_id: FunctionId,
616        drop_mode: DropMode,
617    ) -> MetaResult<NotificationVersion> {
618        self.drop_object(ObjectType::Function, function_id, drop_mode)
619            .await
620    }
621
622    async fn create_view(
623        &self,
624        view: View,
625        dependencies: HashSet<ObjectId>,
626    ) -> MetaResult<NotificationVersion> {
627        self.metadata_manager
628            .catalog_controller
629            .create_view(view, dependencies)
630            .await
631    }
632
633    async fn drop_view(
634        &self,
635        view_id: ViewId,
636        drop_mode: DropMode,
637    ) -> MetaResult<NotificationVersion> {
638        self.drop_object(ObjectType::View, view_id, drop_mode).await
639    }
640
641    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
642        validate_connection(&connection).await?;
643        self.metadata_manager
644            .catalog_controller
645            .create_connection(connection)
646            .await
647    }
648
649    async fn drop_connection(
650        &self,
651        connection_id: ConnectionId,
652        drop_mode: DropMode,
653    ) -> MetaResult<NotificationVersion> {
654        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
655            .await
656    }
657
658    async fn alter_database_param(
659        &self,
660        database_id: DatabaseId,
661        param: AlterDatabaseParam,
662    ) -> MetaResult<NotificationVersion> {
663        let (version, updated_db) = self
664            .metadata_manager
665            .catalog_controller
666            .alter_database_param(database_id, param)
667            .await?;
668        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
669        self.barrier_manager
670            .update_database_barrier(
671                database_id,
672                updated_db.barrier_interval_ms.map(|v| v as u32),
673                updated_db.checkpoint_frequency.map(|v| v as u64),
674            )
675            .await?;
676        Ok(version)
677    }
678
679    // The 'secret' part of the request we receive from the frontend is in plaintext;
680    // here, we need to encrypt it before storing it in the catalog.
681    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
682        let secret_store_private_key = self
683            .env
684            .opts
685            .secret_store_private_key
686            .clone()
687            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
688
689        let encrypted_payload = SecretEncryption::encrypt(
690            secret_store_private_key.as_slice(),
691            secret.get_value().as_slice(),
692        )
693        .context(format!("failed to encrypt secret {}", secret.name))?;
694        Ok(encrypted_payload
695            .serialize()
696            .context(format!("failed to serialize secret {}", secret.name))?)
697    }
698
699    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
700        // The 'secret' part of the request we receive from the frontend is in plaintext;
701        // here, we need to encrypt it before storing it in the catalog.
702        let secret_plain_payload = secret.value.clone();
703        let encrypted_payload = self.get_encrypted_payload(&secret)?;
704        secret.value = encrypted_payload;
705
706        self.metadata_manager
707            .catalog_controller
708            .create_secret(secret, secret_plain_payload)
709            .await
710    }
711
712    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
713        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
714            .await
715    }
716
717    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
718        let secret_plain_payload = secret.value.clone();
719        let encrypted_payload = self.get_encrypted_payload(&secret)?;
720        secret.value = encrypted_payload;
721        self.metadata_manager
722            .catalog_controller
723            .alter_secret(secret, secret_plain_payload)
724            .await
725    }
726
727    async fn create_subscription(
728        &self,
729        mut subscription: Subscription,
730    ) -> MetaResult<NotificationVersion> {
731        tracing::debug!("create subscription");
732        let _permit = self
733            .creating_streaming_job_permits
734            .semaphore
735            .acquire()
736            .await
737            .unwrap();
738        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
739        self.metadata_manager
740            .catalog_controller
741            .create_subscription_catalog(&mut subscription)
742            .await?;
743        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
744            tracing::debug!(error = %err.as_report(), "failed to create subscription");
745            let _ = self
746                .metadata_manager
747                .catalog_controller
748                .try_abort_creating_subscription(subscription.id)
749                .await
750                .inspect_err(|e| {
751                    tracing::error!(
752                        error = %e.as_report(),
753                        "failed to abort create subscription after failure"
754                    );
755                });
756            return Err(err);
757        }
758
759        let version = self
760            .metadata_manager
761            .catalog_controller
762            .notify_create_subscription(subscription.id)
763            .await?;
764        tracing::debug!("finish create subscription");
765        Ok(version)
766    }
767
768    async fn drop_subscription(
769        &self,
770        subscription_id: SubscriptionId,
771        drop_mode: DropMode,
772    ) -> MetaResult<NotificationVersion> {
773        tracing::debug!("preparing drop subscription");
774        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
775        let subscription = self
776            .metadata_manager
777            .catalog_controller
778            .get_subscription_by_id(subscription_id)
779            .await?;
780        let table_id = subscription.dependent_table_id;
781        let database_id = subscription.database_id;
782        let (_, version) = self
783            .metadata_manager
784            .catalog_controller
785            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
786            .await?;
787        self.stream_manager
788            .drop_subscription(database_id, subscription_id, table_id)
789            .await;
790        tracing::debug!("finish drop subscription");
791        Ok(version)
792    }
793
794    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
795    #[await_tree::instrument]
796    pub(crate) async fn validate_cdc_table(
797        &self,
798        table: &Table,
799        table_fragments: &StreamJobFragments,
800    ) -> MetaResult<()> {
801        let stream_scan_fragment = table_fragments
802            .fragments
803            .values()
804            .filter(|f| {
805                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
806                    || f.fragment_type_mask
807                        .contains(FragmentTypeFlag::StreamCdcScan)
808            })
809            .exactly_one()
810            .ok()
811            .with_context(|| {
812                format!(
813                    "expect exactly one stream scan fragment, got: {:?}",
814                    table_fragments.fragments
815                )
816            })?;
817        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
818            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
819                if let Some(o) = &node.options
820                    && CdcScanOptions::from_proto(o).is_parallelized_backfill()
821                {
822                    // Use parallel CDC backfill.
823                } else {
824                    assert_eq!(
825                        stream_scan_fragment.actors.len(),
826                        1,
827                        "Stream scan fragment should have only one actor"
828                    );
829                }
830            }
831        }
832        let mut found_cdc_scan = false;
833        match &stream_scan_fragment.nodes.node_body {
834            Some(NodeBody::StreamCdcScan(_)) => {
835                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
836                if self
837                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
838                    .await?
839                {
840                    found_cdc_scan = true;
841                }
842            }
843            // When there are generated columns, the cdc scan node is wrapped in a project node
844            Some(NodeBody::Project(_)) => {
845                for input in &stream_scan_fragment.nodes.input {
846                    assert_parallelism(stream_scan_fragment, &input.node_body);
847                    if self
848                        .validate_cdc_table_inner(&input.node_body, table.id)
849                        .await?
850                    {
851                        found_cdc_scan = true;
852                    }
853                }
854            }
855            _ => {
856                bail!("Unexpected node body for stream cdc scan");
857            }
858        };
859        if !found_cdc_scan {
860            bail!("No stream cdc scan node found in stream scan fragment");
861        }
862        Ok(())
863    }
864
865    async fn validate_cdc_table_inner(
866        &self,
867        node_body: &Option<NodeBody>,
868        table_id: TableId,
869    ) -> MetaResult<bool> {
870        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
871            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
872        {
873            let options_with_secret = WithOptionsSecResolved::new(
874                cdc_table_desc.connect_properties.clone(),
875                cdc_table_desc.secret_refs.clone(),
876            );
877
878            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
879            props.init_from_pb_cdc_table_desc(cdc_table_desc);
880
881            // Try creating a split enumerator to validate
882            let _enumerator = props
883                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
884                .await?;
885
886            tracing::debug!(?table_id, "validate cdc table success");
887            Ok(true)
888        } else {
889            Ok(false)
890        }
891    }
892
893    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
894        let migrated = self
895            .metadata_manager
896            .catalog_controller
897            .has_table_been_migrated(table_id)
898            .await?;
899        if !migrated {
900            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
901        } else {
902            Ok(())
903        }
904    }
905
906    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
907    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
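    ///
    /// A hypothetical call site (identifiers assumed for illustration); a foreground job blocks
    /// here until backfill completes, while a background job returns earlier:
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .create_streaming_job(stream_job, fragment_graph, dependencies, None, false)
    ///     .await?;
    /// ```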
908    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
909    pub async fn create_streaming_job(
910        &self,
911        mut streaming_job: StreamingJob,
912        fragment_graph: StreamFragmentGraphProto,
913        dependencies: HashSet<ObjectId>,
914        specific_resource_group: Option<String>,
915        if_not_exists: bool,
916    ) -> MetaResult<NotificationVersion> {
917        if let StreamingJob::Sink(sink) = &streaming_job
918            && let Some(target_table) = sink.target_table
919        {
920            self.validate_table_for_sink(target_table).await?;
921        }
922        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
923        let check_ret = self
924            .metadata_manager
925            .catalog_controller
926            .create_job_catalog(
927                &mut streaming_job,
928                &ctx,
929                &fragment_graph.parallelism,
930                fragment_graph.max_parallelism as _,
931                dependencies,
932                specific_resource_group.clone(),
933            )
934            .await;
935        if let Err(meta_err) = check_ret {
936            if !if_not_exists {
937                return Err(meta_err);
938            }
939            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
940                if streaming_job.create_type() == CreateType::Foreground {
941                    let database_id = streaming_job.database_id();
942                    self.metadata_manager
943                        .wait_streaming_job_finished(database_id, *job_id)
944                        .await
945                } else {
946                    Ok(IGNORED_NOTIFICATION_VERSION)
947                }
948            } else {
949                Err(meta_err)
950            };
951        }
952        let job_id = streaming_job.id();
953        tracing::debug!(
954            id = %job_id,
955            definition = streaming_job.definition(),
956            create_type = streaming_job.create_type().as_str_name(),
957            job_type = ?streaming_job.job_type(),
958            "starting streaming job",
959        );
960        // TODO: acquire permits for recovered background DDLs.
961        let permit = self
962            .creating_streaming_job_permits
963            .semaphore
964            .clone()
965            .acquire_owned()
966            .instrument_await("acquire_creating_streaming_job_permit")
967            .await
968            .unwrap();
969        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
970
971        let name = streaming_job.name();
972        let definition = streaming_job.definition();
973        let source_id = match &streaming_job {
974            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
975            _ => None,
976        };
977
978        // create streaming job.
979        match self
980            .create_streaming_job_inner(
981                ctx,
982                streaming_job,
983                fragment_graph,
984                specific_resource_group,
985                permit,
986            )
987            .await
988        {
989            Ok(version) => Ok(version),
990            Err(err) => {
991                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
992                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
993                    id: job_id,
994                    name,
995                    definition,
996                    error: err.as_report().to_string(),
997                };
998                self.env.event_log_manager_ref().add_event_logs(vec![
999                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1000                ]);
1001                let (aborted, _) = self
1002                    .metadata_manager
1003                    .catalog_controller
1004                    .try_abort_creating_streaming_job(job_id, false)
1005                    .await?;
1006                if aborted {
1007                    tracing::warn!(id = %job_id, "aborted streaming job");
1008                    // FIXME: might also need other cleanup here
1009                    if let Some(source_id) = source_id {
1010                        self.source_manager
1011                            .apply_source_change(SourceChange::DropSource {
1012                                dropped_source_ids: vec![source_id],
1013                            })
1014                            .await;
1015                    }
1016                }
1017                Err(err)
1018            }
1019        }
1020    }
1021
1022    #[await_tree::instrument(boxed)]
1023    async fn create_streaming_job_inner(
1024        &self,
1025        ctx: StreamContext,
1026        mut streaming_job: StreamingJob,
1027        fragment_graph: StreamFragmentGraphProto,
1028        specific_resource_group: Option<String>,
1029        permit: OwnedSemaphorePermit,
1030    ) -> MetaResult<NotificationVersion> {
1031        let mut fragment_graph =
1032            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1033        streaming_job.set_info_from_graph(&fragment_graph);
1034
1035        // create internal table catalogs and refill table id.
1036        let incomplete_internal_tables = fragment_graph
1037            .incomplete_internal_tables()
1038            .into_values()
1039            .collect_vec();
1040        let table_id_map = self
1041            .metadata_manager
1042            .catalog_controller
1043            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1044            .await?;
1045        fragment_graph.refill_internal_table_ids(table_id_map);
1046
1047        // create fragment and actor catalogs.
1048        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1049        let (ctx, stream_job_fragments) = self
1050            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1051            .await?;
1052
1053        let streaming_job = &ctx.streaming_job;
1054
1055        match streaming_job {
1056            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1057                self.validate_cdc_table(table, &stream_job_fragments)
1058                    .await?;
1059            }
1060            StreamingJob::Table(Some(source), ..) => {
1061                // Register the source on the connector node.
1062                self.source_manager.register_source(source).await?;
1063                let connector_name = source
1064                    .get_with_properties()
1065                    .get(UPSTREAM_SOURCE_KEY)
1066                    .cloned();
1067                let attr = source.info.as_ref().map(|source_info| {
1068                    jsonbb::json!({
1069                            "format": source_info.format().as_str_name(),
1070                            "encode": source_info.row_encode().as_str_name(),
1071                    })
1072                });
1073                report_create_object(
1074                    streaming_job.id(),
1075                    "source",
1076                    PbTelemetryDatabaseObject::Source,
1077                    connector_name,
1078                    attr,
1079                );
1080            }
1081            StreamingJob::Sink(sink) => {
1082                if sink.auto_refresh_schema_from_table.is_some() {
1083                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1084                }
1085                // Validate the sink on the connector node.
1086                validate_sink(sink).await?;
1087                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1088                let attr = sink.format_desc.as_ref().map(|sink_info| {
1089                    jsonbb::json!({
1090                        "format": sink_info.format().as_str_name(),
1091                        "encode": sink_info.encode().as_str_name(),
1092                    })
1093                });
1094                report_create_object(
1095                    streaming_job.id(),
1096                    "sink",
1097                    PbTelemetryDatabaseObject::Sink,
1098                    connector_name,
1099                    attr,
1100                );
1101            }
1102            StreamingJob::Source(source) => {
1103                // Register the source on the connector node.
1104                self.source_manager.register_source(source).await?;
1105                let connector_name = source
1106                    .get_with_properties()
1107                    .get(UPSTREAM_SOURCE_KEY)
1108                    .cloned();
1109                let attr = source.info.as_ref().map(|source_info| {
1110                    jsonbb::json!({
1111                            "format": source_info.format().as_str_name(),
1112                            "encode": source_info.row_encode().as_str_name(),
1113                    })
1114                });
1115                report_create_object(
1116                    streaming_job.id(),
1117                    "source",
1118                    PbTelemetryDatabaseObject::Source,
1119                    connector_name,
1120                    attr,
1121                );
1122            }
1123            _ => {}
1124        }
1125
1126        self.metadata_manager
1127            .catalog_controller
1128            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1129            .await?;
1130
1131        // create streaming jobs.
1132        let version = self
1133            .stream_manager
1134            .create_streaming_job(stream_job_fragments, ctx, permit)
1135            .await?;
1136
1137        Ok(version)
1138    }
1139
1140    /// Note: when dropping a sink that writes into a table, the target table needs to be replaced as well.
1141    pub async fn drop_object(
1142        &self,
1143        object_type: ObjectType,
1144        object_id: impl Into<ObjectId>,
1145        drop_mode: DropMode,
1146    ) -> MetaResult<NotificationVersion> {
1147        let object_id = object_id.into();
1148        let (release_ctx, version) = self
1149            .metadata_manager
1150            .catalog_controller
1151            .drop_object(object_type, object_id, drop_mode)
1152            .await?;
1153
1154        if object_type == ObjectType::Source {
1155            self.env
1156                .notification_manager_ref()
1157                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1158        }
1159
1160        let ReleaseContext {
1161            database_id,
1162            removed_streaming_job_ids,
1163            removed_state_table_ids,
1164            removed_source_ids,
1165            removed_secret_ids: secret_ids,
1166            removed_source_fragments,
1167            removed_actors,
1168            removed_fragments,
1169            removed_sink_fragment_by_targets,
1170            removed_iceberg_table_sinks,
1171        } = release_ctx;
1172
1173        let _guard = self.source_manager.pause_tick().await;
1174        self.stream_manager
1175            .drop_streaming_jobs(
1176                database_id,
1177                removed_actors.iter().map(|id| *id as _).collect(),
1178                removed_streaming_job_ids,
1179                removed_state_table_ids,
1180                removed_fragments.iter().map(|id| *id as _).collect(),
1181                removed_sink_fragment_by_targets
1182                    .into_iter()
1183                    .map(|(target, sinks)| {
1184                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1185                    })
1186                    .collect(),
1187            )
1188            .await;
1189
1190        // clean up sources after dropping streaming jobs.
1191        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
1192        self.source_manager
1193            .apply_source_change(SourceChange::DropSource {
1194                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1195            })
1196            .await;
1197
1198        // unregister fragments and actors from source manager.
1199        // FIXME: also need to unregister source backfill fragments.
1200        let dropped_source_fragments = removed_source_fragments;
1201        self.source_manager
1202            .apply_source_change(SourceChange::DropMv {
1203                dropped_source_fragments,
1204            })
1205            .await;
1206
1207        // clean up iceberg table sinks
1208        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1209            .iter()
1210            .map(|sink| sink.id)
1211            .collect();
1212
1213        for sink in removed_iceberg_table_sinks {
1214            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1215                .expect("Iceberg sink should be valid");
1216            let iceberg_sink =
1217                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1218            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1219                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1220                tracing::info!(
1221                    "dropping iceberg table {} for dropped sink",
1222                    table_identifier
1223                );
1224
1225                let _ = iceberg_catalog
1226                    .drop_table(&table_identifier)
1227                    .await
1228                    .inspect_err(|err| {
1229                        tracing::error!(
1230                            "failed to drop iceberg table {} during cleanup: {}",
1231                            table_identifier,
1232                            err.as_report()
1233                        );
1234                    });
1235            }
1236        }
1237
1238        // stop sink coordinators for iceberg table sinks
1239        if !iceberg_sink_ids.is_empty() {
1240            self.sink_manager
1241                .stop_sink_coordinator(iceberg_sink_ids)
1242                .await;
1243        }
1244
1245        // remove secrets.
1246        for secret in secret_ids {
1247            LocalSecretManager::global().remove_secret(secret);
1248        }
1249        Ok(version)
1250    }
1251
1252    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
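    ///
    /// A hypothetical call site (identifiers assumed), e.g. when handling `ALTER TABLE ... ADD COLUMN`,
    /// where `new_table_job` carries the updated table definition and `new_fragment_graph` the
    /// re-planned fragment graph:
    ///
    /// ```ignore
    /// let version = ddl_controller.replace_job(new_table_job, new_fragment_graph).await?;
    /// ```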
1253    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1254    pub async fn replace_job(
1255        &self,
1256        mut streaming_job: StreamingJob,
1257        fragment_graph: StreamFragmentGraphProto,
1258    ) -> MetaResult<NotificationVersion> {
1259        match &streaming_job {
1260            StreamingJob::Table(..)
1261            | StreamingJob::Source(..)
1262            | StreamingJob::MaterializedView(..) => {}
1263            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1264                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1265            }
1266        }
1267
1268        let job_id = streaming_job.id();
1269
1270        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1271        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1272
1273        // Ensure the max parallelism is unchanged before replacing the table.
1274        let original_max_parallelism = self
1275            .metadata_manager
1276            .get_job_max_parallelism(streaming_job.id())
1277            .await?;
1278        let fragment_graph = PbStreamFragmentGraph {
1279            max_parallelism: original_max_parallelism as _,
1280            ..fragment_graph
1281        };
1282
1283        // 1. build fragment graph.
1284        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1285        streaming_job.set_info_from_graph(&fragment_graph);
1286
1287        // make it immutable
1288        let streaming_job = streaming_job;
1289
1290        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1291            let auto_refresh_schema_sinks = self
1292                .metadata_manager
1293                .catalog_controller
1294                .get_sink_auto_refresh_schema_from(table.id)
1295                .await?;
1296            if !auto_refresh_schema_sinks.is_empty() {
1297                let original_table_columns = self
1298                    .metadata_manager
1299                    .catalog_controller
1300                    .get_table_columns(table.id)
1301                    .await?;
1302                // compare column id to find newly added columns
1303                let mut original_table_column_ids: HashSet<_> = original_table_columns
1304                    .iter()
1305                    .map(|col| col.column_id())
1306                    .collect();
1307                let newly_added_columns = table
1308                    .columns
1309                    .iter()
1310                    .filter(|col| {
1311                        !original_table_column_ids.remove(&ColumnId::new(
1312                            col.column_desc.as_ref().unwrap().column_id as _,
1313                        ))
1314                    })
1315                    .map(|col| ColumnCatalog::from(col.clone()))
1316                    .collect_vec();
1317                if !original_table_column_ids.is_empty() {
1318                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1319                }
1320                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1321                for sink in auto_refresh_schema_sinks {
1322                    let sink_job_fragments = self
1323                        .metadata_manager
1324                        .get_job_fragments_by_id(sink.id.as_job_id())
1325                        .await?;
1326                    if sink_job_fragments.fragments.len() != 1 {
1327                        return Err(anyhow!(
1328                            "auto schema refresh sink must have only one fragment, but got {}",
1329                            sink_job_fragments.fragments.len()
1330                        )
1331                        .into());
1332                    }
1333                    let original_sink_fragment =
1334                        sink_job_fragments.fragments.into_values().next().unwrap();
1335                    let (new_sink_fragment, new_schema, new_log_store_table) =
1336                        rewrite_refresh_schema_sink_fragment(
1337                            &original_sink_fragment,
1338                            &sink,
1339                            &newly_added_columns,
1340                            table,
1341                            fragment_graph.table_fragment_id(),
1342                            self.env.id_gen_manager(),
1343                            self.env.actor_id_generator(),
1344                        )?;
1345
1346                    assert_eq!(
1347                        original_sink_fragment.actors.len(),
1348                        new_sink_fragment.actors.len()
1349                    );
1350                    let actor_status = (0..original_sink_fragment.actors.len())
1351                        .map(|i| {
1352                            let worker_node_id = sink_job_fragments.actor_status
1353                                [&original_sink_fragment.actors[i].actor_id]
1354                                .location
1355                                .as_ref()
1356                                .unwrap()
1357                                .worker_node_id;
1358                            (
1359                                new_sink_fragment.actors[i].actor_id,
1360                                PbActorStatus {
1361                                    location: Some(PbActorLocation { worker_node_id }),
1362                                },
1363                            )
1364                        })
1365                        .collect();
1366
1367                    let streaming_job = StreamingJob::Sink(sink);
1368
1369                    let tmp_sink_id = self
1370                        .metadata_manager
1371                        .catalog_controller
1372                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1373                        .await?
1374                        .as_sink_id();
1375                    let StreamingJob::Sink(sink) = streaming_job else {
1376                        unreachable!()
1377                    };
1378
1379                    sinks.push(AutoRefreshSchemaSinkContext {
1380                        tmp_sink_id,
1381                        original_sink: sink,
1382                        original_fragment: original_sink_fragment,
1383                        new_schema,
1384                        newly_add_fields: newly_added_columns
1385                            .iter()
1386                            .map(|col| Field::from(&col.column_desc))
1387                            .collect(),
1388                        new_fragment: new_sink_fragment,
1389                        new_log_store_table,
1390                        actor_status,
1391                    });
1392                }
1393                Some(sinks)
1394            } else {
1395                None
1396            }
1397        } else {
1398            None
1399        };
1400
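        // Stage the replacing job in the catalog under a temporary id; it is either finalized by
        // `finish_replace_streaming_job` below or cleaned up in the error arm.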
1401        let tmp_id = self
1402            .metadata_manager
1403            .catalog_controller
1404            .create_job_catalog_for_replace(
1405                &streaming_job,
1406                Some(&ctx),
1407                fragment_graph.specified_parallelism().as_ref(),
1408                Some(fragment_graph.max_parallelism()),
1409            )
1410            .await?;
1411
1412        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1413            sinks
1414                .iter()
1415                .map(|sink| sink.tmp_sink_id.as_object_id())
1416                .collect_vec()
1417        });
1418
1419        tracing::debug!(id = %job_id, "building replace streaming job");
1420        let mut updated_sink_catalogs = vec![];
1421
1422        let mut drop_table_connector_ctx = None;
1423        let result: MetaResult<_> = try {
1424            let (mut ctx, mut stream_job_fragments) = self
1425                .build_replace_job(
1426                    ctx,
1427                    &streaming_job,
1428                    fragment_graph,
1429                    tmp_id,
1430                    auto_refresh_schema_sinks,
1431                )
1432                .await?;
1433            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1434            let auto_refresh_schema_sink_finish_ctx =
1435                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1436                    sinks
1437                        .iter()
1438                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1439                            tmp_sink_id: sink.tmp_sink_id,
1440                            original_sink_id: sink.original_sink.id,
1441                            columns: sink.new_schema.clone(),
1442                            new_log_store_table: sink
1443                                .new_log_store_table
1444                                .as_ref()
1445                                .map(|table| (table.id, table.columns.clone())),
1446                        })
1447                        .collect()
1448                });
1449
1450            // Handle table that has incoming sinks.
1451            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1452                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1453                let upstream_infos = self
1454                    .metadata_manager
1455                    .catalog_controller
1456                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1457                    .await?;
1458                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1459
1460                for upstream_info in &upstream_infos {
1461                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1462                    ctx.upstream_fragment_downstreams
1463                        .entry(upstream_fragment_id)
1464                        .or_default()
1465                        .push(upstream_info.new_sink_downstream.clone());
1466                    if upstream_info.sink_original_target_columns.is_empty() {
1467                        updated_sink_catalogs.push(upstream_info.sink_id);
1468                    }
1469                }
1470            }
1471
1472            let replace_upstream = ctx.replace_upstream.clone();
1473
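            // Pre-register the rewritten auto-refresh sink fragments under their temporary sink
            // ids so that they are created together with the replacement.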
1474            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1475                let empty_downstreams = FragmentDownstreamRelation::default();
1476                for sink in sinks {
1477                    self.metadata_manager
1478                        .catalog_controller
1479                        .prepare_streaming_job(
1480                            sink.tmp_sink_id.as_job_id(),
1481                            || [&sink.new_fragment].into_iter(),
1482                            &empty_downstreams,
1483                            true,
1484                            None,
1485                        )
1486                        .await?;
1487                }
1488            }
1489
1490            self.metadata_manager
1491                .catalog_controller
1492                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1493                .await?;
1494
1495            self.stream_manager
1496                .replace_stream_job(stream_job_fragments, ctx)
1497                .await?;
1498            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1499        };
1500
1501        match result {
1502            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1503                let version = self
1504                    .metadata_manager
1505                    .catalog_controller
1506                    .finish_replace_streaming_job(
1507                        tmp_id,
1508                        streaming_job,
1509                        replace_upstream,
1510                        SinkIntoTableContext {
1511                            updated_sink_catalogs,
1512                        },
1513                        drop_table_connector_ctx.as_ref(),
1514                        auto_refresh_schema_sink_finish_ctx,
1515                    )
1516                    .await?;
1517                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1518                    self.source_manager
1519                        .apply_source_change(SourceChange::DropSource {
1520                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1521                        })
1522                        .await;
1523                }
1524                Ok(version)
1525            }
1526            Err(err) => {
1527                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1528                let _ = self.metadata_manager
1529                    .catalog_controller
1530                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1531                    .await.inspect_err(|err| {
1532                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1533                });
1534                Err(err)
1535            }
1536        }
1537    }
1538
1539    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1540    )]
1541    async fn drop_streaming_job(
1542        &self,
1543        job_id: StreamingJobId,
1544        drop_mode: DropMode,
1545    ) -> MetaResult<NotificationVersion> {
1546        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1547
1548        let (object_id, object_type) = match job_id {
1549            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1550            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1551            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1552            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1553        };
1554
1555        let job_status = self
1556            .metadata_manager
1557            .catalog_controller
1558            .get_streaming_job_status(job_id.id())
1559            .await?;
1560        let version = match job_status {
1561            JobStatus::Initial => {
1562                unreachable!(
1563                    "a job in Initial status has not been notified to the frontend, so a drop request for it should never arrive here"
1564                );
1565            }
1566            JobStatus::Creating => {
1567                self.stream_manager
1568                    .cancel_streaming_jobs(vec![job_id.id()])
1569                    .await?;
1570                IGNORED_NOTIFICATION_VERSION
1571            }
1572            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1573        };
1574
1575        Ok(version)
1576    }
1577
1578    /// Resolve the parallelism of the stream job based on the given information.
1579    ///
1580    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1581    fn resolve_stream_parallelism(
1582        &self,
1583        specified: Option<NonZeroUsize>,
1584        max: NonZeroUsize,
1585        cluster_info: &StreamingClusterInfo,
1586        resource_group: String,
1587    ) -> MetaResult<NonZeroUsize> {
1588        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1589        DdlController::resolve_stream_parallelism_inner(
1590            specified,
1591            max,
1592            available,
1593            &self.env.opts.default_parallelism,
1594            &resource_group,
1595        )
1596    }
1597
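    /// Resolution order: a user-specified parallelism wins (error if above the max parallelism,
    /// warn if above the available slots); otherwise the configured default is used, where `Full`
    /// means all available slots, and the result is capped to the max parallelism.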
1598    fn resolve_stream_parallelism_inner(
1599        specified: Option<NonZeroUsize>,
1600        max: NonZeroUsize,
1601        available: Option<NonZeroUsize>,
1602        default_parallelism: &DefaultParallelism,
1603        resource_group: &str,
1604    ) -> MetaResult<NonZeroUsize> {
1605        let Some(available) = available else {
1606            bail_unavailable!(
1607                "no available slots to schedule in resource group \"{}\", \
1608                 have you allocated any compute nodes within this resource group?",
1609                resource_group
1610            );
1611        };
1612
1613        if let Some(specified) = specified {
1614            if specified > max {
1615                bail_invalid_parameter!(
1616                    "specified parallelism {} should not exceed max parallelism {}",
1617                    specified,
1618                    max,
1619                );
1620            }
1621            if specified > available {
1622                tracing::warn!(
1623                    resource_group,
1624                    specified_parallelism = specified.get(),
1625                    available_parallelism = available.get(),
1626                    "specified parallelism exceeds available slots, scheduling with specified value",
1627                );
1628            }
1629            return Ok(specified);
1630        }
1631
1632        // Use default parallelism when no specific parallelism is provided by the user.
1633        let default_parallelism = match default_parallelism {
1634            DefaultParallelism::Full => available,
1635            DefaultParallelism::Default(num) => {
1636                if *num > available {
1637                    tracing::warn!(
1638                        resource_group,
1639                        configured_parallelism = num.get(),
1640                        available_parallelism = available.get(),
1641                        "default parallelism exceeds available slots, scheduling with configured value",
1642                    );
1643                }
1644                *num
1645            }
1646        };
1647
1648        if default_parallelism > max {
1649            tracing::warn!(
1650                max_parallelism = max.get(),
1651                resource_group,
1652                "default parallelism exceeds max parallelism, capping to max",
1653            );
1654        }
1655        Ok(default_parallelism.min(max))
1656    }
1657
1658    /// Builds the actor graph:
1659    /// - Add the upstream fragments to the fragment graph
1660    /// - Schedule the fragments based on their distribution
1661    /// - Expand each fragment into one or several actors
1662    /// - Construct the fragment level backfill order control.
1663    #[await_tree::instrument]
1664    pub(crate) async fn build_stream_job(
1665        &self,
1666        stream_ctx: StreamContext,
1667        mut stream_job: StreamingJob,
1668        fragment_graph: StreamFragmentGraph,
1669        specific_resource_group: Option<String>,
1670    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1671        let id = stream_job.id();
1672        let specified_parallelism = fragment_graph.specified_parallelism();
1673        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1674
1675        // 1. Build the fragment-level backfill ordering.
1676        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1677
1678        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1679        // contains all information needed for building the actor graph.
1680
1681        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1682            fragment_graph.collect_snapshot_backfill_info()?;
1683        assert!(
1684            snapshot_backfill_info
1685                .iter()
1686                .chain([&cross_db_snapshot_backfill_info])
1687                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1688                .all(|backfill_epoch| backfill_epoch.is_none()),
1689            "should not set backfill epoch when initially building the job: {:?} {:?}",
1690            snapshot_backfill_info,
1691            cross_db_snapshot_backfill_info
1692        );
1693
1694        let locality_fragment_state_table_mapping =
1695            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1696
1697        // check if log store exists for all cross-db upstreams
1698        self.metadata_manager
1699            .catalog_controller
1700            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1701            .await?;
1702
1703        let upstream_table_ids = fragment_graph
1704            .dependent_table_ids()
1705            .iter()
1706            .filter(|id| {
1707                !cross_db_snapshot_backfill_info
1708                    .upstream_mv_table_id_to_backfill_epoch
1709                    .contains_key(id)
1710            })
1711            .cloned()
1712            .collect();
1713
1714        let (upstream_root_fragments, existing_actor_location) = self
1715            .metadata_manager
1716            .get_upstream_root_fragments(&upstream_table_ids)
1717            .await?;
1718
1719        if snapshot_backfill_info.is_some() {
1720            match stream_job {
1721                StreamingJob::MaterializedView(_)
1722                | StreamingJob::Sink(_)
1723                | StreamingJob::Index(_, _) => {}
1724                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1725                    return Err(
1726                        anyhow!("snapshot_backfill is not supported for tables and sources").into(),
1727                    );
1728                }
1729            }
1730        }
1731
1732        let upstream_actors = upstream_root_fragments
1733            .values()
1734            .map(|(fragment, _)| {
1735                (
1736                    fragment.fragment_id,
1737                    fragment.actors.keys().copied().collect(),
1738                )
1739            })
1740            .collect();
1741
1742        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1743            fragment_graph,
1744            FragmentGraphUpstreamContext {
1745                upstream_root_fragments,
1746                upstream_actor_location: existing_actor_location,
1747            },
1748            (&stream_job).into(),
1749        )?;
1750        let resource_group = match specific_resource_group {
1751            None => {
1752                self.metadata_manager
1753                    .get_database_resource_group(stream_job.database_id())
1754                    .await?
1755            }
1756            Some(resource_group) => resource_group,
1757        };
1758
1759        // 3. Build the actor graph.
1760        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1761
1762        let parallelism = self.resolve_stream_parallelism(
1763            specified_parallelism,
1764            max_parallelism,
1765            &cluster_info,
1766            resource_group.clone(),
1767        )?;
1768
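        // Apply the cluster-wide adaptive parallelism strategy on top of the resolved value.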
1769        let parallelism = self
1770            .env
1771            .system_params_reader()
1772            .await
1773            .adaptive_parallelism_strategy()
1774            .compute_target_parallelism(parallelism.get());
1775
1776        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1777        let actor_graph_builder = ActorGraphBuilder::new(
1778            id,
1779            resource_group,
1780            complete_graph,
1781            cluster_info,
1782            parallelism,
1783        )?;
1784
1785        let ActorGraphBuildResult {
1786            graph,
1787            downstream_fragment_relations,
1788            building_locations,
1789            upstream_fragment_downstreams,
1790            new_no_shuffle,
1791            replace_upstream,
1792            ..
1793        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1794        assert!(replace_upstream.is_empty());
1795
1796        // 4. Build the table fragments structure that will be persisted in the stream manager,
1797        // and the context that contains all information needed for building the
1798        // actors on the compute nodes.
1799
1800        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, use ADAPTIVE.
1801        // Otherwise, fall back to FIXED with the resolved parallelism.
1802        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1803            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1804            _ => TableParallelism::Fixed(parallelism.get()),
1805        };
1806
1807        let stream_job_fragments = StreamJobFragments::new(
1808            id,
1809            graph,
1810            &building_locations.actor_locations,
1811            stream_ctx.clone(),
1812            table_parallelism,
1813            max_parallelism.get(),
1814        );
1815
1816        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1817            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1818        }
1819
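        // For a sink-into-table job, resolve the target table and build the info needed to wire
        // the new sink fragment into the table's union fragment.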
1820        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1821            && let Ok(table_id) = sink.get_target_table()
1822        {
1823            let tables = self
1824                .metadata_manager
1825                .get_table_catalog_by_ids(&[*table_id])
1826                .await?;
1827            let target_table = tables
1828                .first()
1829                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1830            let sink_fragment = stream_job_fragments
1831                .sink_fragment()
1832                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1833            let mview_fragment_id = self
1834                .metadata_manager
1835                .catalog_controller
1836                .get_mview_fragment_by_id(table_id.as_job_id())
1837                .await?;
1838            let upstream_sink_info = build_upstream_sink_info(
1839                sink,
1840                sink_fragment.fragment_id as _,
1841                target_table,
1842                mview_fragment_id,
1843            )?;
1844            Some(upstream_sink_info)
1845        } else {
1846            None
1847        };
1848
1849        let mut cdc_table_snapshot_splits = None;
1850        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1851            && let Some((_, stream_cdc_scan)) =
1852                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1853        {
1854            {
1855                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1856                let splits = try_init_parallel_cdc_table_snapshot_splits(
1857                    table.id,
1858                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1859                    self.env.meta_store_ref(),
1860                    stream_cdc_scan.options.as_ref().unwrap(),
1861                    self.env.opts.cdc_table_split_init_insert_batch_size,
1862                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1863                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1864                )
1865                .await?;
1866                cdc_table_snapshot_splits = Some(splits);
1867            }
1868        }
1869
1870        let ctx = CreateStreamingJobContext {
1871            upstream_fragment_downstreams,
1872            new_no_shuffle,
1873            upstream_actors,
1874            building_locations,
1875            definition: stream_job.definition(),
1876            create_type: stream_job.create_type(),
1877            job_type: (&stream_job).into(),
1878            streaming_job: stream_job,
1879            new_upstream_sink,
1880            option: CreateStreamingJobOption {},
1881            snapshot_backfill_info,
1882            cross_db_snapshot_backfill_info,
1883            fragment_backfill_ordering,
1884            locality_fragment_state_table_mapping,
1885            cdc_table_snapshot_splits,
1886        };
1887
1888        Ok((
1889            ctx,
1890            StreamJobFragmentsToCreate {
1891                inner: stream_job_fragments,
1892                downstreams: downstream_fragment_relations,
1893            },
1894        ))
1895    }
1896
1897    /// `build_replace_job` builds a replacement for an existing streaming job and returns the
1898    /// context and the new job fragments.
1899    ///
1900    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1901    /// replacement is finished.
1902    pub(crate) async fn build_replace_job(
1903        &self,
1904        stream_ctx: StreamContext,
1905        stream_job: &StreamingJob,
1906        mut fragment_graph: StreamFragmentGraph,
1907        tmp_job_id: JobId,
1908        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1909    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1910        match &stream_job {
1911            StreamingJob::Table(..)
1912            | StreamingJob::Source(..)
1913            | StreamingJob::MaterializedView(..) => {}
1914            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1915                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1916            }
1917        }
1918
1919        let id = stream_job.id();
1920
1921        // check if performing drop table connector
1922        let mut drop_table_associated_source_id = None;
1923        if let StreamingJob::Table(None, _, _) = &stream_job {
1924            drop_table_associated_source_id = self
1925                .metadata_manager
1926                .get_table_associated_source_id(id.as_mv_table_id())
1927                .await?;
1928        }
1929
1930        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1931        let old_internal_table_ids = old_fragments.internal_table_ids();
1932
1933        // handle drop table's associated source
1934        let mut drop_table_connector_ctx = None;
1935        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1936            // Dropping the table's associated source implies the fragment containing the table has exactly one internal table (the associated source's state table).
1937            debug_assert!(old_internal_table_ids.len() == 1);
1938
1939            drop_table_connector_ctx = Some(DropTableConnectorContext {
1940                // We do not remove the original table catalog as it's still needed for the streaming job;
1941                // we only need to remove the reference to the state table.
1942                to_change_streaming_job_id: id,
1943                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1944                to_remove_source_id,
1945            });
1946        } else if stream_job.is_materialized_view() {
1947            // If it's ALTER MV, use the `state_match` module to match the internal tables, which is more
1948            // complicated but more robust.
1949            let old_fragments_upstreams = self
1950                .metadata_manager
1951                .catalog_controller
1952                .upstream_fragments(old_fragments.fragment_ids())
1953                .await?;
1954
1955            let old_state_graph =
1956                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1957            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1958            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
1959                .context("incompatible altering on the streaming job states")?;
1960
1961            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
1962            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
1963        } else {
1964            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1965            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1966            let old_internal_tables = self
1967                .metadata_manager
1968                .get_table_catalog_by_ids(&old_internal_table_ids)
1969                .await?;
1970            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
1971        }
1972
1973        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1974        // graph that contains all information needed for building the actor graph.
1975        let original_root_fragment = old_fragments
1976            .root_fragment()
1977            .expect("root fragment not found");
1978
1979        let job_type = StreamingJobType::from(stream_job);
1980
1981        // Extract the downstream fragments from the fragment graph.
1982        let (mut downstream_fragments, mut downstream_actor_location) =
1983            self.metadata_manager.get_downstream_fragments(id).await?;
1984
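        // For auto schema refresh, swap the recorded downstream sink fragments (and their actor
        // locations) with the rewritten ones, so the complete graph is built against the new sink
        // schema.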
1985        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
1986            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
1987                .iter()
1988                .map(|sink| sink.original_fragment.fragment_id)
1989                .collect();
1990            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
1991                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
1992                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
1993                }) {
1994                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
1995                    for actor_id in downstream_fragment.actors.keys() {
1996                        downstream_actor_location.remove(actor_id);
1997                    }
1998                    for (actor_id, status) in &sink.actor_status {
1999                        downstream_actor_location
2000                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2001                    }
2002
2003                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2004                    *nodes = sink.new_fragment.nodes.clone();
2005                }
2006            }
2007            assert!(remaining_fragment.is_empty());
2008        }
2009
2010        // build complete graph based on the table job type
2011        let complete_graph = match &job_type {
2012            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2013                CompleteStreamFragmentGraph::with_downstreams(
2014                    fragment_graph,
2015                    FragmentGraphDownstreamContext {
2016                        original_root_fragment_id: original_root_fragment.fragment_id,
2017                        downstream_fragments,
2018                        downstream_actor_location,
2019                    },
2020                    job_type,
2021                )?
2022            }
2023            StreamingJobType::Table(TableJobType::SharedCdcSource)
2024            | StreamingJobType::MaterializedView => {
2025                // CDC tables or materialized views can have upstream jobs as well.
2026                let (upstream_root_fragments, upstream_actor_location) = self
2027                    .metadata_manager
2028                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2029                    .await?;
2030
2031                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2032                    fragment_graph,
2033                    FragmentGraphUpstreamContext {
2034                        upstream_root_fragments,
2035                        upstream_actor_location,
2036                    },
2037                    FragmentGraphDownstreamContext {
2038                        original_root_fragment_id: original_root_fragment.fragment_id,
2039                        downstream_fragments,
2040                        downstream_actor_location,
2041                    },
2042                    job_type,
2043                )?
2044            }
2045            _ => unreachable!(),
2046        };
2047
2048        let resource_group = self
2049            .metadata_manager
2050            .get_existing_job_resource_group(id)
2051            .await?;
2052
2053        // 2. Build the actor graph.
2054        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2055
2056        // XXX: what is this parallelism?
2057        // Is it "assigned parallelism"?
2058        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2059            .expect("The number of actors in the original table fragment should be greater than 0");
2060
2061        let actor_graph_builder = ActorGraphBuilder::new(
2062            id,
2063            resource_group,
2064            complete_graph,
2065            cluster_info,
2066            parallelism,
2067        )?;
2068
2069        let ActorGraphBuildResult {
2070            graph,
2071            downstream_fragment_relations,
2072            building_locations,
2073            upstream_fragment_downstreams,
2074            mut replace_upstream,
2075            new_no_shuffle,
2076            ..
2077        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2078
2079        // A general table or source has no upstream job, so the dispatchers should be empty.
2080        if matches!(
2081            job_type,
2082            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2083        ) {
2084            assert!(upstream_fragment_downstreams.is_empty());
2085        }
2086
2087        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2088        // the context that contains all information needed for building the actors on the compute
2089        // nodes.
2090        let stream_job_fragments = StreamJobFragments::new(
2091            tmp_job_id,
2092            graph,
2093            &building_locations.actor_locations,
2094            stream_ctx,
2095            old_fragments.assigned_parallelism,
2096            old_fragments.max_parallelism,
2097        );
2098
2099        if let Some(sinks) = &auto_refresh_schema_sinks {
2100            for sink in sinks {
2101                replace_upstream
2102                    .remove(&sink.new_fragment.fragment_id)
2103                    .expect("should exist");
2104            }
2105        }
2106
2107        // Note: no need to set `vnode_count` as it's already set by the frontend.
2108        // See `get_replace_table_plan`.
2109
2110        let ctx = ReplaceStreamJobContext {
2111            old_fragments,
2112            replace_upstream,
2113            new_no_shuffle,
2114            upstream_fragment_downstreams,
2115            building_locations,
2116            streaming_job: stream_job.clone(),
2117            tmp_id: tmp_job_id,
2118            drop_table_connector_ctx,
2119            auto_refresh_schema_sinks,
2120        };
2121
2122        Ok((
2123            ctx,
2124            StreamJobFragmentsToCreate {
2125                inner: stream_job_fragments,
2126                downstreams: downstream_fragment_relations,
2127            },
2128        ))
2129    }
2130
2131    async fn alter_name(
2132        &self,
2133        relation: alter_name_request::Object,
2134        new_name: &str,
2135    ) -> MetaResult<NotificationVersion> {
2136        let (obj_type, id) = match relation {
2137            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2138            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2139            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2140            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2141            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2142            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2143            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2144            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2145        };
2146        self.metadata_manager
2147            .catalog_controller
2148            .alter_name(obj_type, id, new_name)
2149            .await
2150    }
2151
2152    async fn alter_swap_rename(
2153        &self,
2154        object: alter_swap_rename_request::Object,
2155    ) -> MetaResult<NotificationVersion> {
2156        let (obj_type, src_id, dst_id) = match object {
2157            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2158            alter_swap_rename_request::Object::Table(objs) => {
2159                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2160                (ObjectType::Table, src_id, dst_id)
2161            }
2162            alter_swap_rename_request::Object::View(objs) => {
2163                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2164                (ObjectType::View, src_id, dst_id)
2165            }
2166            alter_swap_rename_request::Object::Source(objs) => {
2167                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2168                (ObjectType::Source, src_id, dst_id)
2169            }
2170            alter_swap_rename_request::Object::Sink(objs) => {
2171                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2172                (ObjectType::Sink, src_id, dst_id)
2173            }
2174            alter_swap_rename_request::Object::Subscription(objs) => {
2175                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2176                (ObjectType::Subscription, src_id, dst_id)
2177            }
2178        };
2179
2180        self.metadata_manager
2181            .catalog_controller
2182            .alter_swap_rename(obj_type, src_id, dst_id)
2183            .await
2184    }
2185
2186    async fn alter_owner(
2187        &self,
2188        object: Object,
2189        owner_id: UserId,
2190    ) -> MetaResult<NotificationVersion> {
2191        let (obj_type, id) = match object {
2192            Object::TableId(id) => (ObjectType::Table, id),
2193            Object::ViewId(id) => (ObjectType::View, id),
2194            Object::SourceId(id) => (ObjectType::Source, id),
2195            Object::SinkId(id) => (ObjectType::Sink, id),
2196            Object::SchemaId(id) => (ObjectType::Schema, id),
2197            Object::DatabaseId(id) => (ObjectType::Database, id),
2198            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2199            Object::ConnectionId(id) => (ObjectType::Connection, id),
2200        };
2201        self.metadata_manager
2202            .catalog_controller
2203            .alter_owner(obj_type, id.into(), owner_id as _)
2204            .await
2205    }
2206
2207    async fn alter_set_schema(
2208        &self,
2209        object: alter_set_schema_request::Object,
2210        new_schema_id: SchemaId,
2211    ) -> MetaResult<NotificationVersion> {
2212        let (obj_type, id) = match object {
2213            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2214            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2215            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2216            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2217            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2218            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2219            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2220        };
2221        self.metadata_manager
2222            .catalog_controller
2223            .alter_schema(obj_type, id.into(), new_schema_id as _)
2224            .await
2225    }
2226
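    /// Waits for all background creating streaming jobs to finish, polling roughly once per
    /// millisecond for up to 30 minutes before giving up with a cancellation error.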
2227    pub async fn wait(&self) -> MetaResult<()> {
2228        let timeout_ms = 30 * 60 * 1000;
2229        for _ in 0..timeout_ms {
2230            if self
2231                .metadata_manager
2232                .catalog_controller
2233                .list_background_creating_jobs(true, None)
2234                .await?
2235                .is_empty()
2236            {
2237                return Ok(());
2238            }
2239
2240            sleep(Duration::from_millis(1)).await;
2241        }
2242        Err(MetaError::cancelled(format!(
2243            "timeout after {timeout_ms}ms"
2244        )))
2245    }
2246
2247    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2248        self.metadata_manager
2249            .catalog_controller
2250            .comment_on(comment)
2251            .await
2252    }
2253
2254    async fn alter_streaming_job_config(
2255        &self,
2256        job_id: JobId,
2257        entries_to_add: HashMap<String, String>,
2258        keys_to_remove: Vec<String>,
2259    ) -> MetaResult<NotificationVersion> {
2260        self.metadata_manager
2261            .catalog_controller
2262            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2263            .await
2264    }
2265}
2266
2267fn report_create_object(
2268    job_id: JobId,
2269    event_name: &str,
2270    obj_type: PbTelemetryDatabaseObject,
2271    connector_name: Option<String>,
2272    attr_info: Option<jsonbb::Value>,
2273) {
2274    report_event(
2275        PbTelemetryEventStage::CreateStreamJob,
2276        event_name,
2277        job_id.as_raw_id() as _,
2278        connector_name,
2279        Some(obj_type),
2280        attr_info,
2281    );
2282}
2283
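/// Builds the `UpstreamSinkInfo` that connects a sink fragment to its target table's union
/// fragment: the sink's output schema, the hash distribution key expressed in terms of the sink's
/// output columns, and the projection from the sink columns to the table's current columns.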
2284pub fn build_upstream_sink_info(
2285    sink: &PbSink,
2286    sink_fragment_id: FragmentId,
2287    target_table: &PbTable,
2288    target_fragment_id: FragmentId,
2289) -> MetaResult<UpstreamSinkInfo> {
2290    let sink_columns = if !sink.original_target_columns.is_empty() {
2291        sink.original_target_columns.clone()
2292    } else {
2293        // The field may be empty because it did not exist in earlier versions, which implies
2294        // that no schema changes such as `ADD/DROP COLUMN` have been made to the table since
2295        // the sink was created. Therefore the table's current columns are exactly the
2296        // `original_target_columns`, and the meta node will backfill this value on the sink.
2297        target_table.columns.clone()
2298    };
2299
2300    let sink_output_fields = sink_columns
2301        .iter()
2302        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2303        .collect_vec();
2304    let output_indices = (0..sink_output_fields.len())
2305        .map(|i| i as u32)
2306        .collect_vec();
2307
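    // Map each distribution-key column of the target table to its position in the sink's output,
    // so the hash dispatcher shuffles rows by the same key as the table.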
2308    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2309        let sink_idx_by_col_id = sink_columns
2310            .iter()
2311            .enumerate()
2312            .map(|(idx, col)| {
2313                let column_id = col.column_desc.as_ref().unwrap().column_id;
2314                (column_id, idx as u32)
2315            })
2316            .collect::<HashMap<_, _>>();
2317        target_table
2318            .distribution_key
2319            .iter()
2320            .map(|dist_idx| {
2321                let column_id = target_table.columns[*dist_idx as usize]
2322                    .column_desc
2323                    .as_ref()
2324                    .unwrap()
2325                    .column_id;
2326                let sink_idx = sink_idx_by_col_id
2327                    .get(&column_id)
2328                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2329                Ok(*sink_idx)
2330            })
2331            .collect::<anyhow::Result<Vec<_>>>()?
2332    };
2333    let dist_key_indices =
2334        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2335    let downstream_fragment_id = target_fragment_id as _;
2336    let new_downstream_relation = DownstreamFragmentRelation {
2337        downstream_fragment_id,
2338        dispatcher_type: DispatcherType::Hash,
2339        dist_key_indices,
2340        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2341    };
2342    let current_target_columns = target_table.get_columns();
2343    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2344    Ok(UpstreamSinkInfo {
2345        sink_id: sink.id,
2346        sink_fragment_id: sink_fragment_id as _,
2347        sink_output_fields,
2348        sink_original_target_columns: sink.get_original_target_columns().clone(),
2349        project_exprs,
2350        new_sink_downstream: new_downstream_relation,
2351    })
2352}
2353
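/// Walks the table's union fragment and rewrites the `UpstreamSinkUnion` node so that its initial
/// upstreams reflect the given sink infos (upstream fragment id, output schema, and projection).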
2354pub fn refill_upstream_sink_union_in_table(
2355    union_fragment_root: &mut PbStreamNode,
2356    upstream_sink_infos: &[UpstreamSinkInfo],
2357) {
2358    visit_stream_node_cont_mut(union_fragment_root, |node| {
2359        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2360            let init_upstreams = upstream_sink_infos
2361                .iter()
2362                .map(|info| PbUpstreamSinkInfo {
2363                    upstream_fragment_id: info.sink_fragment_id,
2364                    sink_output_schema: info.sink_output_fields.clone(),
2365                    project_exprs: info.project_exprs.clone(),
2366                })
2367                .collect();
2368            upstream_sink_union.init_upstreams = init_upstreams;
2369            false
2370        } else {
2371            true
2372        }
2373    });
2374}
2375
2376#[cfg(test)]
2377mod tests {
2378    use std::num::NonZeroUsize;
2379
2380    use super::*;
2381
2382    #[test]
2383    fn test_specified_parallelism_exceeds_available() {
2384        let result = DdlController::resolve_stream_parallelism_inner(
2385            Some(NonZeroUsize::new(100).unwrap()),
2386            NonZeroUsize::new(256).unwrap(),
2387            Some(NonZeroUsize::new(4).unwrap()),
2388            &DefaultParallelism::Full,
2389            "default",
2390        )
2391        .unwrap();
2392        assert_eq!(result.get(), 100);
2393    }
2394
2395    #[test]
2396    fn test_allows_default_parallelism_over_available() {
2397        let result = DdlController::resolve_stream_parallelism_inner(
2398            None,
2399            NonZeroUsize::new(256).unwrap(),
2400            Some(NonZeroUsize::new(4).unwrap()),
2401            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2402            "default",
2403        )
2404        .unwrap();
2405        assert_eq!(result.get(), 50);
2406    }
2407
2408    #[test]
2409    fn test_full_parallelism_capped_by_max() {
2410        let result = DdlController::resolve_stream_parallelism_inner(
2411            None,
2412            NonZeroUsize::new(6).unwrap(),
2413            Some(NonZeroUsize::new(10).unwrap()),
2414            &DefaultParallelism::Full,
2415            "default",
2416        )
2417        .unwrap();
2418        assert_eq!(result.get(), 6);
2419    }
2420
2421    #[test]
2422    fn test_no_available_slots_returns_error() {
2423        let result = DdlController::resolve_stream_parallelism_inner(
2424            None,
2425            NonZeroUsize::new(4).unwrap(),
2426            None,
2427            &DefaultParallelism::Full,
2428            "default",
2429        );
2430        assert!(matches!(
2431            result,
2432            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2433        ));
2434    }
2435
2436    #[test]
2437    fn test_specified_over_max_returns_error() {
2438        let result = DdlController::resolve_stream_parallelism_inner(
2439            Some(NonZeroUsize::new(8).unwrap()),
2440            NonZeroUsize::new(4).unwrap(),
2441            Some(NonZeroUsize::new(10).unwrap()),
2442            &DefaultParallelism::Full,
2443            "default",
2444        );
2445        assert!(matches!(
2446            result,
2447            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2448        ));
2449    }
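
    // Two additional cases, sketched purely from the resolution logic above (they are not part of
    // the original test suite): a configured default larger than the max is capped to the max,
    // and `Full` falls back to the available slots when those are fewer than the max.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }

    #[test]
    fn test_full_parallelism_uses_available_when_below_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }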
2450}