risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
45use risingwave_meta_model::object::ObjectType;
46use risingwave_meta_model::{
47    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
48    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
49};
50use risingwave_pb::catalog::{
51    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
52    Subscription, Table, View,
53};
54use risingwave_pb::common::PbActorLocation;
55use risingwave_pb::ddl_service::alter_owner_request::Object;
56use risingwave_pb::ddl_service::{
57    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
58    alter_swap_rename_request,
59};
60use risingwave_pb::meta::table_fragments::PbActorStatus;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64    StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
68use strum::Display;
69use thiserror_ext::AsReport;
70use tokio::sync::{OwnedSemaphorePermit, Semaphore};
71use tokio::time::sleep;
72use tracing::Instrument;
73
74use crate::barrier::BarrierManagerRef;
75use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
76use crate::controller::cluster::StreamingClusterInfo;
77use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
78use crate::controller::utils::build_select_node_list;
79use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
80use crate::manager::{
81    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82    NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
86    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87    TableParallelism,
88};
89use crate::stream::cdc::{
90    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
91};
92use crate::stream::{
93    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
94    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
95    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
96    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
97    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
98    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
99};
100use crate::telemetry::report_event;
101use crate::{MetaError, MetaResult};
102
103#[derive(PartialEq)]
104pub enum DropMode {
105    Restrict,
106    Cascade,
107}
108
109impl DropMode {
110    pub fn from_request_setting(cascade: bool) -> DropMode {
111        if cascade {
112            DropMode::Cascade
113        } else {
114            DropMode::Restrict
115        }
116    }
117}
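
// A minimal illustrative check (an added sketch, not part of the original file):
// the `cascade` flag taken from a drop request maps directly onto `DropMode`.
#[cfg(test)]
mod drop_mode_sketch {
    use super::DropMode;

    #[test]
    fn cascade_flag_maps_to_drop_mode() {
        // `DropMode` derives `PartialEq`, so plain comparison is enough here.
        assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
        assert!(DropMode::from_request_setting(false) == DropMode::Restrict);
    }
}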
118
119#[derive(strum::AsRefStr)]
120pub enum StreamingJobId {
121    MaterializedView(TableId),
122    Sink(SinkId),
123    Table(Option<SourceId>, TableId),
124    Index(IndexId),
125}
126
127impl std::fmt::Display for StreamingJobId {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "{}", self.as_ref())?;
130        write!(f, "({})", self.id())
131    }
132}
133
134impl StreamingJobId {
135    fn id(&self) -> JobId {
136        match self {
137            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138            StreamingJobId::Index(id) => id.as_job_id(),
139            StreamingJobId::Sink(id) => id.as_job_id(),
140        }
141    }
142}
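
// For illustration (the value 42 is made up): a `StreamingJobId::Sink` wrapping job id 42
// is rendered by the `Display` impl above as something like `Sink(42)`, i.e. the
// `strum::AsRefStr` variant name followed by the inner job id in parentheses.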
143
144/// Describes the job that needs to be replaced.
145/// Used when replacing a table and when creating a sink into a table.
146pub struct ReplaceStreamJobInfo {
147    pub streaming_job: StreamingJob,
148    pub fragment_graph: StreamFragmentGraphProto,
149}
150
151#[derive(Display)]
152pub enum DdlCommand {
153    CreateDatabase(Database),
154    DropDatabase(DatabaseId),
155    CreateSchema(Schema),
156    DropSchema(SchemaId, DropMode),
157    CreateNonSharedSource(Source),
158    DropSource(SourceId, DropMode),
159    CreateFunction(Function),
160    DropFunction(FunctionId, DropMode),
161    CreateView(View, HashSet<ObjectId>),
162    DropView(ViewId, DropMode),
163    CreateStreamingJob {
164        stream_job: StreamingJob,
165        fragment_graph: StreamFragmentGraphProto,
166        dependencies: HashSet<ObjectId>,
167        specific_resource_group: Option<String>, // if set, overrides the default resource group
168        if_not_exists: bool,
169    },
170    DropStreamingJob {
171        job_id: StreamingJobId,
172        drop_mode: DropMode,
173    },
174    AlterName(alter_name_request::Object, String),
175    AlterSwapRename(alter_swap_rename_request::Object),
176    ReplaceStreamJob(ReplaceStreamJobInfo),
177    AlterNonSharedSource(Source),
178    AlterObjectOwner(Object, UserId),
179    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
180    CreateConnection(Connection),
181    DropConnection(ConnectionId, DropMode),
182    CreateSecret(Secret),
183    AlterSecret(Secret),
184    DropSecret(SecretId),
185    CommentOn(Comment),
186    CreateSubscription(Subscription),
187    DropSubscription(SubscriptionId, DropMode),
188    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
189    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
190}
191
192impl DdlCommand {
193    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
194    fn object(&self) -> Either<String, ObjectId> {
195        use Either::*;
196        match self {
197            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
198            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
199            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
200            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
201            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
202            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
203            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
204            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
205            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
206            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
207            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
208            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
209            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
210            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
211            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
212            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
213            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
214            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
215            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
216            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
217            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
218            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
219            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
220            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
221            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
222            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
223            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
224            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
225        }
226    }
227
228    fn allow_in_recovery(&self) -> bool {
229        match self {
230            DdlCommand::DropDatabase(_)
231            | DdlCommand::DropSchema(_, _)
232            | DdlCommand::DropSource(_, _)
233            | DdlCommand::DropFunction(_, _)
234            | DdlCommand::DropView(_, _)
235            | DdlCommand::DropStreamingJob { .. }
236            | DdlCommand::DropConnection(_, _)
237            | DdlCommand::DropSecret(_)
238            | DdlCommand::DropSubscription(_, _)
239            | DdlCommand::AlterName(_, _)
240            | DdlCommand::AlterObjectOwner(_, _)
241            | DdlCommand::AlterSetSchema(_, _)
242            | DdlCommand::CreateDatabase(_)
243            | DdlCommand::CreateSchema(_)
244            | DdlCommand::CreateFunction(_)
245            | DdlCommand::CreateView(_, _)
246            | DdlCommand::CreateConnection(_)
247            | DdlCommand::CommentOn(_)
248            | DdlCommand::CreateSecret(_)
249            | DdlCommand::AlterSecret(_)
250            | DdlCommand::AlterSwapRename(_)
251            | DdlCommand::AlterDatabaseParam(_, _)
252            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
253            DdlCommand::CreateStreamingJob { .. }
254            | DdlCommand::CreateNonSharedSource(_)
255            | DdlCommand::ReplaceStreamJob(_)
256            | DdlCommand::AlterNonSharedSource(_)
257            | DdlCommand::CreateSubscription(_) => false,
258        }
259    }
260}
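
// A small self-contained sketch (added for illustration; the empty protobuf values are
// assumptions, not taken from the original code) of the recovery gating above: catalog-only
// commands remain allowed while the cluster is recovering, whereas commands that must build
// new streaming state are rejected.
#[cfg(test)]
mod recovery_gating_sketch {
    use risingwave_pb::catalog::{Database, Source};

    use super::DdlCommand;

    #[test]
    fn catalog_only_commands_are_allowed_in_recovery() {
        assert!(DdlCommand::CreateDatabase(Database::default()).allow_in_recovery());
        assert!(!DdlCommand::CreateNonSharedSource(Source::default()).allow_in_recovery());
    }
}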
261
262#[derive(Clone)]
263pub struct DdlController {
264    pub(crate) env: MetaSrvEnv,
265
266    pub(crate) metadata_manager: MetadataManager,
267    pub(crate) stream_manager: GlobalStreamManagerRef,
268    pub(crate) source_manager: SourceManagerRef,
269    barrier_manager: BarrierManagerRef,
270
271    /// The semaphore is used to limit the number of concurrent streaming job creations.
272    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
273
274    /// Sequence number for DDL commands, used for observability and debugging.
275    seq: Arc<AtomicU64>,
276}
277
278#[derive(Clone)]
279pub struct CreatingStreamingJobPermit {
280    pub(crate) semaphore: Arc<Semaphore>,
281}
282
283impl CreatingStreamingJobPermit {
284    async fn new(env: &MetaSrvEnv) -> Self {
285        let mut permits = env
286            .system_params_reader()
287            .await
288            .max_concurrent_creating_streaming_jobs() as usize;
289        if permits == 0 {
290            // if the system parameter is set to zero, use the max permitted value.
291            permits = Semaphore::MAX_PERMITS;
292        }
293        let semaphore = Arc::new(Semaphore::new(permits));
294
295        let (local_notification_tx, mut local_notification_rx) =
296            tokio::sync::mpsc::unbounded_channel();
297        env.notification_manager()
298            .insert_local_sender(local_notification_tx);
299        let semaphore_clone = semaphore.clone();
300        tokio::spawn(async move {
301            while let Some(notification) = local_notification_rx.recv().await {
302                let LocalNotification::SystemParamsChange(p) = &notification else {
303                    continue;
304                };
305                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
306                if new_permits == 0 {
307                    new_permits = Semaphore::MAX_PERMITS;
308                }
309                match permits.cmp(&new_permits) {
310                    Ordering::Less => {
311                        semaphore_clone.add_permits(new_permits - permits);
312                    }
313                    Ordering::Equal => continue,
314                    Ordering::Greater => {
315                        let to_release = permits - new_permits;
316                        let reduced = semaphore_clone.forget_permits(to_release);
317                        // TODO: implement a dynamic semaphore with limits ourselves.
318                        if reduced != to_release {
319                            tracing::warn!(
320                                "not enough permits to release, expected {}, only reduced {}",
321                                to_release,
322                                reduced
323                            );
324                        }
325                    }
326                }
327                tracing::info!(
328                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
329                    permits,
330                    new_permits
331                );
332                permits = new_permits;
333            }
334        });
335
336        Self { semaphore }
337    }
338}
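
// A self-contained sketch (added illustration, not part of the original control flow) of the
// resize pattern used by the watcher task above: grow the semaphore with `add_permits` and
// shrink it with `forget_permits`, which reports how many permits it could actually remove.
#[cfg(test)]
mod permit_resize_sketch {
    use tokio::sync::Semaphore;

    #[test]
    fn resize_semaphore_up_and_down() {
        let semaphore = Semaphore::new(4);

        // Grow: 4 -> 6 available permits.
        semaphore.add_permits(2);
        assert_eq!(semaphore.available_permits(), 6);

        // Shrink: 6 -> 3 available permits. Only idle permits can be forgotten, so the
        // return value may be smaller than requested when permits are currently held.
        let reduced = semaphore.forget_permits(3);
        assert_eq!(reduced, 3);
        assert_eq!(semaphore.available_permits(), 3);
    }
}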
339
340impl DdlController {
341    pub async fn new(
342        env: MetaSrvEnv,
343        metadata_manager: MetadataManager,
344        stream_manager: GlobalStreamManagerRef,
345        source_manager: SourceManagerRef,
346        barrier_manager: BarrierManagerRef,
347    ) -> Self {
348        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
349        Self {
350            env,
351            metadata_manager,
352            stream_manager,
353            source_manager,
354            barrier_manager,
355            creating_streaming_job_permits,
356            seq: Arc::new(AtomicU64::new(0)),
357        }
358    }
359
360    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
361    pub fn next_seq(&self) -> u64 {
362        // This is a simple atomic increment operation.
363        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
364    }
365
366    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
367    /// interrupted during execution, tonic cancels the request future. Since we have a lot of
368    /// logic for revert, status management, notification and so on, ensuring consistency would
369    /// be a huge hassle if we did not spawn here.
370    ///
371    /// Though the return type is `Option`, it is always `Some`, to simplify the handling logic.
372    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
373        if !command.allow_in_recovery() {
374            self.barrier_manager.check_status_running()?;
375        }
376
377        let await_tree_key = format!("DDL Command {}", self.next_seq());
378        let await_tree_span = await_tree::span!("{command}({})", command.object());
379
380        let ctrl = self.clone();
381        let fut = async move {
382            match command {
383                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
384                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
385                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
386                DdlCommand::DropSchema(schema_id, drop_mode) => {
387                    ctrl.drop_schema(schema_id, drop_mode).await
388                }
389                DdlCommand::CreateNonSharedSource(source) => {
390                    ctrl.create_non_shared_source(source).await
391                }
392                DdlCommand::DropSource(source_id, drop_mode) => {
393                    ctrl.drop_source(source_id, drop_mode).await
394                }
395                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
396                DdlCommand::DropFunction(function_id, drop_mode) => {
397                    ctrl.drop_function(function_id, drop_mode).await
398                }
399                DdlCommand::CreateView(view, dependencies) => {
400                    ctrl.create_view(view, dependencies).await
401                }
402                DdlCommand::DropView(view_id, drop_mode) => {
403                    ctrl.drop_view(view_id, drop_mode).await
404                }
405                DdlCommand::CreateStreamingJob {
406                    stream_job,
407                    fragment_graph,
408                    dependencies,
409                    specific_resource_group,
410                    if_not_exists,
411                } => {
412                    ctrl.create_streaming_job(
413                        stream_job,
414                        fragment_graph,
415                        dependencies,
416                        specific_resource_group,
417                        if_not_exists,
418                    )
419                    .await
420                }
421                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
422                    ctrl.drop_streaming_job(job_id, drop_mode).await
423                }
424                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
425                    streaming_job,
426                    fragment_graph,
427                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
428                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
429                DdlCommand::AlterObjectOwner(object, owner_id) => {
430                    ctrl.alter_owner(object, owner_id).await
431                }
432                DdlCommand::AlterSetSchema(object, new_schema_id) => {
433                    ctrl.alter_set_schema(object, new_schema_id).await
434                }
435                DdlCommand::CreateConnection(connection) => {
436                    ctrl.create_connection(connection).await
437                }
438                DdlCommand::DropConnection(connection_id, drop_mode) => {
439                    ctrl.drop_connection(connection_id, drop_mode).await
440                }
441                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
442                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
443                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
444                DdlCommand::AlterNonSharedSource(source) => {
445                    ctrl.alter_non_shared_source(source).await
446                }
447                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
448                DdlCommand::CreateSubscription(subscription) => {
449                    ctrl.create_subscription(subscription).await
450                }
451                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
452                    ctrl.drop_subscription(subscription_id, drop_mode).await
453                }
454                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
455                DdlCommand::AlterDatabaseParam(database_id, param) => {
456                    ctrl.alter_database_param(database_id, param).await
457                }
458                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
459                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
460                        .await
461                }
462            }
463        }
464        .in_current_span();
465        let fut = (self.env.await_tree_reg())
466            .register(await_tree_key, await_tree_span)
467            .instrument(Box::pin(fut));
468        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
469        Ok(Some(WaitVersion {
470            catalog_version: notification_version,
471            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
472        }))
473    }
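
    // An added illustrative note on the spawn above (names in the schematic are made up):
    // a future polled directly by the RPC handler is dropped as soon as the client
    // disconnects, while a `tokio::spawn`ed task keeps running to completion, so the
    // revert / status / notification logic is never cut short halfway:
    //
    //     let handle = tokio::spawn(ddl_future); // detached from the RPC lifetime
    //     handle.await                           // dropping this await does not cancel the task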
474
475    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
476        self.barrier_manager.get_ddl_progress().await
477    }
478
479    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
480        let (version, updated_db) = self
481            .metadata_manager
482            .catalog_controller
483            .create_database(database)
484            .await?;
485        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
486        self.barrier_manager
487            .update_database_barrier(
488                updated_db.database_id,
489                updated_db.barrier_interval_ms.map(|v| v as u32),
490                updated_db.checkpoint_frequency.map(|v| v as u64),
491            )
492            .await?;
493        Ok(version)
494    }
495
496    #[tracing::instrument(skip(self), level = "debug")]
497    pub async fn reschedule_streaming_job(
498        &self,
499        job_id: JobId,
500        target: ReschedulePolicy,
501        mut deferred: bool,
502    ) -> MetaResult<()> {
503        tracing::info!("altering parallelism for job {}", job_id);
504        if self.barrier_manager.check_status_running().is_err() {
505            tracing::info!(
506                "alter parallelism is set to deferred mode because the system is in recovery state"
507            );
508            deferred = true;
509        }
510
511        self.stream_manager
512            .reschedule_streaming_job(job_id, target, deferred)
513            .await
514    }
515
516    pub async fn reschedule_cdc_table_backfill(
517        &self,
518        job_id: JobId,
519        target: ReschedulePolicy,
520    ) -> MetaResult<()> {
521        tracing::info!("alter CDC table backfill parallelism");
522        if self.barrier_manager.check_status_running().is_err() {
523            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
524        }
525        self.stream_manager
526            .reschedule_cdc_table_backfill(job_id, target)
527            .await
528    }
529
530    pub async fn reschedule_fragments(
531        &self,
532        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
533    ) -> MetaResult<()> {
534        tracing::info!(
535            "altering parallelism for fragments {:?}",
536            fragment_targets.keys()
537        );
538        let fragment_targets = fragment_targets
539            .into_iter()
540            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
541            .collect();
542
543        self.stream_manager
544            .reschedule_fragments(fragment_targets)
545            .await
546    }
547
548    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
549        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
550            .await
551    }
552
553    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
554        self.metadata_manager
555            .catalog_controller
556            .create_schema(schema)
557            .await
558    }
559
560    async fn drop_schema(
561        &self,
562        schema_id: SchemaId,
563        drop_mode: DropMode,
564    ) -> MetaResult<NotificationVersion> {
565        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
566            .await
567    }
568
569    /// Shared sources are handled in [`Self::create_streaming_job`].
570    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
571        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
572            .await
573            .context("failed to create source worker")?;
574
575        let (source_id, version) = self
576            .metadata_manager
577            .catalog_controller
578            .create_source(source)
579            .await?;
580        self.source_manager
581            .register_source_with_handle(source_id, handle)
582            .await;
583        Ok(version)
584    }
585
586    async fn drop_source(
587        &self,
588        source_id: SourceId,
589        drop_mode: DropMode,
590    ) -> MetaResult<NotificationVersion> {
591        self.drop_object(ObjectType::Source, source_id, drop_mode)
592            .await
593    }
594
595    /// This replaces the source in the catalog.
596    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
597    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
598        self.metadata_manager
599            .catalog_controller
600            .alter_non_shared_source(source)
601            .await
602    }
603
604    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
605        self.metadata_manager
606            .catalog_controller
607            .create_function(function)
608            .await
609    }
610
611    async fn drop_function(
612        &self,
613        function_id: FunctionId,
614        drop_mode: DropMode,
615    ) -> MetaResult<NotificationVersion> {
616        self.drop_object(ObjectType::Function, function_id, drop_mode)
617            .await
618    }
619
620    async fn create_view(
621        &self,
622        view: View,
623        dependencies: HashSet<ObjectId>,
624    ) -> MetaResult<NotificationVersion> {
625        self.metadata_manager
626            .catalog_controller
627            .create_view(view, dependencies)
628            .await
629    }
630
631    async fn drop_view(
632        &self,
633        view_id: ViewId,
634        drop_mode: DropMode,
635    ) -> MetaResult<NotificationVersion> {
636        self.drop_object(ObjectType::View, view_id, drop_mode).await
637    }
638
639    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
640        validate_connection(&connection).await?;
641        self.metadata_manager
642            .catalog_controller
643            .create_connection(connection)
644            .await
645    }
646
647    async fn drop_connection(
648        &self,
649        connection_id: ConnectionId,
650        drop_mode: DropMode,
651    ) -> MetaResult<NotificationVersion> {
652        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
653            .await
654    }
655
656    async fn alter_database_param(
657        &self,
658        database_id: DatabaseId,
659        param: AlterDatabaseParam,
660    ) -> MetaResult<NotificationVersion> {
661        let (version, updated_db) = self
662            .metadata_manager
663            .catalog_controller
664            .alter_database_param(database_id, param)
665            .await?;
666        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
667        self.barrier_manager
668            .update_database_barrier(
669                database_id,
670                updated_db.barrier_interval_ms.map(|v| v as u32),
671                updated_db.checkpoint_frequency.map(|v| v as u64),
672            )
673            .await?;
674        Ok(version)
675    }
676
677    // The 'secret' part of the request we receive from the frontend is in plaintext;
678    // here, we need to encrypt it before storing it in the catalog.
679    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
680        let secret_store_private_key = self
681            .env
682            .opts
683            .secret_store_private_key
684            .clone()
685            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
686
687        let encrypted_payload = SecretEncryption::encrypt(
688            secret_store_private_key.as_slice(),
689            secret.get_value().as_slice(),
690        )
691        .context(format!("failed to encrypt secret {}", secret.name))?;
692        Ok(encrypted_payload
693            .serialize()
694            .context(format!("failed to serialize secret {}", secret.name))?)
695    }
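
    // Illustrative round trip for the helper above (an added note; `key` and `plaintext`
    // are placeholder names, and only calls already used in this file are shown):
    //
    //     let encrypted = SecretEncryption::encrypt(key.as_slice(), plaintext.as_slice())?;
    //     let bytes_for_catalog = encrypted.serialize()?;
    //
    // Only the encrypted, serialized bytes are stored in the catalog; the plaintext value is
    // forwarded separately where needed (see `create_secret` / `alter_secret` below).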
696
697    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
698        // The 'secret' part of the request we receive from the frontend is in plaintext;
699        // here, we need to encrypt it before storing it in the catalog.
700        let secret_plain_payload = secret.value.clone();
701        let encrypted_payload = self.get_encrypted_payload(&secret)?;
702        secret.value = encrypted_payload;
703
704        self.metadata_manager
705            .catalog_controller
706            .create_secret(secret, secret_plain_payload)
707            .await
708    }
709
710    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
711        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
712            .await
713    }
714
715    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
716        let secret_plain_payload = secret.value.clone();
717        let encrypted_payload = self.get_encrypted_payload(&secret)?;
718        secret.value = encrypted_payload;
719        self.metadata_manager
720            .catalog_controller
721            .alter_secret(secret, secret_plain_payload)
722            .await
723    }
724
725    async fn create_subscription(
726        &self,
727        mut subscription: Subscription,
728    ) -> MetaResult<NotificationVersion> {
729        tracing::debug!("create subscription");
730        let _permit = self
731            .creating_streaming_job_permits
732            .semaphore
733            .acquire()
734            .await
735            .unwrap();
736        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
737        self.metadata_manager
738            .catalog_controller
739            .create_subscription_catalog(&mut subscription)
740            .await?;
741        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
742            tracing::debug!(error = %err.as_report(), "failed to create subscription");
743            let _ = self
744                .metadata_manager
745                .catalog_controller
746                .try_abort_creating_subscription(subscription.id)
747                .await
748                .inspect_err(|e| {
749                    tracing::error!(
750                        error = %e.as_report(),
751                        "failed to abort create subscription after failure"
752                    );
753                });
754            return Err(err);
755        }
756
757        let version = self
758            .metadata_manager
759            .catalog_controller
760            .notify_create_subscription(subscription.id)
761            .await?;
762        tracing::debug!("finish create subscription");
763        Ok(version)
764    }
765
766    async fn drop_subscription(
767        &self,
768        subscription_id: SubscriptionId,
769        drop_mode: DropMode,
770    ) -> MetaResult<NotificationVersion> {
771        tracing::debug!("preparing drop subscription");
772        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
773        let subscription = self
774            .metadata_manager
775            .catalog_controller
776            .get_subscription_by_id(subscription_id)
777            .await?;
778        let table_id = subscription.dependent_table_id;
779        let database_id = subscription.database_id;
780        let (_, version) = self
781            .metadata_manager
782            .catalog_controller
783            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
784            .await?;
785        self.stream_manager
786            .drop_subscription(database_id, subscription_id, table_id)
787            .await;
788        tracing::debug!("finish drop subscription");
789        Ok(version)
790    }
791
792    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
793    #[await_tree::instrument]
794    pub(crate) async fn validate_cdc_table(
795        &self,
796        table: &Table,
797        table_fragments: &StreamJobFragments,
798    ) -> MetaResult<()> {
799        let stream_scan_fragment = table_fragments
800            .fragments
801            .values()
802            .filter(|f| {
803                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
804                    || f.fragment_type_mask
805                        .contains(FragmentTypeFlag::StreamCdcScan)
806            })
807            .exactly_one()
808            .ok()
809            .with_context(|| {
810                format!(
811                    "expect exactly one stream scan fragment, got: {:?}",
812                    table_fragments.fragments
813                )
814            })?;
815        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
816            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
817                if let Some(o) = node.options
818                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
819                {
820                    // Use parallel CDC backfill.
821                } else {
822                    assert_eq!(
823                        stream_scan_fragment.actors.len(),
824                        1,
825                        "Stream scan fragment should have only one actor"
826                    );
827                }
828            }
829        }
830        let mut found_cdc_scan = false;
831        match &stream_scan_fragment.nodes.node_body {
832            Some(NodeBody::StreamCdcScan(_)) => {
833                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
834                if self
835                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
836                    .await?
837                {
838                    found_cdc_scan = true;
839                }
840            }
841            // When there are generated columns, the CDC scan node is wrapped in a Project node.
842            Some(NodeBody::Project(_)) => {
843                for input in &stream_scan_fragment.nodes.input {
844                    assert_parallelism(stream_scan_fragment, &input.node_body);
845                    if self
846                        .validate_cdc_table_inner(&input.node_body, table.id)
847                        .await?
848                    {
849                        found_cdc_scan = true;
850                    }
851                }
852            }
853            _ => {
854                bail!("Unexpected node body for stream cdc scan");
855            }
856        };
857        if !found_cdc_scan {
858            bail!("No stream cdc scan node found in stream scan fragment");
859        }
860        Ok(())
861    }
862
863    async fn validate_cdc_table_inner(
864        &self,
865        node_body: &Option<NodeBody>,
866        table_id: TableId,
867    ) -> MetaResult<bool> {
868        let meta_store = self.env.meta_store_ref();
869        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
870            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
871        {
872            let options_with_secret = WithOptionsSecResolved::new(
873                cdc_table_desc.connect_properties.clone(),
874                cdc_table_desc.secret_refs.clone(),
875            );
876
877            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
878            props.init_from_pb_cdc_table_desc(cdc_table_desc);
879
880            // Try creating a split enumerator to validate
881            let _enumerator = props
882                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
883                .await?;
884
885            if is_parallelized_backfill_enabled(stream_cdc_scan) {
886                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
887                try_init_parallel_cdc_table_snapshot_splits(
888                    table_id,
889                    cdc_table_desc,
890                    meta_store,
891                    &stream_cdc_scan.options,
892                    self.env.opts.cdc_table_split_init_insert_batch_size,
893                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
894                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
895                )
896                .await?;
897            }
898
899            tracing::debug!(?table_id, "validate cdc table success");
900            Ok(true)
901        } else {
902            Ok(false)
903        }
904    }
905
906    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
907        let migrated = self
908            .metadata_manager
909            .catalog_controller
910            .has_table_been_migrated(table_id)
911            .await?;
912        if !migrated {
913            Err(anyhow::anyhow!("Creating a sink into unmigrated table {} is not allowed. Please migrate it first.", table_id).into())
914        } else {
915            Ok(())
916        }
917    }
918
919    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
920    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
921    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
922    pub async fn create_streaming_job(
923        &self,
924        mut streaming_job: StreamingJob,
925        fragment_graph: StreamFragmentGraphProto,
926        dependencies: HashSet<ObjectId>,
927        specific_resource_group: Option<String>,
928        if_not_exists: bool,
929    ) -> MetaResult<NotificationVersion> {
930        if let StreamingJob::Sink(sink) = &streaming_job
931            && let Some(target_table) = sink.target_table
932        {
933            self.validate_table_for_sink(target_table).await?;
934        }
935        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
936        let check_ret = self
937            .metadata_manager
938            .catalog_controller
939            .create_job_catalog(
940                &mut streaming_job,
941                &ctx,
942                &fragment_graph.parallelism,
943                fragment_graph.max_parallelism as _,
944                dependencies,
945                specific_resource_group.clone(),
946            )
947            .await;
948        if let Err(meta_err) = check_ret {
949            if !if_not_exists {
950                return Err(meta_err);
951            }
952            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
953                if streaming_job.create_type() == CreateType::Foreground {
954                    let database_id = streaming_job.database_id();
955                    self.metadata_manager
956                        .wait_streaming_job_finished(database_id, *job_id)
957                        .await
958                } else {
959                    Ok(IGNORED_NOTIFICATION_VERSION)
960                }
961            } else {
962                Err(meta_err)
963            };
964        }
965        let job_id = streaming_job.id();
966        tracing::debug!(
967            id = %job_id,
968            definition = streaming_job.definition(),
969            create_type = streaming_job.create_type().as_str_name(),
970            job_type = ?streaming_job.job_type(),
971            "starting streaming job",
972        );
973        // TODO: acquire permits for recovered background DDLs.
974        let permit = self
975            .creating_streaming_job_permits
976            .semaphore
977            .clone()
978            .acquire_owned()
979            .instrument_await("acquire_creating_streaming_job_permit")
980            .await
981            .unwrap();
982        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
983
984        let name = streaming_job.name();
985        let definition = streaming_job.definition();
986        let source_id = match &streaming_job {
987            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
988            _ => None,
989        };
990
991        // create streaming job.
992        match self
993            .create_streaming_job_inner(
994                ctx,
995                streaming_job,
996                fragment_graph,
997                specific_resource_group,
998                permit,
999            )
1000            .await
1001        {
1002            Ok(version) => Ok(version),
1003            Err(err) => {
1004                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1005                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1006                    id: job_id,
1007                    name,
1008                    definition,
1009                    error: err.as_report().to_string(),
1010                };
1011                self.env.event_log_manager_ref().add_event_logs(vec![
1012                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1013                ]);
1014                let (aborted, _) = self
1015                    .metadata_manager
1016                    .catalog_controller
1017                    .try_abort_creating_streaming_job(job_id, false)
1018                    .await?;
1019                if aborted {
1020                    tracing::warn!(id = %job_id, "aborted streaming job");
1021                    // FIXME: might also need other cleanup here
1022                    if let Some(source_id) = source_id {
1023                        self.source_manager
1024                            .apply_source_change(SourceChange::DropSource {
1025                                dropped_source_ids: vec![source_id],
1026                            })
1027                            .await;
1028                    }
1029                }
1030                Err(err)
1031            }
1032        }
1033    }
1034
1035    #[await_tree::instrument(boxed)]
1036    async fn create_streaming_job_inner(
1037        &self,
1038        ctx: StreamContext,
1039        mut streaming_job: StreamingJob,
1040        fragment_graph: StreamFragmentGraphProto,
1041        specific_resource_group: Option<String>,
1042        permit: OwnedSemaphorePermit,
1043    ) -> MetaResult<NotificationVersion> {
1044        let mut fragment_graph =
1045            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1046        streaming_job.set_info_from_graph(&fragment_graph);
1047
1048        // create internal table catalogs and refill table id.
1049        let incomplete_internal_tables = fragment_graph
1050            .incomplete_internal_tables()
1051            .into_values()
1052            .collect_vec();
1053        let table_id_map = self
1054            .metadata_manager
1055            .catalog_controller
1056            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1057            .await?;
1058        fragment_graph.refill_internal_table_ids(table_id_map);
1059
1060        // create fragment and actor catalogs.
1061        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1062        let (ctx, stream_job_fragments) = self
1063            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1064            .await?;
1065
1066        let streaming_job = &ctx.streaming_job;
1067
1068        match streaming_job {
1069            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1070                self.validate_cdc_table(table, &stream_job_fragments)
1071                    .await?;
1072            }
1073            StreamingJob::Table(Some(source), ..) => {
1074                // Register the source on the connector node.
1075                self.source_manager.register_source(source).await?;
1076                let connector_name = source
1077                    .get_with_properties()
1078                    .get(UPSTREAM_SOURCE_KEY)
1079                    .cloned();
1080                let attr = source.info.as_ref().map(|source_info| {
1081                    jsonbb::json!({
1082                            "format": source_info.format().as_str_name(),
1083                            "encode": source_info.row_encode().as_str_name(),
1084                    })
1085                });
1086                report_create_object(
1087                    streaming_job.id(),
1088                    "source",
1089                    PbTelemetryDatabaseObject::Source,
1090                    connector_name,
1091                    attr,
1092                );
1093            }
1094            StreamingJob::Sink(sink) => {
1095                if sink.auto_refresh_schema_from_table.is_some() {
1096                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1097                }
1098                // Validate the sink on the connector node.
1099                validate_sink(sink).await?;
1100                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1101                let attr = sink.format_desc.as_ref().map(|sink_info| {
1102                    jsonbb::json!({
1103                        "format": sink_info.format().as_str_name(),
1104                        "encode": sink_info.encode().as_str_name(),
1105                    })
1106                });
1107                report_create_object(
1108                    streaming_job.id(),
1109                    "sink",
1110                    PbTelemetryDatabaseObject::Sink,
1111                    connector_name,
1112                    attr,
1113                );
1114            }
1115            StreamingJob::Source(source) => {
1116                // Register the source on the connector node.
1117                self.source_manager.register_source(source).await?;
1118                let connector_name = source
1119                    .get_with_properties()
1120                    .get(UPSTREAM_SOURCE_KEY)
1121                    .cloned();
1122                let attr = source.info.as_ref().map(|source_info| {
1123                    jsonbb::json!({
1124                            "format": source_info.format().as_str_name(),
1125                            "encode": source_info.row_encode().as_str_name(),
1126                    })
1127                });
1128                report_create_object(
1129                    streaming_job.id(),
1130                    "source",
1131                    PbTelemetryDatabaseObject::Source,
1132                    connector_name,
1133                    attr,
1134                );
1135            }
1136            _ => {}
1137        }
1138
1139        self.metadata_manager
1140            .catalog_controller
1141            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1142            .await?;
1143
1144        // create streaming jobs.
1145        let version = self
1146            .stream_manager
1147            .create_streaming_job(stream_job_fragments, ctx, permit)
1148            .await?;
1149
1150        Ok(version)
1151    }
1152
1153    /// `target_replace_info`: when dropping a sink into a table, we need to replace the target table.
1154    pub async fn drop_object(
1155        &self,
1156        object_type: ObjectType,
1157        object_id: impl Into<ObjectId>,
1158        drop_mode: DropMode,
1159    ) -> MetaResult<NotificationVersion> {
1160        let object_id = object_id.into();
1161        let (release_ctx, version) = self
1162            .metadata_manager
1163            .catalog_controller
1164            .drop_object(object_type, object_id, drop_mode)
1165            .await?;
1166
1167        if object_type == ObjectType::Source {
1168            self.env
1169                .notification_manager_ref()
1170                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1171        }
1172
1173        let ReleaseContext {
1174            database_id,
1175            removed_streaming_job_ids,
1176            removed_state_table_ids,
1177            removed_source_ids,
1178            removed_secret_ids: secret_ids,
1179            removed_source_fragments,
1180            removed_actors,
1181            removed_fragments,
1182            removed_sink_fragment_by_targets,
1183            removed_iceberg_table_sinks,
1184        } = release_ctx;
1185
1186        let _guard = self.source_manager.pause_tick().await;
1187        self.stream_manager
1188            .drop_streaming_jobs(
1189                database_id,
1190                removed_actors.iter().map(|id| *id as _).collect(),
1191                removed_streaming_job_ids,
1192                removed_state_table_ids,
1193                removed_fragments.iter().map(|id| *id as _).collect(),
1194                removed_sink_fragment_by_targets
1195                    .into_iter()
1196                    .map(|(target, sinks)| {
1197                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1198                    })
1199                    .collect(),
1200            )
1201            .await;
1202
1203        // clean up sources after dropping streaming jobs.
1204        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
1205        self.source_manager
1206            .apply_source_change(SourceChange::DropSource {
1207                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1208            })
1209            .await;
1210
1211        // unregister fragments and actors from source manager.
1212        // FIXME: also need to unregister source backfill fragments.
1213        let dropped_source_fragments = removed_source_fragments;
1214        self.source_manager
1215            .apply_source_change(SourceChange::DropMv {
1216                dropped_source_fragments,
1217            })
1218            .await;
1219
1220        // clean up iceberg table sinks
1221        for sink in removed_iceberg_table_sinks {
1222            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1223                .expect("Iceberg sink should be valid");
1224            let iceberg_sink =
1225                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1226            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1227                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1228                tracing::info!(
1229                    "dropping iceberg table {} for dropped sink",
1230                    table_identifier
1231                );
1232
1233                let _ = iceberg_catalog
1234                    .drop_table(&table_identifier)
1235                    .await
1236                    .inspect_err(|err| {
1237                        tracing::error!(
1238                            "failed to drop iceberg table {} during cleanup: {}",
1239                            table_identifier,
1240                            err.as_report()
1241                        );
1242                    });
1243            }
1244        }
1245
1246        // remove secrets.
1247        for secret in secret_ids {
1248            LocalSecretManager::global().remove_secret(secret);
1249        }
1250        Ok(version)
1251    }
1252
1253    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1254    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1255    pub async fn replace_job(
1256        &self,
1257        mut streaming_job: StreamingJob,
1258        fragment_graph: StreamFragmentGraphProto,
1259    ) -> MetaResult<NotificationVersion> {
1260        match &streaming_job {
1261            StreamingJob::Table(..)
1262            | StreamingJob::Source(..)
1263            | StreamingJob::MaterializedView(..) => {}
1264            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1265                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1266            }
1267        }
1268
1269        let job_id = streaming_job.id();
1270
1271        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1272        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1273
1274        // Ensure the max parallelism is unchanged before replacing the table.
1275        let original_max_parallelism = self
1276            .metadata_manager
1277            .get_job_max_parallelism(streaming_job.id())
1278            .await?;
1279        let fragment_graph = PbStreamFragmentGraph {
1280            max_parallelism: original_max_parallelism as _,
1281            ..fragment_graph
1282        };
1283
1284        // 1. build fragment graph.
1285        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1286        streaming_job.set_info_from_graph(&fragment_graph);
1287
1288        // make it immutable
1289        let streaming_job = streaming_job;
1290
1291        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1292            let auto_refresh_schema_sinks = self
1293                .metadata_manager
1294                .catalog_controller
1295                .get_sink_auto_refresh_schema_from(table.id)
1296                .await?;
1297            if !auto_refresh_schema_sinks.is_empty() {
1298                let original_table_columns = self
1299                    .metadata_manager
1300                    .catalog_controller
1301                    .get_table_columns(table.id)
1302                    .await?;
1303                // compare column ids to find the newly added columns
1304                let mut original_table_column_ids: HashSet<_> = original_table_columns
1305                    .iter()
1306                    .map(|col| col.column_id())
1307                    .collect();
1308                let newly_added_columns = table
1309                    .columns
1310                    .iter()
1311                    .filter(|col| {
1312                        !original_table_column_ids.remove(&ColumnId::new(
1313                            col.column_desc.as_ref().unwrap().column_id as _,
1314                        ))
1315                    })
1316                    .map(|col| ColumnCatalog::from(col.clone()))
1317                    .collect_vec();
1318                if !original_table_column_ids.is_empty() {
1319                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1320                }
1321                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1322                for sink in auto_refresh_schema_sinks {
1323                    let sink_job_fragments = self
1324                        .metadata_manager
1325                        .get_job_fragments_by_id(sink.id.as_job_id())
1326                        .await?;
1327                    if sink_job_fragments.fragments.len() != 1 {
1328                        return Err(anyhow!(
1329                            "auto schema refresh sink must have only one fragment, but got {}",
1330                            sink_job_fragments.fragments.len()
1331                        )
1332                        .into());
1333                    }
1334                    let original_sink_fragment =
1335                        sink_job_fragments.fragments.into_values().next().unwrap();
1336                    let (new_sink_fragment, new_schema, new_log_store_table) =
1337                        rewrite_refresh_schema_sink_fragment(
1338                            &original_sink_fragment,
1339                            &sink,
1340                            &newly_added_columns,
1341                            table,
1342                            fragment_graph.table_fragment_id(),
1343                            self.env.id_gen_manager(),
1344                            self.env.actor_id_generator(),
1345                        )?;
1346
1347                    assert_eq!(
1348                        original_sink_fragment.actors.len(),
1349                        new_sink_fragment.actors.len()
1350                    );
1351                    let actor_status = (0..original_sink_fragment.actors.len())
1352                        .map(|i| {
1353                            let worker_node_id = sink_job_fragments.actor_status
1354                                [&original_sink_fragment.actors[i].actor_id]
1355                                .location
1356                                .as_ref()
1357                                .unwrap()
1358                                .worker_node_id;
1359                            (
1360                                new_sink_fragment.actors[i].actor_id,
1361                                PbActorStatus {
1362                                    location: Some(PbActorLocation { worker_node_id }),
1363                                },
1364                            )
1365                        })
1366                        .collect();
1367
1368                    let streaming_job = StreamingJob::Sink(sink);
1369
1370                    let tmp_sink_id = self
1371                        .metadata_manager
1372                        .catalog_controller
1373                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1374                        .await?
1375                        .as_sink_id();
1376                    let StreamingJob::Sink(sink) = streaming_job else {
1377                        unreachable!()
1378                    };
1379
1380                    sinks.push(AutoRefreshSchemaSinkContext {
1381                        tmp_sink_id,
1382                        original_sink: sink,
1383                        original_fragment: original_sink_fragment,
1384                        new_schema,
1385                        newly_add_fields: newly_added_columns
1386                            .iter()
1387                            .map(|col| Field::from(&col.column_desc))
1388                            .collect(),
1389                        new_fragment: new_sink_fragment,
1390                        new_log_store_table,
1391                        actor_status,
1392                    });
1393                }
1394                Some(sinks)
1395            } else {
1396                None
1397            }
1398        } else {
1399            None
1400        };
1401
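        // Create a temporary job catalog entry for the replacement. It is promoted to the
        // real job in `finish_replace_streaming_job` on success, or cleaned up via
        // `try_abort_replacing_streaming_job` on failure.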
1402        let tmp_id = self
1403            .metadata_manager
1404            .catalog_controller
1405            .create_job_catalog_for_replace(
1406                &streaming_job,
1407                Some(&ctx),
1408                fragment_graph.specified_parallelism().as_ref(),
1409                Some(fragment_graph.max_parallelism()),
1410            )
1411            .await?;
1412
1413        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1414            sinks
1415                .iter()
1416                .map(|sink| sink.tmp_sink_id.as_object_id())
1417                .collect_vec()
1418        });
1419
1420        tracing::debug!(id = %job_id, "building replace streaming job");
1421        let mut updated_sink_catalogs = vec![];
1422
1423        let mut drop_table_connector_ctx = None;
1424        let result: MetaResult<_> = try {
1425            let (mut ctx, mut stream_job_fragments) = self
1426                .build_replace_job(
1427                    ctx,
1428                    &streaming_job,
1429                    fragment_graph,
1430                    tmp_id,
1431                    auto_refresh_schema_sinks,
1432                )
1433                .await?;
1434            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1435            let auto_refresh_schema_sink_finish_ctx =
1436                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1437                    sinks
1438                        .iter()
1439                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1440                            tmp_sink_id: sink.tmp_sink_id,
1441                            original_sink_id: sink.original_sink.id,
1442                            columns: sink.new_schema.clone(),
1443                            new_log_store_table: sink
1444                                .new_log_store_table
1445                                .as_ref()
1446                                .map(|table| (table.id, table.columns.clone())),
1447                        })
1448                        .collect()
1449                });
1450
1451            // Handle a table that has incoming sinks.
1452            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1453                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1454                let upstream_infos = self
1455                    .metadata_manager
1456                    .catalog_controller
1457                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1458                    .await?;
1459                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1460
1461                for upstream_info in &upstream_infos {
1462                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1463                    ctx.upstream_fragment_downstreams
1464                        .entry(upstream_fragment_id)
1465                        .or_default()
1466                        .push(upstream_info.new_sink_downstream.clone());
1467                    if upstream_info.sink_original_target_columns.is_empty() {
1468                        updated_sink_catalogs.push(upstream_info.sink_id);
1469                    }
1470                }
1471            }
1472
1473            let replace_upstream = ctx.replace_upstream.clone();
1474
1475            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1476                let empty_downstreams = FragmentDownstreamRelation::default();
1477                for sink in sinks {
1478                    self.metadata_manager
1479                        .catalog_controller
1480                        .prepare_streaming_job(
1481                            sink.tmp_sink_id.as_job_id(),
1482                            || [&sink.new_fragment].into_iter(),
1483                            &empty_downstreams,
1484                            true,
1485                            None,
1486                        )
1487                        .await?;
1488                }
1489            }
1490
1491            self.metadata_manager
1492                .catalog_controller
1493                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1494                .await?;
1495
1496            self.stream_manager
1497                .replace_stream_job(stream_job_fragments, ctx)
1498                .await?;
1499            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1500        };
1501
1502        match result {
1503            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1504                let version = self
1505                    .metadata_manager
1506                    .catalog_controller
1507                    .finish_replace_streaming_job(
1508                        tmp_id,
1509                        streaming_job,
1510                        replace_upstream,
1511                        SinkIntoTableContext {
1512                            updated_sink_catalogs,
1513                        },
1514                        drop_table_connector_ctx.as_ref(),
1515                        auto_refresh_schema_sink_finish_ctx,
1516                    )
1517                    .await?;
1518                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1519                    self.source_manager
1520                        .apply_source_change(SourceChange::DropSource {
1521                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1522                        })
1523                        .await;
1524                }
1525                Ok(version)
1526            }
1527            Err(err) => {
1528                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1529                let _ = self.metadata_manager
1530                    .catalog_controller
1531                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1532                    .await.inspect_err(|err| {
1533                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1534                });
1535                Err(err)
1536            }
1537        }
1538    }
1539
1540    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1541    )]
1542    async fn drop_streaming_job(
1543        &self,
1544        job_id: StreamingJobId,
1545        drop_mode: DropMode,
1546    ) -> MetaResult<NotificationVersion> {
1547        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1548
1549        let (object_id, object_type) = match job_id {
1550            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1551            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1552            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1553            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1554        };
1555
1556        let job_status = self
1557            .metadata_manager
1558            .catalog_controller
1559            .get_streaming_job_status(job_id.id())
1560            .await?;
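        // A job still being created is cancelled through the stream manager; a created job
        // goes through the regular drop-object path below.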
1561        let version = match job_status {
1562            JobStatus::Initial => {
1563                unreachable!(
1564                    "Job with Initial status should not notify frontend and therefore should not arrive here"
1565                );
1566            }
1567            JobStatus::Creating => {
1568                self.stream_manager
1569                    .cancel_streaming_jobs(vec![job_id.id()])
1570                    .await?;
1571                IGNORED_NOTIFICATION_VERSION
1572            }
1573            JobStatus::Created => {
1574                let version = self.drop_object(object_type, object_id, drop_mode).await?;
1575                #[cfg(not(madsim))]
1576                if let StreamingJobId::Sink(sink_id) = job_id {
1577                    // Delete rows from the system table for the exactly-once Iceberg sink.
1578                    // TODO(wcy-fdu): optimize the logic to be Iceberg-specific.
1579                    let db = self.env.meta_store_ref().conn.clone();
1580                    clean_all_rows_by_sink_id(&db, sink_id).await?;
1581                }
1582                version
1583            }
1584        };
1585
1586        Ok(version)
1587    }
1588
1589    /// Resolve the parallelism of the stream job based on the given information.
1590    ///
1591    /// Returns error if user specifies a parallelism that cannot be satisfied.
1592    fn resolve_stream_parallelism(
1593        &self,
1594        specified: Option<NonZeroUsize>,
1595        max: NonZeroUsize,
1596        cluster_info: &StreamingClusterInfo,
1597        resource_group: String,
1598    ) -> MetaResult<NonZeroUsize> {
1599        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1600        DdlController::resolve_stream_parallelism_inner(
1601            specified,
1602            max,
1603            available,
1604            &self.env.opts.default_parallelism,
1605            &resource_group,
1606        )
1607    }
1608
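    /// Resolution rules (also exercised by the unit tests at the bottom of this file):
    /// - a specified parallelism greater than `max` is rejected;
    /// - a specified parallelism within `max` is used as-is, with a warning if it exceeds
    ///   the available slots;
    /// - otherwise, `DefaultParallelism::Full` resolves to the available parallelism and
    ///   `DefaultParallelism::Default(n)` resolves to `n`, both capped by `max`.
    ///
    /// A sketch of the `Full` case (mirroring `test_full_parallelism_capped_by_max`):
    ///
    /// ```ignore
    /// let p = DdlController::resolve_stream_parallelism_inner(
    ///     None,
    ///     NonZeroUsize::new(6).unwrap(),
    ///     Some(NonZeroUsize::new(10).unwrap()),
    ///     &DefaultParallelism::Full,
    ///     "default",
    /// )?;
    /// assert_eq!(p.get(), 6);
    /// ```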
1609    fn resolve_stream_parallelism_inner(
1610        specified: Option<NonZeroUsize>,
1611        max: NonZeroUsize,
1612        available: Option<NonZeroUsize>,
1613        default_parallelism: &DefaultParallelism,
1614        resource_group: &str,
1615    ) -> MetaResult<NonZeroUsize> {
1616        let Some(available) = available else {
1617            bail_unavailable!(
1618                "no available slots to schedule in resource group \"{}\", \
1619                 have you allocated any compute nodes within this resource group?",
1620                resource_group
1621            );
1622        };
1623
1624        if let Some(specified) = specified {
1625            if specified > max {
1626                bail_invalid_parameter!(
1627                    "specified parallelism {} should not exceed max parallelism {}",
1628                    specified,
1629                    max,
1630                );
1631            }
1632            if specified > available {
1633                tracing::warn!(
1634                    resource_group,
1635                    specified_parallelism = specified.get(),
1636                    available_parallelism = available.get(),
1637                    "specified parallelism exceeds available slots, scheduling with specified value",
1638                );
1639            }
1640            return Ok(specified);
1641        }
1642
1643        // Use default parallelism when no specific parallelism is provided by the user.
1644        let default_parallelism = match default_parallelism {
1645            DefaultParallelism::Full => available,
1646            DefaultParallelism::Default(num) => {
1647                if *num > available {
1648                    tracing::warn!(
1649                        resource_group,
1650                        configured_parallelism = num.get(),
1651                        available_parallelism = available.get(),
1652                        "default parallelism exceeds available slots, scheduling with configured value",
1653                    );
1654                }
1655                *num
1656            }
1657        };
1658
1659        if default_parallelism > max {
1660            tracing::warn!(
1661                max_parallelism = max.get(),
1662                resource_group,
1663                "default parallelism exceeds max parallelism, capping to max",
1664            );
1665        }
1666        Ok(default_parallelism.min(max))
1667    }
1668
1669    /// Builds the actor graph:
1670    /// - Add the upstream fragments to the fragment graph
1671    /// - Schedule the fragments based on their distribution
1672    /// - Expand each fragment into one or several actors
1673    /// - Construct the fragment-level backfill order control.
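    ///
    /// Returns the `CreateStreamingJobContext` together with the fragments to persist.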
1674    #[await_tree::instrument]
1675    pub(crate) async fn build_stream_job(
1676        &self,
1677        stream_ctx: StreamContext,
1678        mut stream_job: StreamingJob,
1679        fragment_graph: StreamFragmentGraph,
1680        specific_resource_group: Option<String>,
1681    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1682        let id = stream_job.id();
1683        let specified_parallelism = fragment_graph.specified_parallelism();
1684        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1685
1686        // 1. Fragment Level ordering graph
1687        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1688
1689        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1690        // contains all information needed for building the actor graph.
1691
1692        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1693            fragment_graph.collect_snapshot_backfill_info()?;
1694        assert!(
1695            snapshot_backfill_info
1696                .iter()
1697                .chain([&cross_db_snapshot_backfill_info])
1698                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1699                .all(|backfill_epoch| backfill_epoch.is_none()),
1700            "should not set backfill epoch when initially build the job: {:?} {:?}",
1701            snapshot_backfill_info,
1702            cross_db_snapshot_backfill_info
1703        );
1704
1705        let locality_fragment_state_table_mapping =
1706            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1707
1708        // check if log store exists for all cross-db upstreams
1709        self.metadata_manager
1710            .catalog_controller
1711            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1712            .await?;
1713
1714        let upstream_table_ids = fragment_graph
1715            .dependent_table_ids()
1716            .iter()
1717            .filter(|id| {
1718                !cross_db_snapshot_backfill_info
1719                    .upstream_mv_table_id_to_backfill_epoch
1720                    .contains_key(id)
1721            })
1722            .cloned()
1723            .collect();
1724
1725        let (upstream_root_fragments, existing_actor_location) = self
1726            .metadata_manager
1727            .get_upstream_root_fragments(&upstream_table_ids)
1728            .await?;
1729
1730        if snapshot_backfill_info.is_some() {
1731            match stream_job {
1732                StreamingJob::MaterializedView(_)
1733                | StreamingJob::Sink(_)
1734                | StreamingJob::Index(_, _) => {}
1735                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1736                    return Err(
1737                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1738                    );
1739                }
1740            }
1741        }
1742
1743        let upstream_actors = upstream_root_fragments
1744            .values()
1745            .map(|(fragment, _)| {
1746                (
1747                    fragment.fragment_id,
1748                    fragment.actors.keys().copied().collect(),
1749                )
1750            })
1751            .collect();
1752
1753        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1754            fragment_graph,
1755            FragmentGraphUpstreamContext {
1756                upstream_root_fragments,
1757                upstream_actor_location: existing_actor_location,
1758            },
1759            (&stream_job).into(),
1760        )?;
1761        let resource_group = match specific_resource_group {
1762            None => {
1763                self.metadata_manager
1764                    .get_database_resource_group(stream_job.database_id())
1765                    .await?
1766            }
1767            Some(resource_group) => resource_group,
1768        };
1769
1770        // 3. Build the actor graph.
1771        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1772
1773        let parallelism = self.resolve_stream_parallelism(
1774            specified_parallelism,
1775            max_parallelism,
1776            &cluster_info,
1777            resource_group.clone(),
1778        )?;
1779
1780        let parallelism = self
1781            .env
1782            .system_params_reader()
1783            .await
1784            .adaptive_parallelism_strategy()
1785            .compute_target_parallelism(parallelism.get());
1786
1787        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1788        let actor_graph_builder = ActorGraphBuilder::new(
1789            id,
1790            resource_group,
1791            complete_graph,
1792            cluster_info,
1793            parallelism,
1794        )?;
1795
1796        let ActorGraphBuildResult {
1797            graph,
1798            downstream_fragment_relations,
1799            building_locations,
1800            upstream_fragment_downstreams,
1801            new_no_shuffle,
1802            replace_upstream,
1803            ..
1804        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1805        assert!(replace_upstream.is_empty());
1806
1807        // 4. Build the table fragments structure that will be persisted in the stream manager,
1808        // and the context that contains all information needed for building the
1809        // actors on the compute nodes.
1810
1811        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, use ADAPTIVE parallelism.
1812        // Otherwise, fall back to FIXED parallelism with the resolved value.
1813        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1814            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1815            _ => TableParallelism::Fixed(parallelism.get()),
1816        };
1817
1818        let stream_job_fragments = StreamJobFragments::new(
1819            id,
1820            graph,
1821            &building_locations.actor_locations,
1822            stream_ctx.clone(),
1823            table_parallelism,
1824            max_parallelism.get(),
1825        );
1826
1827        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1828            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1829        }
1830
1831        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1832            && let Ok(table_id) = sink.get_target_table()
1833        {
1834            let tables = self
1835                .metadata_manager
1836                .get_table_catalog_by_ids(&[*table_id])
1837                .await?;
1838            let target_table = tables
1839                .first()
1840                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1841            let sink_fragment = stream_job_fragments
1842                .sink_fragment()
1843                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1844            let mview_fragment_id = self
1845                .metadata_manager
1846                .catalog_controller
1847                .get_mview_fragment_by_id(table_id.as_job_id())
1848                .await?;
1849            let upstream_sink_info = build_upstream_sink_info(
1850                sink,
1851                sink_fragment.fragment_id as _,
1852                target_table,
1853                mview_fragment_id,
1854            )?;
1855            Some(upstream_sink_info)
1856        } else {
1857            None
1858        };
1859
1860        let ctx = CreateStreamingJobContext {
1861            upstream_fragment_downstreams,
1862            new_no_shuffle,
1863            upstream_actors,
1864            building_locations,
1865            definition: stream_job.definition(),
1866            create_type: stream_job.create_type(),
1867            job_type: (&stream_job).into(),
1868            streaming_job: stream_job,
1869            new_upstream_sink,
1870            option: CreateStreamingJobOption {},
1871            snapshot_backfill_info,
1872            cross_db_snapshot_backfill_info,
1873            fragment_backfill_ordering,
1874            locality_fragment_state_table_mapping,
1875        };
1876
1877        Ok((
1878            ctx,
1879            StreamJobFragmentsToCreate {
1880                inner: stream_job_fragments,
1881                downstreams: downstream_fragment_relations,
1882            },
1883        ))
1884    }
1885
1886    /// `build_replace_job` builds a job replacement and returns the context and new job
1887    /// fragments.
1888    ///
1889    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1890    /// replacement is finished.
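    ///
    /// When `auto_refresh_schema_sinks` is provided, the rewritten sink fragments replace
    /// the corresponding downstream sink fragments of the job being altered.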
1891    pub(crate) async fn build_replace_job(
1892        &self,
1893        stream_ctx: StreamContext,
1894        stream_job: &StreamingJob,
1895        mut fragment_graph: StreamFragmentGraph,
1896        tmp_job_id: JobId,
1897        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1898    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1899        match &stream_job {
1900            StreamingJob::Table(..)
1901            | StreamingJob::Source(..)
1902            | StreamingJob::MaterializedView(..) => {}
1903            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1904                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1905            }
1906        }
1907
1908        let id = stream_job.id();
1909
1910        // check if performing drop table connector
1911        let mut drop_table_associated_source_id = None;
1912        if let StreamingJob::Table(None, _, _) = &stream_job {
1913            drop_table_associated_source_id = self
1914                .metadata_manager
1915                .get_table_associated_source_id(id.as_mv_table_id())
1916                .await?;
1917        }
1918
1919        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1920        let old_internal_table_ids = old_fragments.internal_table_ids();
1921
1922        // handle drop table's associated source
1923        let mut drop_table_connector_ctx = None;
1924        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1925            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1926            debug_assert!(old_internal_table_ids.len() == 1);
1927
1928            drop_table_connector_ctx = Some(DropTableConnectorContext {
1929                // We do not remove the original table catalog as it's still needed for the streaming job;
1930                // we just need to remove the reference to the state table.
1931                to_change_streaming_job_id: id,
1932                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1933                to_remove_source_id,
1934            });
1935        } else if stream_job.is_materialized_view() {
1936            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
1937            // but more robust.
1938            let old_fragments_upstreams = self
1939                .metadata_manager
1940                .catalog_controller
1941                .upstream_fragments(old_fragments.fragment_ids())
1942                .await?;
1943
1944            let old_state_graph =
1945                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1946            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1947            let mapping =
1948                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
1949                    .context("incompatible altering on the streaming job states")?;
1950
1951            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
1952        } else {
1953            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1954            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1955            let old_internal_tables = self
1956                .metadata_manager
1957                .get_table_catalog_by_ids(&old_internal_table_ids)
1958                .await?;
1959            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
1960        }
1961
1962        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1963        // graph that contains all information needed for building the actor graph.
1964        let original_root_fragment = old_fragments
1965            .root_fragment()
1966            .expect("root fragment not found");
1967
1968        let job_type = StreamingJobType::from(stream_job);
1969
1970        // Extract the downstream fragments from the fragment graph.
1971        let (mut downstream_fragments, mut downstream_actor_location) =
1972            self.metadata_manager.get_downstream_fragments(id).await?;
1973
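        // For auto-refresh-schema sinks, substitute the rewritten sink fragment for the
        // original one in the downstream fragment list, carrying over the original actors'
        // worker locations so the new actors are placed on the same workers.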
1974        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
1975            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
1976                .iter()
1977                .map(|sink| sink.original_fragment.fragment_id)
1978                .collect();
1979            for (_, downstream_fragment, _) in &mut downstream_fragments {
1980                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
1981                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
1982                }) {
1983                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
1984                    for actor_id in downstream_fragment.actors.keys() {
1985                        downstream_actor_location.remove(actor_id);
1986                    }
1987                    for (actor_id, status) in &sink.actor_status {
1988                        downstream_actor_location
1989                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
1990                    }
1991
1992                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
1993                }
1994            }
1995            assert!(remaining_fragment.is_empty());
1996        }
1997
1998        // Build the complete graph based on the streaming job type.
1999        let complete_graph = match &job_type {
2000            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2001                CompleteStreamFragmentGraph::with_downstreams(
2002                    fragment_graph,
2003                    FragmentGraphDownstreamContext {
2004                        original_root_fragment_id: original_root_fragment.fragment_id,
2005                        downstream_fragments,
2006                        downstream_actor_location,
2007                    },
2008                    job_type,
2009                )?
2010            }
2011            StreamingJobType::Table(TableJobType::SharedCdcSource)
2012            | StreamingJobType::MaterializedView => {
2013                // CDC tables or materialized views can have upstream jobs as well.
2014                let (upstream_root_fragments, upstream_actor_location) = self
2015                    .metadata_manager
2016                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2017                    .await?;
2018
2019                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2020                    fragment_graph,
2021                    FragmentGraphUpstreamContext {
2022                        upstream_root_fragments,
2023                        upstream_actor_location,
2024                    },
2025                    FragmentGraphDownstreamContext {
2026                        original_root_fragment_id: original_root_fragment.fragment_id,
2027                        downstream_fragments,
2028                        downstream_actor_location,
2029                    },
2030                    job_type,
2031                )?
2032            }
2033            _ => unreachable!(),
2034        };
2035
2036        let resource_group = self
2037            .metadata_manager
2038            .get_existing_job_resource_group(id)
2039            .await?;
2040
2041        // 2. Build the actor graph.
2042        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2043
2044        // XXX: what is this parallelism?
2045        // Is it "assigned parallelism"?
2046        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2047            .expect("The number of actors in the original table fragment should be greater than 0");
2048
2049        let actor_graph_builder = ActorGraphBuilder::new(
2050            id,
2051            resource_group,
2052            complete_graph,
2053            cluster_info,
2054            parallelism,
2055        )?;
2056
2057        let ActorGraphBuildResult {
2058            graph,
2059            downstream_fragment_relations,
2060            building_locations,
2061            upstream_fragment_downstreams,
2062            mut replace_upstream,
2063            new_no_shuffle,
2064            ..
2065        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2066
2067        // General tables & sources do not have upstream jobs, so the dispatchers should be empty.
2068        if matches!(
2069            job_type,
2070            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2071        ) {
2072            assert!(upstream_fragment_downstreams.is_empty());
2073        }
2074
2075        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2076        // the context that contains all information needed for building the actors on the compute
2077        // nodes.
2078        let stream_job_fragments = StreamJobFragments::new(
2079            tmp_job_id,
2080            graph,
2081            &building_locations.actor_locations,
2082            stream_ctx,
2083            old_fragments.assigned_parallelism,
2084            old_fragments.max_parallelism,
2085        );
2086
2087        if let Some(sinks) = &auto_refresh_schema_sinks {
2088            for sink in sinks {
2089                replace_upstream
2090                    .remove(&sink.new_fragment.fragment_id)
2091                    .expect("should exist");
2092            }
2093        }
2094
2095        // Note: no need to set `vnode_count` as it's already set by the frontend.
2096        // See `get_replace_table_plan`.
2097
2098        let ctx = ReplaceStreamJobContext {
2099            old_fragments,
2100            replace_upstream,
2101            new_no_shuffle,
2102            upstream_fragment_downstreams,
2103            building_locations,
2104            streaming_job: stream_job.clone(),
2105            tmp_id: tmp_job_id,
2106            drop_table_connector_ctx,
2107            auto_refresh_schema_sinks,
2108        };
2109
2110        Ok((
2111            ctx,
2112            StreamJobFragmentsToCreate {
2113                inner: stream_job_fragments,
2114                downstreams: downstream_fragment_relations,
2115            },
2116        ))
2117    }
2118
2119    async fn alter_name(
2120        &self,
2121        relation: alter_name_request::Object,
2122        new_name: &str,
2123    ) -> MetaResult<NotificationVersion> {
2124        let (obj_type, id) = match relation {
2125            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2126            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2127            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2128            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2129            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2130            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2131            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2132            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2133        };
2134        self.metadata_manager
2135            .catalog_controller
2136            .alter_name(obj_type, id, new_name)
2137            .await
2138    }
2139
2140    async fn alter_swap_rename(
2141        &self,
2142        object: alter_swap_rename_request::Object,
2143    ) -> MetaResult<NotificationVersion> {
2144        let (obj_type, src_id, dst_id) = match object {
2145            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2146            alter_swap_rename_request::Object::Table(objs) => {
2147                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2148                (ObjectType::Table, src_id, dst_id)
2149            }
2150            alter_swap_rename_request::Object::View(objs) => {
2151                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2152                (ObjectType::View, src_id, dst_id)
2153            }
2154            alter_swap_rename_request::Object::Source(objs) => {
2155                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2156                (ObjectType::Source, src_id, dst_id)
2157            }
2158            alter_swap_rename_request::Object::Sink(objs) => {
2159                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2160                (ObjectType::Sink, src_id, dst_id)
2161            }
2162            alter_swap_rename_request::Object::Subscription(objs) => {
2163                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2164                (ObjectType::Subscription, src_id, dst_id)
2165            }
2166        };
2167
2168        self.metadata_manager
2169            .catalog_controller
2170            .alter_swap_rename(obj_type, src_id, dst_id)
2171            .await
2172    }
2173
2174    async fn alter_owner(
2175        &self,
2176        object: Object,
2177        owner_id: UserId,
2178    ) -> MetaResult<NotificationVersion> {
2179        let (obj_type, id) = match object {
2180            Object::TableId(id) => (ObjectType::Table, id),
2181            Object::ViewId(id) => (ObjectType::View, id),
2182            Object::SourceId(id) => (ObjectType::Source, id),
2183            Object::SinkId(id) => (ObjectType::Sink, id),
2184            Object::SchemaId(id) => (ObjectType::Schema, id),
2185            Object::DatabaseId(id) => (ObjectType::Database, id),
2186            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2187            Object::ConnectionId(id) => (ObjectType::Connection, id),
2188        };
2189        self.metadata_manager
2190            .catalog_controller
2191            .alter_owner(obj_type, id.into(), owner_id as _)
2192            .await
2193    }
2194
2195    async fn alter_set_schema(
2196        &self,
2197        object: alter_set_schema_request::Object,
2198        new_schema_id: SchemaId,
2199    ) -> MetaResult<NotificationVersion> {
2200        let (obj_type, id) = match object {
2201            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2202            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2203            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2204            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2205            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2206            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2207            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2208        };
2209        self.metadata_manager
2210            .catalog_controller
2211            .alter_schema(obj_type, id.into(), new_schema_id as _)
2212            .await
2213    }
2214
2215    pub async fn wait(&self) -> MetaResult<()> {
2216        let timeout_ms = 30 * 60 * 1000;
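        // Poll once per iteration with a 1ms sleep, i.e. wait up to ~30 minutes
        // (excluding the time spent in the catalog query itself).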
2217        for _ in 0..timeout_ms {
2218            if self
2219                .metadata_manager
2220                .catalog_controller
2221                .list_background_creating_jobs(true, None)
2222                .await?
2223                .is_empty()
2224            {
2225                return Ok(());
2226            }
2227
2228            sleep(Duration::from_millis(1)).await;
2229        }
2230        Err(MetaError::cancelled(format!(
2231            "timeout after {timeout_ms}ms"
2232        )))
2233    }
2234
2235    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2236        self.metadata_manager
2237            .catalog_controller
2238            .comment_on(comment)
2239            .await
2240    }
2241
2242    async fn alter_streaming_job_config(
2243        &self,
2244        job_id: JobId,
2245        entries_to_add: HashMap<String, String>,
2246        keys_to_remove: Vec<String>,
2247    ) -> MetaResult<NotificationVersion> {
2248        self.metadata_manager
2249            .catalog_controller
2250            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2251            .await
2252    }
2253}
2254
2255fn report_create_object(
2256    job_id: JobId,
2257    event_name: &str,
2258    obj_type: PbTelemetryDatabaseObject,
2259    connector_name: Option<String>,
2260    attr_info: Option<jsonbb::Value>,
2261) {
2262    report_event(
2263        PbTelemetryEventStage::CreateStreamJob,
2264        event_name,
2265        job_id.as_raw_id() as _,
2266        connector_name,
2267        Some(obj_type),
2268        attr_info,
2269    );
2270}
2271
2272async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: SinkId) -> MetaResult<()> {
2273    match Entity::delete_many()
2274        .filter(Column::SinkId.eq(sink_id))
2275        .exec(db)
2276        .await
2277    {
2278        Ok(result) => {
2279            let deleted_count = result.rows_affected;
2280
2281            tracing::info!(
2282                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2283                deleted_count,
2284                sink_id
2285            );
2286            Ok(())
2287        }
2288        Err(e) => {
2289            tracing::error!(
2290                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2291                sink_id,
2292                e.as_report()
2293            );
2294            Err(e.into())
2295        }
2296    }
2297}
2298
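/// Builds the `UpstreamSinkInfo` for a sink that writes into a table: the sink's output
/// schema, the projection expressions onto the table's current columns, and the hash
/// dispatch relation from the sink fragment to the target table fragment.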
2299pub fn build_upstream_sink_info(
2300    sink: &PbSink,
2301    sink_fragment_id: FragmentId,
2302    target_table: &PbTable,
2303    target_fragment_id: FragmentId,
2304) -> MetaResult<UpstreamSinkInfo> {
2305    let sink_columns = if !sink.original_target_columns.is_empty() {
2306        sink.original_target_columns.clone()
2307    } else {
2308        // This is because the field did not exist in earlier versions,
2309        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
2310        // Therefore, the table's current columns are exactly the `original_target_columns`.
2311        // This field of the sink will be filled in on the meta node.
2312        target_table.columns.clone()
2313    };
2314
2315    let sink_output_fields = sink_columns
2316        .iter()
2317        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2318        .collect_vec();
2319    let output_indices = (0..sink_output_fields.len())
2320        .map(|i| i as u32)
2321        .collect_vec();
2322
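    // Map each distribution-key column of the target table to its position in the sink's
    // output, matching by column id. For example, if the table's distribution key is the
    // column with id 3 and that column is the second sink output column, the dispatcher
    // hashes on sink output index 1.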
2323    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2324        let sink_idx_by_col_id = sink_columns
2325            .iter()
2326            .enumerate()
2327            .map(|(idx, col)| {
2328                let column_id = col.column_desc.as_ref().unwrap().column_id;
2329                (column_id, idx as u32)
2330            })
2331            .collect::<HashMap<_, _>>();
2332        target_table
2333            .distribution_key
2334            .iter()
2335            .map(|dist_idx| {
2336                let column_id = target_table.columns[*dist_idx as usize]
2337                    .column_desc
2338                    .as_ref()
2339                    .unwrap()
2340                    .column_id;
2341                let sink_idx = sink_idx_by_col_id
2342                    .get(&column_id)
2343                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2344                Ok(*sink_idx)
2345            })
2346            .collect::<anyhow::Result<Vec<_>>>()?
2347    };
2348    let dist_key_indices =
2349        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2350    let downstream_fragment_id = target_fragment_id as _;
2351    let new_downstream_relation = DownstreamFragmentRelation {
2352        downstream_fragment_id,
2353        dispatcher_type: DispatcherType::Hash,
2354        dist_key_indices,
2355        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2356    };
2357    let current_target_columns = target_table.get_columns();
2358    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2359    Ok(UpstreamSinkInfo {
2360        sink_id: sink.id,
2361        sink_fragment_id: sink_fragment_id as _,
2362        sink_output_fields,
2363        sink_original_target_columns: sink.get_original_target_columns().clone(),
2364        project_exprs,
2365        new_sink_downstream: new_downstream_relation,
2366    })
2367}
2368
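/// Visits the union fragment's plan tree and, on the first `UpstreamSinkUnion` node found,
/// overwrites its `init_upstreams` with one entry per upstream sink (fragment id, output
/// schema, and projection expressions), then stops visiting.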
2369pub fn refill_upstream_sink_union_in_table(
2370    union_fragment_root: &mut PbStreamNode,
2371    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2372) {
2373    visit_stream_node_cont_mut(union_fragment_root, |node| {
2374        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2375            let init_upstreams = upstream_sink_infos
2376                .iter()
2377                .map(|info| PbUpstreamSinkInfo {
2378                    upstream_fragment_id: info.sink_fragment_id,
2379                    sink_output_schema: info.sink_output_fields.clone(),
2380                    project_exprs: info.project_exprs.clone(),
2381                })
2382                .collect();
2383            upstream_sink_union.init_upstreams = init_upstreams;
2384            false
2385        } else {
2386            true
2387        }
2388    });
2389}
2390
2391#[cfg(test)]
2392mod tests {
2393    use std::num::NonZeroUsize;
2394
2395    use super::*;
2396
2397    #[test]
2398    fn test_specified_parallelism_exceeds_available() {
2399        let result = DdlController::resolve_stream_parallelism_inner(
2400            Some(NonZeroUsize::new(100).unwrap()),
2401            NonZeroUsize::new(256).unwrap(),
2402            Some(NonZeroUsize::new(4).unwrap()),
2403            &DefaultParallelism::Full,
2404            "default",
2405        )
2406        .unwrap();
2407        assert_eq!(result.get(), 100);
2408    }
2409
2410    #[test]
2411    fn test_allows_default_parallelism_over_available() {
2412        let result = DdlController::resolve_stream_parallelism_inner(
2413            None,
2414            NonZeroUsize::new(256).unwrap(),
2415            Some(NonZeroUsize::new(4).unwrap()),
2416            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2417            "default",
2418        )
2419        .unwrap();
2420        assert_eq!(result.get(), 50);
2421    }
2422
2423    #[test]
2424    fn test_full_parallelism_capped_by_max() {
2425        let result = DdlController::resolve_stream_parallelism_inner(
2426            None,
2427            NonZeroUsize::new(6).unwrap(),
2428            Some(NonZeroUsize::new(10).unwrap()),
2429            &DefaultParallelism::Full,
2430            "default",
2431        )
2432        .unwrap();
2433        assert_eq!(result.get(), 6);
2434    }
2435
2436    #[test]
2437    fn test_no_available_slots_returns_error() {
2438        let result = DdlController::resolve_stream_parallelism_inner(
2439            None,
2440            NonZeroUsize::new(4).unwrap(),
2441            None,
2442            &DefaultParallelism::Full,
2443            "default",
2444        );
2445        assert!(matches!(
2446            result,
2447            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2448        ));
2449    }
2450
2451    #[test]
2452    fn test_specified_over_max_returns_error() {
2453        let result = DdlController::resolve_stream_parallelism_inner(
2454            Some(NonZeroUsize::new(8).unwrap()),
2455            NonZeroUsize::new(4).unwrap(),
2456            Some(NonZeroUsize::new(10).unwrap()),
2457            &DefaultParallelism::Full,
2458            "default",
2459        );
2460        assert!(matches!(
2461            result,
2462            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2463        ));
2464    }
2465}