risingwave_meta/rpc/ddl_controller.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
    streaming_job,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType as PbFragmentDistributionType;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, FragmentDownstreamRelation, FragmentId as CatalogFragmentId,
    StreamContext, StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
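    /// Maps the `cascade` flag carried by a DDL request to a [`DropMode`]. A minimal
    /// doc sketch of the mapping:
    ///
    /// ```ignore
    /// assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
    /// assert!(DropMode::from_request_setting(false) == DropMode::Restrict);
    /// ```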
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

/// Describes the job to be replaced. Used when replacing a table and when creating a
/// sink into a table.
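///
/// A minimal sketch of constructing one for a table replacement (illustrative values;
/// in practice both fields come from the frontend's DDL request):
///
/// ```ignore
/// let info = ReplaceStreamJobInfo {
///     streaming_job: StreamingJob::Table(None, table, TableJobType::General),
///     fragment_graph: new_fragment_graph,
/// };
/// ```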
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterSubscriptionRetention {
                subscription_id, ..
            } => Right(subscription_id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_, _)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _)
            | DdlCommand::AlterSubscriptionRetention { .. } => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    /// The semaphore is used to limit the number of concurrent streaming job creations.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // If the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with adjustable limits ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}
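
// A minimal sketch of the semaphore-resizing semantics relied on above, assuming only
// `tokio::sync::Semaphore` (illustrative test, not part of the production code path).
#[cfg(test)]
mod creating_streaming_job_permit_sketch {
    use tokio::sync::Semaphore;

    #[tokio::test]
    async fn resize_by_add_and_forget() {
        let sem = Semaphore::new(4);
        // Growing the limit: add the difference in permits.
        sem.add_permits(2);
        assert_eq!(sem.available_permits(), 6);
        // Shrinking the limit: `forget_permits` removes up to `n` idle permits and
        // returns how many it actually removed, which may be fewer if permits are
        // currently held. That shortfall is why the task above logs a warning.
        let forgotten = sem.forget_permits(3);
        assert_eq!(forgotten, 3);
        assert_eq!(sem.available_permits(), 3);
    }
}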

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            iceberg_compaction_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
    pub fn next_seq(&self) -> u64 {
        // This is a simple atomic increment operation.
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client
    /// is interrupted while the request is executing, the request will be cancelled by tonic.
    /// Since we have a lot of logic for revert, status management, notification and so on,
    /// ensuring consistency would be a huge hassle and pain if we didn't spawn here.
    ///
    /// Though the return type is `Option`, it is always `Some`, to simplify the handling logic.
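    ///
    /// A minimal sketch of a caller, assuming a `DdlController` handle `ctrl` and a
    /// constructed [`DdlCommand`] (illustrative names, not part of this module):
    ///
    /// ```ignore
    /// let wait_version = ctrl
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("always Some");
    /// ```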
    #[allow(clippy::large_stack_frames)]
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = Box::pin(async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        })
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_streaming_job_backfill_parallelism(
        &self,
        job_id: JobId,
        parallelism: Option<StreamingParallelism>,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("altering CDC table backfill parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!(
                "CDC table backfill reschedule is unavailable because the system is in recovery state"
            )
            .into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    /// Shared sources are handled in [`Self::create_streaming_job`].
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        // Get database_id for the source
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // RESET SOURCE doesn't modify catalog, so return the current catalog version
        let version = self
            .metadata_manager
            .catalog_controller
            .notify_frontend_trivial()
            .await;
        Ok(version)
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(
        &self,
        secret_id: SecretId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    async fn alter_subscription_retention(
        &self,
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("alter subscription retention");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let (version, subscription) = self
            .metadata_manager
            .catalog_controller
            .alter_subscription_retention(subscription_id, retention_seconds, definition)
            .await?;
        self.stream_manager
            .alter_subscription_retention(
                subscription.database_id,
                subscription.id,
                subscription.dependent_table_id,
                subscription.retention_seconds,
            )
            .await?;
        tracing::debug!("finish alter subscription retention");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node.
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(
            distribution_type: PbFragmentDistributionType,
            node_body: &Option<NodeBody>,
        ) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                } else {
                    assert_eq!(
                        distribution_type,
                        PbFragmentDistributionType::Single,
                        "Non-parallelized CDC scan fragment should have Single distribution"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(
                    stream_scan_fragment.distribution_type,
                    &stream_scan_fragment.nodes.node_body,
                );
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment.distribution_type, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!(
                "Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                table_id
            )
            .into())
        } else {
            Ok(())
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
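    ///
    /// A minimal sketch of the call shape (illustrative arguments; in practice they come
    /// from the `CreateStreamingJob` DDL request):
    ///
    /// ```ignore
    /// let version = ctrl
    ///     .create_streaming_job(stream_job, fragment_graph, dependencies, resource_type, false)
    ///     .await?;
    /// ```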
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let streaming_job_model = match self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await
        {
            Ok(model) => model,
            Err(meta_err) => {
                if !if_not_exists {
                    return Err(meta_err);
                }
                return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                    if streaming_job.create_type() == CreateType::Foreground {
                        let database_id = streaming_job.database_id();
                        self.metadata_manager
                            .wait_streaming_job_finished(database_id, *job_id)
                            .await
                    } else {
                        Ok(IGNORED_NOTIFICATION_VERSION)
                    }
                } else {
                    Err(meta_err)
                };
            }
        };
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                resource_type,
                permit,
                streaming_job_model,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
        streaming_job_model: streaming_job::Model,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table ids.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        // create fragment and actor catalogs.
        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                resource_type,
                streaming_job_model,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                        "format": source_info.format().as_str_name(),
                        "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        // create streaming jobs.
        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

    /// When dropping a sink that writes into a table, the target table also needs to be replaced.
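    ///
    /// A minimal sketch of the call shape (illustrative IDs):
    ///
    /// ```ignore
    /// ctrl.drop_object(ObjectType::Table, table_id, DropMode::Cascade).await?;
    /// ```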
1269    pub async fn drop_object(
1270        &self,
1271        object_type: ObjectType,
1272        object_id: impl Into<ObjectId>,
1273        drop_mode: DropMode,
1274    ) -> MetaResult<NotificationVersion> {
1275        let object_id = object_id.into();
1276        // Fence reschedule and source tick before catalog deletion so post-collect split updates
1277        // cannot race with dropped fragments.
1278        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1279        let _source_tick_pause_guard = self.source_manager.pause_tick().await;
1280
1281        let (release_ctx, version) = self
1282            .metadata_manager
1283            .catalog_controller
1284            .drop_object(object_type, object_id, drop_mode)
1285            .await?;
1286
1287        if object_type == ObjectType::Source {
1288            self.env
1289                .notification_manager_ref()
1290                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1291        }
1292
1293        let ReleaseContext {
1294            database_id,
1295            removed_streaming_job_ids,
1296            removed_state_table_ids,
1297            removed_source_ids,
1298            removed_secret_ids: secret_ids,
1299            removed_source_fragments,
1300            removed_fragments,
1301            removed_sink_fragment_by_targets,
1302            removed_iceberg_table_sinks,
1303        } = release_ctx;
1304
1305        // Notify serving module about deleted fragments so it can clean up serving vnode mappings.
1306        // This is driven by the fragment model deletion (cascade from Object::delete_many),
1307        // decoupled from the barrier-driven streaming mapping notifications.
1308        self.env
1309            .notification_manager_ref()
1310            .notify_serving_fragment_mapping_delete(
1311                removed_fragments.iter().map(|id| *id as _).collect(),
1312            );
1313
1314        self.stream_manager
1315            .drop_streaming_jobs(
1316                database_id,
1317                removed_streaming_job_ids,
1318                removed_state_table_ids,
1319                removed_fragments.iter().map(|id| *id as _).collect(),
1320                removed_sink_fragment_by_targets
1321                    .into_iter()
1322                    .map(|(target, sinks)| {
1323                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1324                    })
1325                    .collect(),
1326            )
1327            .await;
1328
1329        // clean up sources after dropping streaming jobs.
1330        // Otherwise, e.g., Kafka consumer groups might be recreated after deleted.
1331        self.source_manager
1332            .apply_source_change(SourceChange::DropSource {
1333                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1334            })
1335            .await;
1336
1337        // Unregister fragments and actors from the source manager.
1338        // FIXME: we also need to unregister source backfill fragments.
1339        let dropped_source_fragments = removed_source_fragments;
1340        self.source_manager
1341            .apply_source_change(SourceChange::DropMv {
1342                dropped_source_fragments,
1343            })
1344            .await;
1345
1346        // clean up iceberg table sinks
1347        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1348            .iter()
1349            .map(|sink| sink.id)
1350            .collect();
1351
1352        for sink in removed_iceberg_table_sinks {
1353            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1354                .expect("Iceberg sink should be valid");
1355            let iceberg_sink =
1356                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1357            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1358                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1359                tracing::info!(
1360                    "dropping iceberg table {} for dropped sink",
1361                    table_identifier
1362                );
1363
1364                let _ = iceberg_catalog
1365                    .drop_table(&table_identifier)
1366                    .await
1367                    .inspect_err(|err| {
1368                        tracing::error!(
1369                            "failed to drop iceberg table {} during cleanup: {}",
1370                            table_identifier,
1371                            err.as_report()
1372                        );
1373                    });
1374            }
1375        }
1376
1377        // stop sink coordinators for iceberg table sinks
1378        if !iceberg_sink_ids.is_empty() {
1379            self.sink_manager
1380                .stop_sink_coordinator(iceberg_sink_ids.clone())
1381                .await;
1382
1383            for sink_id in iceberg_sink_ids {
1384                self.iceberg_compaction_manager
1385                    .clear_iceberg_commits_by_sink_id(sink_id);
1386            }
1387        }
1388
1389        // remove secrets.
1390        for secret in secret_ids {
1391            LocalSecretManager::global().remove_secret(secret);
1392        }
1393        Ok(version)
1394    }
1395
1396    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
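    ///
    /// The new job is staged under a temporary job id; on success it is finished via
    /// `finish_replace_streaming_job`, and on failure the temporary catalog is aborted
    /// via `try_abort_replacing_streaming_job`.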
1397    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1398    pub async fn replace_job(
1399        &self,
1400        mut streaming_job: StreamingJob,
1401        fragment_graph: StreamFragmentGraphProto,
1402    ) -> MetaResult<NotificationVersion> {
1403        match &streaming_job {
1404            StreamingJob::Table(..)
1405            | StreamingJob::Source(..)
1406            | StreamingJob::MaterializedView(..) => {}
1407            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1408                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1409            }
1410        }
1411
1412        let job_id = streaming_job.id();
1413
1414        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1415        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1416
1417        // Ensure the max parallelism is unchanged before replacing the table.
1418        let original_max_parallelism = self
1419            .metadata_manager
1420            .get_job_max_parallelism(streaming_job.id())
1421            .await?;
1422        let fragment_graph = PbStreamFragmentGraph {
1423            max_parallelism: original_max_parallelism as _,
1424            ..fragment_graph
1425        };
1426
1427        // 1. build fragment graph.
1428        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1429        streaming_job.set_info_from_graph(&fragment_graph);
1430
1431        // make it immutable
1432        let streaming_job = streaming_job;
1433
1434        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1435            let auto_refresh_schema_sinks = self
1436                .metadata_manager
1437                .catalog_controller
1438                .get_sink_auto_refresh_schema_from(table.id)
1439                .await?;
1440            if !auto_refresh_schema_sinks.is_empty() {
1441                let original_table_columns = self
1442                    .metadata_manager
1443                    .catalog_controller
1444                    .get_table_columns(table.id)
1445                    .await?;
1446                // compare column ids to find newly added and removed columns
1447                let original_table_column_ids: HashSet<_> = original_table_columns
1448                    .iter()
1449                    .map(|col| col.column_id())
1450                    .collect();
1451                let new_table_column_ids: HashSet<_> = table
1452                    .columns
1453                    .iter()
1454                    .map(|col| ColumnId::new(col.column_desc.as_ref().unwrap().column_id as _))
1455                    .collect();
1456                let newly_added_columns = table
1457                    .columns
1458                    .iter()
1459                    .filter(|col| {
1460                        !original_table_column_ids.contains(&ColumnId::new(
1461                            col.column_desc.as_ref().unwrap().column_id as _,
1462                        ))
1463                    })
1464                    .map(|col| ColumnCatalog::from(col.clone()))
1465                    .collect_vec();
1466                let removed_columns = original_table_columns
1467                    .iter()
1468                    .filter(|col| !new_table_column_ids.contains(&col.column_id()))
1469                    .cloned()
1470                    .collect_vec();
1471                // TODO: Remove this guard when auto schema refresh sink fully supports drop column.
1472                if !removed_columns.is_empty() {
1473                    return Err(anyhow!(
1474                        "new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}",
1475                        table.columns,
1476                        original_table_columns,
1477                        removed_columns
1478                            .iter()
1479                            .map(|col| col.column_id())
1480                            .collect_vec()
1481                    )
1482                    .into());
1483                }
1484                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1485                for sink in auto_refresh_schema_sinks {
1486                    let sink_job_fragments = self
1487                        .metadata_manager
1488                        .get_job_fragments_by_id(sink.id.as_job_id())
1489                        .await?;
1490                    if sink_job_fragments.fragments.len() != 1 {
1491                        return Err(anyhow!(
1492                            "auto schema refresh sink must have only one fragment, but got {}",
1493                            sink_job_fragments.fragments.len()
1494                        )
1495                        .into());
1496                    }
1497                    let sink_ctx = sink_job_fragments.ctx;
1498                    let original_sink_fragment =
1499                        sink_job_fragments.fragments.into_values().next().unwrap();
1500                    let (new_sink_fragment, new_schema, new_log_store_table) =
1501                        rewrite_refresh_schema_sink_fragment(
1502                            &original_sink_fragment,
1503                            &sink,
1504                            &newly_added_columns,
1505                            &removed_columns,
1506                            table,
1507                            fragment_graph.table_fragment_id(),
1508                            self.env.id_gen_manager(),
1509                        )?;
1510
1511                    let streaming_job = StreamingJob::Sink(sink);
1512
1513                    let tmp_sink_model = self
1514                        .metadata_manager
1515                        .catalog_controller
1516                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1517                        .await?;
1518                    let tmp_sink_id = tmp_sink_model.job_id.as_sink_id();
1519                    let StreamingJob::Sink(sink) = streaming_job else {
1520                        unreachable!()
1521                    };
1522
1523                    sinks.push(AutoRefreshSchemaSinkContext {
1524                        tmp_sink_id,
1525                        original_sink: sink,
1526                        original_fragment: original_sink_fragment,
1527                        new_schema,
1528                        newly_add_fields: newly_added_columns
1529                            .iter()
1530                            .map(|col| Field::from(&col.column_desc))
1531                            .collect(),
1532                        removed_column_names: removed_columns
1533                            .iter()
1534                            .map(|col| col.name.clone())
1535                            .collect(),
1536                        new_fragment: new_sink_fragment,
1537                        new_log_store_table: new_log_store_table.map(Box::new),
1538                        ctx: sink_ctx,
1539                    });
1540                }
1541                Some(sinks)
1542            } else {
1543                None
1544            }
1545        } else {
1546            None
1547        };
1548
1549        let streaming_job_model = self
1550            .metadata_manager
1551            .catalog_controller
1552            .create_job_catalog_for_replace(
1553                &streaming_job,
1554                Some(&ctx),
1555                fragment_graph.specified_parallelism().as_ref(),
1556                Some(fragment_graph.max_parallelism()),
1557            )
1558            .await?;
1559        let tmp_id = streaming_job_model.job_id;
1560
1561        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1562            sinks
1563                .iter()
1564                .map(|sink| sink.tmp_sink_id.as_object_id())
1565                .collect_vec()
1566        });
1567
1568        tracing::debug!(id = %job_id, "building replace streaming job");
1569        let mut updated_sink_catalogs = vec![];
1570
1571        let mut drop_table_connector_ctx = None;
1572        let result: MetaResult<_> = try {
1573            let (mut ctx, mut stream_job_fragments) = self
1574                .build_replace_job(
1575                    ctx,
1576                    &streaming_job,
1577                    fragment_graph,
1578                    tmp_id,
1579                    auto_refresh_schema_sinks,
1580                    streaming_job_model,
1581                )
1582                .await?;
1583            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1584            let auto_refresh_schema_sink_finish_ctx =
1585                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1586                    sinks
1587                        .iter()
1588                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1589                            tmp_sink_id: sink.tmp_sink_id,
1590                            original_sink_id: sink.original_sink.id,
1591                            columns: sink.new_schema.clone(),
1592                            new_log_store_table: sink.new_log_store_table.clone(),
1593                        })
1594                        .collect()
1595                });
1596
1597            // Handle a table that has incoming sinks.
1598            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1599                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1600                let upstream_infos = self
1601                    .metadata_manager
1602                    .catalog_controller
1603                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1604                    .await?;
1605                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1606
1607                for upstream_info in &upstream_infos {
1608                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1609                    ctx.upstream_fragment_downstreams
1610                        .entry(upstream_fragment_id)
1611                        .or_default()
1612                        .push(upstream_info.new_sink_downstream.clone());
1613                    if upstream_info.sink_original_target_columns.is_empty() {
1614                        updated_sink_catalogs.push(upstream_info.sink_id);
1615                    }
1616                }
1617            }
1618
1619            let replace_upstream = ctx.replace_upstream.clone();
1620
1621            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1622                let empty_downstreams = FragmentDownstreamRelation::default();
1623                for sink in sinks {
1624                    self.metadata_manager
1625                        .catalog_controller
1626                        .prepare_streaming_job(
1627                            sink.tmp_sink_id.as_job_id(),
1628                            || [&sink.new_fragment].into_iter(),
1629                            &empty_downstreams,
1630                            true,
1631                            None,
1632                            None,
1633                        )
1634                        .await?;
1635                }
1636            }
1637
1638            self.metadata_manager
1639                .catalog_controller
1640                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1641                .await?;
1642
1643            self.stream_manager
1644                .replace_stream_job(stream_job_fragments, ctx)
1645                .await?;
1646            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1647        };
1648
1649        match result {
1650            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1651                let version = self
1652                    .metadata_manager
1653                    .catalog_controller
1654                    .finish_replace_streaming_job(
1655                        tmp_id,
1656                        streaming_job,
1657                        replace_upstream,
1658                        SinkIntoTableContext {
1659                            updated_sink_catalogs,
1660                        },
1661                        drop_table_connector_ctx.as_ref(),
1662                        auto_refresh_schema_sink_finish_ctx,
1663                    )
1664                    .await?;
1665                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1666                    self.source_manager
1667                        .apply_source_change(SourceChange::DropSource {
1668                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1669                        })
1670                        .await;
1671                }
1672                Ok(version)
1673            }
1674            Err(err) => {
1675                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1676                let _ = self.metadata_manager
1677                    .catalog_controller
1678                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1679                    .await.inspect_err(|err| {
1680                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1681                });
1682                Err(err)
1683            }
1684        }
1685    }
1686
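    /// Drops a streaming job according to its current status: `Initial` jobs are
    /// aborted directly, `Creating` jobs are cancelled through the stream manager,
    /// and `Created` jobs go through the regular `drop_object` path.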
1687    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1688    )]
1689    async fn drop_streaming_job(
1690        &self,
1691        job_id: StreamingJobId,
1692        drop_mode: DropMode,
1693    ) -> MetaResult<NotificationVersion> {
1694        let (object_id, object_type) = match job_id {
1695            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1696            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1697            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1698            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1699        };
1700
1701        let job_status = self
1702            .metadata_manager
1703            .catalog_controller
1704            .get_streaming_job_status(job_id.id())
1705            .await?;
1706        let version = match job_status {
1707            JobStatus::Initial => {
1708                self.metadata_manager
1709                    .catalog_controller
1710                    .try_abort_creating_streaming_job(job_id.id(), true)
1711                    .await?;
1712                IGNORED_NOTIFICATION_VERSION
1713            }
1714            JobStatus::Creating => {
1715                self.stream_manager
1716                    .cancel_streaming_jobs(vec![job_id.id()])
1717                    .await?;
1718                IGNORED_NOTIFICATION_VERSION
1719            }
1720            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1721        };
1722
1723        Ok(version)
1724    }
1725
1726    /// Resolve the parallelism of the stream job based on the given information.
1727    ///
1728    /// Returns an error if no slots are available in the resource group, or if the user-specified parallelism exceeds the job's max parallelism.
1729    fn resolve_stream_parallelism(
1730        &self,
1731        specified: Option<NonZeroUsize>,
1732        max: NonZeroUsize,
1733        cluster_info: &StreamingClusterInfo,
1734        resource_group: String,
1735    ) -> MetaResult<NonZeroUsize> {
1736        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1737        DdlController::resolve_stream_parallelism_inner(
1738            specified,
1739            max,
1740            available,
1741            &self.env.opts.default_parallelism,
1742            &resource_group,
1743        )
1744    }
1745
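    /// Resolution rules, kept free of `self` so they can be unit-tested:
    /// - no available slots in the resource group: error;
    /// - a user-specified parallelism: error if it exceeds `max`, otherwise used as-is
    ///   (with a warning if it exceeds the currently available slots);
    /// - otherwise, the configured default parallelism (`Full` meaning all available
    ///   slots), capped to `max`.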
1746    fn resolve_stream_parallelism_inner(
1747        specified: Option<NonZeroUsize>,
1748        max: NonZeroUsize,
1749        available: Option<NonZeroUsize>,
1750        default_parallelism: &DefaultParallelism,
1751        resource_group: &str,
1752    ) -> MetaResult<NonZeroUsize> {
1753        let Some(available) = available else {
1754            bail_unavailable!(
1755                "no available slots to schedule in resource group \"{}\", \
1756                 have you allocated any compute nodes within this resource group?",
1757                resource_group
1758            );
1759        };
1760
1761        if let Some(specified) = specified {
1762            if specified > max {
1763                bail_invalid_parameter!(
1764                    "specified parallelism {} should not exceed max parallelism {}",
1765                    specified,
1766                    max,
1767                );
1768            }
1769            if specified > available {
1770                tracing::warn!(
1771                    resource_group,
1772                    specified_parallelism = specified.get(),
1773                    available_parallelism = available.get(),
1774                    "specified parallelism exceeds available slots, scheduling with specified value",
1775                );
1776            }
1777            return Ok(specified);
1778        }
1779
1780        // Use default parallelism when no specific parallelism is provided by the user.
1781        let default_parallelism = match default_parallelism {
1782            DefaultParallelism::Full => available,
1783            DefaultParallelism::Default(num) => {
1784                if *num > available {
1785                    tracing::warn!(
1786                        resource_group,
1787                        configured_parallelism = num.get(),
1788                        available_parallelism = available.get(),
1789                        "default parallelism exceeds available slots, scheduling with configured value",
1790                    );
1791                }
1792                *num
1793            }
1794        };
1795
1796        if default_parallelism > max {
1797            tracing::warn!(
1798                max_parallelism = max.get(),
1799                resource_group,
1800                "default parallelism exceeds max parallelism, capping to max",
1801            );
1802        }
1803        Ok(default_parallelism.min(max))
1804    }
1805
1806    /// Builds the actor graph:
1807    /// - Add the upstream fragments to the fragment graph
1808    /// - Schedule the fragments based on their distribution
1809    /// - Expand each fragment into one or several actors
1810    /// - Construct the fragment level backfill order control.
1811    #[await_tree::instrument]
1812    pub(crate) async fn build_stream_job(
1813        &self,
1814        stream_ctx: StreamContext,
1815        mut stream_job: StreamingJob,
1816        fragment_graph: StreamFragmentGraph,
1817        resource_type: streaming_job_resource_type::ResourceType,
1818        streaming_job_model: streaming_job::Model,
1819    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1820        let id = stream_job.id();
1821        let specified_parallelism = fragment_graph.specified_parallelism();
1822        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1823        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1824
1825        // 1. Fragment Level ordering graph
1826        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1827
1828        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1829        // contains all information needed for building the actor graph.
1830
1831        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1832            fragment_graph.collect_snapshot_backfill_info()?;
1833        assert!(
1834            snapshot_backfill_info
1835                .iter()
1836                .chain([&cross_db_snapshot_backfill_info])
1837                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1838                .all(|backfill_epoch| backfill_epoch.is_none()),
1839            "should not set backfill epoch when initially building the job: {:?} {:?}",
1840            snapshot_backfill_info,
1841            cross_db_snapshot_backfill_info
1842        );
1843
1844        let locality_fragment_state_table_mapping =
1845            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1846
1847        // check if log store exists for all cross-db upstreams
1848        self.metadata_manager
1849            .catalog_controller
1850            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1851            .await?;
1852
1853        let upstream_table_ids = fragment_graph
1854            .dependent_table_ids()
1855            .iter()
1856            .filter(|id| {
1857                !cross_db_snapshot_backfill_info
1858                    .upstream_mv_table_id_to_backfill_epoch
1859                    .contains_key(*id)
1860            })
1861            .cloned()
1862            .collect();
1863
1864        let upstream_root_fragments = self
1865            .metadata_manager
1866            .get_upstream_root_fragments(&upstream_table_ids)
1867            .await?;
1868
1869        if snapshot_backfill_info.is_some() {
1870            match stream_job {
1871                StreamingJob::MaterializedView(_)
1872                | StreamingJob::Sink(_)
1873                | StreamingJob::Index(_, _) => {}
1874                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1875                    return Err(
1876                        anyhow!("snapshot_backfill is not supported for tables and sources").into(),
1877                    );
1878                }
1879            }
1880        }
1881
1882        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1883            fragment_graph,
1884            FragmentGraphUpstreamContext {
1885                upstream_root_fragments,
1886            },
1887            (&stream_job).into(),
1888        )?;
1889        let resource_group = if let Some(group) = resource_type.resource_group() {
1890            group
1891        } else {
1892            self.metadata_manager
1893                .get_database_resource_group(stream_job.database_id())
1894                .await?
1895        };
1896        let is_serverless_backfill = matches!(
1897            &resource_type,
1898            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1899        );
1900
1901        // 3. Build the actor graph.
1902        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1903
1904        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1905        let parallelism = self.resolve_stream_parallelism(
1906            initial_parallelism,
1907            max_parallelism,
1908            &cluster_info,
1909            resource_group.clone(),
1910        )?;
1911
1912        let parallelism = if initial_parallelism.is_some() {
1913            parallelism.get()
1914        } else {
1915            // Prefer the job-level override for initial scheduling, falling back to the system strategy.
1916            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1917                Some(strategy) => strategy,
1918                None => self
1919                    .env
1920                    .system_params_reader()
1921                    .await
1922                    .adaptive_parallelism_strategy(),
1923            };
1924            adaptive_strategy.compute_target_parallelism(parallelism.get())
1925        };
1926
1927        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1928        let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, parallelism)?;
1929
1930        let ActorGraphBuildResult {
1931            graph,
1932            downstream_fragment_relations,
1933            upstream_fragment_downstreams,
1934            replace_upstream,
1935        } = actor_graph_builder.generate_graph()?;
1936        assert!(replace_upstream.is_empty());
1937
1938        // 4. Build the table fragments structure that will be persisted in the stream manager,
1939        // and the context that contains all information needed for building the
1940        // actors on the compute nodes.
1941
1942        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, set it to ADAPTIVE.
1943        // Otherwise, it defaults to FIXED with the deduced parallelism.
1944        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1945            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1946            _ => TableParallelism::Fixed(parallelism.get()),
1947        };
1948
1949        let stream_job_fragments = StreamJobFragments::new(
1950            id,
1951            graph,
1952            stream_ctx.clone(),
1953            table_parallelism,
1954            max_parallelism.get(),
1955        );
1956
1957        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1958            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1959        }
1960
1961        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1962            && let Ok(table_id) = sink.get_target_table()
1963        {
1964            let tables = self
1965                .metadata_manager
1966                .get_table_catalog_by_ids(&[*table_id])
1967                .await?;
1968            let target_table = tables
1969                .first()
1970                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1971            let sink_fragment = stream_job_fragments
1972                .sink_fragment()
1973                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1974            let mview_fragment_id = self
1975                .metadata_manager
1976                .catalog_controller
1977                .get_mview_fragment_by_id(table_id.as_job_id())
1978                .await?;
1979            let upstream_sink_info = build_upstream_sink_info(
1980                sink.id,
1981                sink.original_target_columns.clone(),
1982                sink_fragment.fragment_id as _,
1983                target_table,
1984                mview_fragment_id,
1985            )?;
1986            Some(upstream_sink_info)
1987        } else {
1988            None
1989        };
1990
1991        let mut cdc_table_snapshot_splits = None;
1992        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1993            && let Some((_, stream_cdc_scan)) =
1994                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1995        {
1996            {
1997                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1998                let splits = try_init_parallel_cdc_table_snapshot_splits(
1999                    table.id,
2000                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
2001                    self.env.meta_store_ref(),
2002                    stream_cdc_scan.options.as_ref().unwrap(),
2003                    self.env.opts.cdc_table_split_init_insert_batch_size,
2004                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
2005                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
2006                )
2007                .await?;
2008                cdc_table_snapshot_splits = Some(splits);
2009            }
2010        }
2011
2012        let ctx = CreateStreamingJobContext {
2013            upstream_fragment_downstreams,
2014            database_resource_group: resource_group,
2015            definition: stream_job.definition(),
2016            create_type: stream_job.create_type(),
2017            job_type: (&stream_job).into(),
2018            streaming_job: stream_job,
2019            new_upstream_sink,
2020            option: CreateStreamingJobOption {},
2021            snapshot_backfill_info,
2022            cross_db_snapshot_backfill_info,
2023            fragment_backfill_ordering,
2024            locality_fragment_state_table_mapping,
2025            cdc_table_snapshot_splits,
2026            is_serverless_backfill,
2027            streaming_job_model,
2028        };
2029
2030        Ok((
2031            ctx,
2032            StreamJobFragmentsToCreate {
2033                inner: stream_job_fragments,
2034                downstreams: downstream_fragment_relations,
2035            },
2036        ))
2037    }
2038
2039    /// `build_replace_job` builds a job replacement and returns the context and new job
2040    /// fragments.
2041    ///
2042    /// Note that we use a temporary ID for the new job fragments and replace it with the real
2043    /// one after the replacement is finished.
2044    pub(crate) async fn build_replace_job(
2045        &self,
2046        stream_ctx: StreamContext,
2047        stream_job: &StreamingJob,
2048        mut fragment_graph: StreamFragmentGraph,
2049        tmp_job_id: JobId,
2050        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
2051        streaming_job_model: streaming_job::Model,
2052    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2053        match &stream_job {
2054            StreamingJob::Table(..)
2055            | StreamingJob::Source(..)
2056            | StreamingJob::MaterializedView(..) => {}
2057            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2058                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2059            }
2060        }
2061
2062        let id = stream_job.id();
2063
2064        // check if performing drop table connector
2065        let mut drop_table_associated_source_id = None;
2066        if let StreamingJob::Table(None, _, _) = &stream_job {
2067            drop_table_associated_source_id = self
2068                .metadata_manager
2069                .get_table_associated_source_id(id.as_mv_table_id())
2070                .await?;
2071        }
2072
2073        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
2074        let old_internal_table_ids = old_fragments.internal_table_ids();
2075
2076        // handle drop table's associated source
2077        let mut drop_table_connector_ctx = None;
2078        if let Some(to_remove_source_id) = drop_table_associated_source_id {
2079            // Dropping the table's associated source implies the job has exactly one internal table (the associated source's state table).
2080            debug_assert!(old_internal_table_ids.len() == 1);
2081
2082            drop_table_connector_ctx = Some(DropTableConnectorContext {
2083                // we do not remove the original table catalog as it's still needed for the streaming job
2084                // just need to remove the ref to the state table
2085                to_change_streaming_job_id: id,
2086                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
2087                to_remove_source_id,
2088            });
2089        } else if stream_job.is_materialized_view() {
2090            // If it's ALTER MV, use the `state_match` algorithm to match the internal tables,
2091            // which is more complicated but more robust.
2092            let old_fragments_upstreams = self
2093                .metadata_manager
2094                .catalog_controller
2095                .upstream_fragments(old_fragments.fragment_ids())
2096                .await?;
2097
2098            let old_state_graph =
2099                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2100            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2101            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2102                .context("incompatible altering on the streaming job states")?;
2103
2104            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2105            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2106        } else {
2107            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2108            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2109            let old_internal_tables = self
2110                .metadata_manager
2111                .get_table_catalog_by_ids(&old_internal_table_ids)
2112                .await?;
2113            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2114        }
2115
2116        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2117        // graph that contains all information needed for building the actor graph.
2118        let original_root_fragment = old_fragments
2119            .root_fragment()
2120            .expect("root fragment not found");
2121
2122        let job_type = StreamingJobType::from(stream_job);
2123
2124        // Extract the downstream fragments from the fragment graph.
2125        let mut downstream_fragments = self.metadata_manager.get_downstream_fragments(id).await?;
2126
2127        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2128            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2129                .iter()
2130                .map(|sink| sink.original_fragment.fragment_id)
2131                .collect();
2132            for (_, downstream_fragment) in &mut downstream_fragments {
2133                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2134                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2135                }) {
2136                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2137                    // Actor locations will be resolved inside the barrier worker during rendering.
2138                    // For now, just replace the fragment info and nodes.
2139                    *downstream_fragment = sink.new_fragment.clone();
2140                }
2141            }
2142            assert!(remaining_fragment.is_empty());
2143        }
2144
2145        // build complete graph based on the table job type
2146        let complete_graph = match &job_type {
2147            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2148                CompleteStreamFragmentGraph::with_downstreams(
2149                    fragment_graph,
2150                    FragmentGraphDownstreamContext {
2151                        original_root_fragment_id: original_root_fragment.fragment_id,
2152                        downstream_fragments,
2153                    },
2154                    job_type,
2155                )?
2156            }
2157            StreamingJobType::Table(TableJobType::SharedCdcSource)
2158            | StreamingJobType::MaterializedView => {
2159                // CDC tables or materialized views can have upstream jobs as well.
2160                let upstream_root_fragments = self
2161                    .metadata_manager
2162                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2163                    .await?;
2164
2165                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2166                    fragment_graph,
2167                    FragmentGraphUpstreamContext {
2168                        upstream_root_fragments,
2169                    },
2170                    FragmentGraphDownstreamContext {
2171                        original_root_fragment_id: original_root_fragment.fragment_id,
2172                        downstream_fragments,
2173                    },
2174                    job_type,
2175                )?
2176            }
2177            _ => unreachable!(),
2178        };
2179
2180        let resource_group = self
2181            .metadata_manager
2182            .get_database_resource_group(stream_job.database_id())
2183            .await?;
2184
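        // 2. Build the actor graph.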
2185        // XXX: what is this parallelism?
2186        // Is it "assigned parallelism"?
2187        let parallelism = NonZeroUsize::new(match old_fragments.assigned_parallelism {
2188            TableParallelism::Fixed(n) => n,
2189            TableParallelism::Adaptive | TableParallelism::Custom => 1,
2190        })
2191        .expect("The number of actors in the original table fragment should be greater than 0");
2192
2193        let actor_graph_builder = ActorGraphBuilder::new(id, complete_graph, parallelism)?;
2194
2195        let ActorGraphBuildResult {
2196            graph,
2197            downstream_fragment_relations,
2198            upstream_fragment_downstreams,
2199            mut replace_upstream,
2200        } = actor_graph_builder.generate_graph()?;
2201
2202        // A general table or source has no upstream job, so the dispatchers should be empty.
2203        if matches!(
2204            job_type,
2205            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2206        ) {
2207            assert!(upstream_fragment_downstreams.is_empty());
2208        }
2209
2210        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2211        // the context that contains all information needed for building the actors on the compute
2212        // nodes.
2213        let stream_job_fragments = StreamJobFragments::new(
2214            tmp_job_id,
2215            graph,
2216            stream_ctx,
2217            old_fragments.assigned_parallelism,
2218            old_fragments.max_parallelism,
2219        );
2220
2221        if let Some(sinks) = &auto_refresh_schema_sinks {
2222            for sink in sinks {
2223                replace_upstream
2224                    .remove(&sink.new_fragment.fragment_id)
2225                    .expect("should exist");
2226            }
2227        }
2228
2229        // Note: no need to set `vnode_count` as it's already set by the frontend.
2230        // See `get_replace_table_plan`.
2231
2232        let ctx = ReplaceStreamJobContext {
2233            old_fragments,
2234            replace_upstream,
2235            upstream_fragment_downstreams,
2236            streaming_job: stream_job.clone(),
2237            database_resource_group: resource_group,
2238            tmp_id: tmp_job_id,
2239            drop_table_connector_ctx,
2240            auto_refresh_schema_sinks,
2241            streaming_job_model,
2242        };
2243
2244        Ok((
2245            ctx,
2246            StreamJobFragmentsToCreate {
2247                inner: stream_job_fragments,
2248                downstreams: downstream_fragment_relations,
2249            },
2250        ))
2251    }
2252
2253    async fn alter_name(
2254        &self,
2255        relation: alter_name_request::Object,
2256        new_name: &str,
2257    ) -> MetaResult<NotificationVersion> {
2258        let (obj_type, id): (ObjectType, ObjectId) = match relation {
2259            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2260            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2261            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2262            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2263            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2264            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2265            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2266            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2267        };
2268        self.metadata_manager
2269            .catalog_controller
2270            .alter_name(obj_type, id, new_name)
2271            .await
2272    }
2273
2274    async fn alter_swap_rename(
2275        &self,
2276        object: alter_swap_rename_request::Object,
2277    ) -> MetaResult<NotificationVersion> {
2278        let (obj_type, src_id, dst_id) = match object {
2279            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2280            alter_swap_rename_request::Object::Table(objs) => {
2281                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2282                (ObjectType::Table, src_id, dst_id)
2283            }
2284            alter_swap_rename_request::Object::View(objs) => {
2285                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2286                (ObjectType::View, src_id, dst_id)
2287            }
2288            alter_swap_rename_request::Object::Source(objs) => {
2289                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2290                (ObjectType::Source, src_id, dst_id)
2291            }
2292            alter_swap_rename_request::Object::Sink(objs) => {
2293                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2294                (ObjectType::Sink, src_id, dst_id)
2295            }
2296            alter_swap_rename_request::Object::Subscription(objs) => {
2297                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2298                (ObjectType::Subscription, src_id, dst_id)
2299            }
2300        };
2301
2302        self.metadata_manager
2303            .catalog_controller
2304            .alter_swap_rename(obj_type, src_id, dst_id)
2305            .await
2306    }
2307
2308    async fn alter_owner(
2309        &self,
2310        object: Object,
2311        owner_id: UserId,
2312    ) -> MetaResult<NotificationVersion> {
2313        let (obj_type, id): (ObjectType, ObjectId) = match object {
2314            Object::TableId(id) => (ObjectType::Table, id.into()),
2315            Object::ViewId(id) => (ObjectType::View, id.into()),
2316            Object::SourceId(id) => (ObjectType::Source, id.into()),
2317            Object::SinkId(id) => (ObjectType::Sink, id.into()),
2318            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2319            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2320            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2321            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2322            Object::FunctionId(id) => (ObjectType::Function, id.into()),
2323            Object::SecretId(id) => (ObjectType::Secret, id.into()),
2324        };
2325        self.metadata_manager
2326            .catalog_controller
2327            .alter_owner(obj_type, id, owner_id as _)
2328            .await
2329    }
2330
2331    async fn alter_set_schema(
2332        &self,
2333        object: alter_set_schema_request::Object,
2334        new_schema_id: SchemaId,
2335    ) -> MetaResult<NotificationVersion> {
2336        let (obj_type, id): (ObjectType, ObjectId) = match object {
2337            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2338            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2339            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2340            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2341            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2342            alter_set_schema_request::Object::ConnectionId(id) => {
2343                (ObjectType::Connection, id.into())
2344            }
2345            alter_set_schema_request::Object::SubscriptionId(id) => {
2346                (ObjectType::Subscription, id.into())
2347            }
2348        };
2349        self.metadata_manager
2350            .catalog_controller
2351            .alter_schema(obj_type, id, new_schema_id)
2352            .await
2353    }
2354
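    /// Waits for the given job to finish, or, when `job_id` is `None`, for all
    /// background creating jobs to complete, returning the catalog and hummock
    /// versions that the frontend should wait for.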
2355    pub async fn wait(&self, job_id: Option<JobId>) -> MetaResult<WaitVersion> {
2356        if let Some(job_id) = job_id {
2357            let database_id = self
2358                .metadata_manager
2359                .catalog_controller
2360                .get_object_database_id(job_id)
2361                .await?;
2362            let catalog_version = self
2363                .metadata_manager
2364                .wait_streaming_job_finished(database_id, job_id)
2365                .await?;
2366            let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2367            return Ok(WaitVersion {
2368                catalog_version,
2369                hummock_version_id,
2370            });
2371        }
2372
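        // No specific job given: poll until there are no background creating jobs, for up to 2 hours.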
2373        let timeout_ms = 2 * 60 * 60 * 1000;
2374        let poll_interval = Duration::from_millis(100);
2375        for _ in 0..(timeout_ms / poll_interval.as_millis() as usize) {
2376            let background_jobs = self
2377                .metadata_manager
2378                .catalog_controller
2379                .list_background_creating_jobs(true, None)
2380                .await?;
2381            if background_jobs.is_empty() {
2382                let catalog_version = self
2383                    .metadata_manager
2384                    .catalog_controller
2385                    .notify_frontend_trivial()
2386                    .await;
2387                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2388                return Ok(WaitVersion {
2389                    catalog_version,
2390                    hummock_version_id,
2391                });
2392            }
2393
2394            sleep(poll_interval).await;
2395        }
2396        Err(MetaError::cancelled(format!(
2397            "timeout after {timeout_ms}ms"
2398        )))
2399    }
2400
2401    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2402        self.metadata_manager
2403            .catalog_controller
2404            .comment_on(comment)
2405            .await
2406    }
2407
2408    async fn alter_streaming_job_config(
2409        &self,
2410        job_id: JobId,
2411        entries_to_add: HashMap<String, String>,
2412        keys_to_remove: Vec<String>,
2413    ) -> MetaResult<NotificationVersion> {
2414        self.metadata_manager
2415            .catalog_controller
2416            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2417            .await
2418    }
2419}
2420
2421fn report_create_object(
2422    job_id: JobId,
2423    event_name: &str,
2424    obj_type: PbTelemetryDatabaseObject,
2425    connector_name: Option<String>,
2426    attr_info: Option<jsonbb::Value>,
2427) {
2428    report_event(
2429        PbTelemetryEventStage::CreateStreamJob,
2430        event_name,
2431        job_id.as_raw_id() as _,
2432        connector_name,
2433        Some(obj_type),
2434        attr_info,
2435    );
2436}
2437
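/// Builds the [`UpstreamSinkInfo`] that wires a sink fragment into its target table's
/// union fragment: the sink's output schema, a hash dispatcher whose distribution key
/// indices are remapped from table column positions to sink column positions (matched
/// by column id), and the projection from the sink columns onto the table's current
/// columns.
///
/// As a worked example on a hypothetical schema: if the table is distributed on its
/// second column and that column appears first in the sink's output, the resulting
/// `dist_key_indices` is `[0]`.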
2438pub fn build_upstream_sink_info(
2439    sink_id: SinkId,
2440    original_target_columns: Vec<PbColumnCatalog>,
2441    sink_fragment_id: FragmentId,
2442    target_table: &PbTable,
2443    target_fragment_id: FragmentId,
2444) -> MetaResult<UpstreamSinkInfo> {
2445    let sink_columns = if !original_target_columns.is_empty() {
2446        original_target_columns.clone()
2447    } else {
2448        // The field may be empty because it did not exist in earlier versions, which
2449        // means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
2450        // Therefore the table's current columns are exactly the `original_target_columns`.
2451        // The sink's field will then be backfilled on the meta side.
2452        target_table.columns.clone()
2453    };
2454
2455    let sink_output_fields = sink_columns
2456        .iter()
2457        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2458        .collect_vec();
2459    let output_indices = (0..sink_output_fields.len())
2460        .map(|i| i as u32)
2461        .collect_vec();
2462
2463    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2464        let sink_idx_by_col_id = sink_columns
2465            .iter()
2466            .enumerate()
2467            .map(|(idx, col)| {
2468                let column_id = col.column_desc.as_ref().unwrap().column_id;
2469                (column_id, idx as u32)
2470            })
2471            .collect::<HashMap<_, _>>();
2472        target_table
2473            .distribution_key
2474            .iter()
2475            .map(|dist_idx| {
2476                let column_id = target_table.columns[*dist_idx as usize]
2477                    .column_desc
2478                    .as_ref()
2479                    .unwrap()
2480                    .column_id;
2481                let sink_idx = sink_idx_by_col_id
2482                    .get(&column_id)
2483                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2484                Ok(*sink_idx)
2485            })
2486            .collect::<anyhow::Result<Vec<_>>>()?
2487    };
2488    let dist_key_indices =
2489        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2490    let downstream_fragment_id = target_fragment_id as _;
2491    let new_downstream_relation = DownstreamFragmentRelation {
2492        downstream_fragment_id,
2493        dispatcher_type: DispatcherType::Hash,
2494        dist_key_indices,
2495        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2496    };
2497    let current_target_columns = target_table.get_columns();
2498    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2499    Ok(UpstreamSinkInfo {
2500        sink_id,
2501        sink_fragment_id: sink_fragment_id as _,
2502        sink_output_fields,
2503        sink_original_target_columns: original_target_columns,
2504        project_exprs,
2505        new_sink_downstream: new_downstream_relation,
2506    })
2507}
2508
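/// Refills the `init_upstreams` of the `UpstreamSinkUnion` node found under the given
/// union fragment root with the provided upstream sink infos. The visitor stops
/// descending once that node is found (by returning `false`).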
2509pub fn refill_upstream_sink_union_in_table(
2510    union_fragment_root: &mut PbStreamNode,
2511    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2512) {
2513    visit_stream_node_cont_mut(union_fragment_root, |node| {
2514        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2515            let init_upstreams = upstream_sink_infos
2516                .iter()
2517                .map(|info| PbUpstreamSinkInfo {
2518                    upstream_fragment_id: info.sink_fragment_id,
2519                    sink_output_schema: info.sink_output_fields.clone(),
2520                    project_exprs: info.project_exprs.clone(),
2521                })
2522                .collect();
2523            upstream_sink_union.init_upstreams = init_upstreams;
2524            false
2525        } else {
2526            true
2527        }
2528    });
2529}
2530
2531#[cfg(test)]
2532mod tests {
2533    use std::num::NonZeroUsize;
2534
2535    use super::*;
2536
2537    #[test]
2538    fn test_specified_parallelism_exceeds_available() {
2539        let result = DdlController::resolve_stream_parallelism_inner(
2540            Some(NonZeroUsize::new(100).unwrap()),
2541            NonZeroUsize::new(256).unwrap(),
2542            Some(NonZeroUsize::new(4).unwrap()),
2543            &DefaultParallelism::Full,
2544            "default",
2545        )
2546        .unwrap();
2547        assert_eq!(result.get(), 100);
2548    }
2549
2550    #[test]
2551    fn test_allows_default_parallelism_over_available() {
2552        let result = DdlController::resolve_stream_parallelism_inner(
2553            None,
2554            NonZeroUsize::new(256).unwrap(),
2555            Some(NonZeroUsize::new(4).unwrap()),
2556            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2557            "default",
2558        )
2559        .unwrap();
2560        assert_eq!(result.get(), 50);
2561    }
2562
2563    #[test]
2564    fn test_full_parallelism_capped_by_max() {
2565        let result = DdlController::resolve_stream_parallelism_inner(
2566            None,
2567            NonZeroUsize::new(6).unwrap(),
2568            Some(NonZeroUsize::new(10).unwrap()),
2569            &DefaultParallelism::Full,
2570            "default",
2571        )
2572        .unwrap();
2573        assert_eq!(result.get(), 6);
2574    }
2575
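    // A minimal sketch of the remaining branch: a configured default parallelism that
    // exceeds the job's max parallelism is capped to the max rather than rejected.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }
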
2576    #[test]
2577    fn test_no_available_slots_returns_error() {
2578        let result = DdlController::resolve_stream_parallelism_inner(
2579            None,
2580            NonZeroUsize::new(4).unwrap(),
2581            None,
2582            &DefaultParallelism::Full,
2583            "default",
2584        );
2585        assert!(matches!(
2586            result,
2587            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2588        ));
2589    }
2590
2591    #[test]
2592    fn test_specified_over_max_returns_error() {
2593        let result = DdlController::resolve_stream_parallelism_inner(
2594            Some(NonZeroUsize::new(8).unwrap()),
2595            NonZeroUsize::new(4).unwrap(),
2596            Some(NonZeroUsize::new(10).unwrap()),
2597            &DefaultParallelism::Full,
2598            "default",
2599        );
2600        assert!(matches!(
2601            result,
2602            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2603        ));
2604    }
2605}