risingwave_meta/rpc/
ddl_controller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64    StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::{OwnedSemaphorePermit, Semaphore};
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::cluster::StreamingClusterInfo;
76use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
77use crate::controller::utils::build_select_node_list;
78use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
79use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
80use crate::manager::sink_coordination::SinkCoordinatorManager;
81use crate::manager::{
82    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
83    NotificationVersion, StreamingJob, StreamingJobType,
84};
85use crate::model::{
86    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
87    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
88    TableParallelism,
89};
90use crate::stream::cdc::{
91    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
92};
93use crate::stream::{
94    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
95    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
96    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
97    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
98    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
99    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
100};
101use crate::telemetry::report_event;
102use crate::{MetaError, MetaResult};
103
104#[derive(PartialEq)]
105pub enum DropMode {
106    Restrict,
107    Cascade,
108}
109
110impl DropMode {
111    pub fn from_request_setting(cascade: bool) -> DropMode {
112        if cascade {
113            DropMode::Cascade
114        } else {
115            DropMode::Restrict
116        }
117    }
118}
119
120#[derive(strum::AsRefStr)]
121pub enum StreamingJobId {
122    MaterializedView(TableId),
123    Sink(SinkId),
124    Table(Option<SourceId>, TableId),
125    Index(IndexId),
126}
127
128impl std::fmt::Display for StreamingJobId {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        write!(f, "{}", self.as_ref())?;
131        write!(f, "({})", self.id())
132    }
133}
134
135impl StreamingJobId {
136    fn id(&self) -> JobId {
137        match self {
138            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
139            StreamingJobId::Index(id) => id.as_job_id(),
140            StreamingJobId::Sink(id) => id.as_job_id(),
141        }
142    }
143}
144
145/// It’s used to describe the information of the job that needs to be replaced
146/// and it will be used during replacing table and creating sink into table operations.
147pub struct ReplaceStreamJobInfo {
148    pub streaming_job: StreamingJob,
149    pub fragment_graph: StreamFragmentGraphProto,
150}
151
152#[derive(Display)]
153pub enum DdlCommand {
154    CreateDatabase(Database),
155    DropDatabase(DatabaseId),
156    CreateSchema(Schema),
157    DropSchema(SchemaId, DropMode),
158    CreateNonSharedSource(Source),
159    DropSource(SourceId, DropMode),
160    ResetSource(SourceId),
161    CreateFunction(Function),
162    DropFunction(FunctionId, DropMode),
163    CreateView(View, HashSet<ObjectId>),
164    DropView(ViewId, DropMode),
165    CreateStreamingJob {
166        stream_job: StreamingJob,
167        fragment_graph: StreamFragmentGraphProto,
168        dependencies: HashSet<ObjectId>,
169        resource_type: streaming_job_resource_type::ResourceType,
170        if_not_exists: bool,
171    },
172    DropStreamingJob {
173        job_id: StreamingJobId,
174        drop_mode: DropMode,
175    },
176    AlterName(alter_name_request::Object, String),
177    AlterSwapRename(alter_swap_rename_request::Object),
178    ReplaceStreamJob(ReplaceStreamJobInfo),
179    AlterNonSharedSource(Source),
180    AlterObjectOwner(Object, UserId),
181    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
182    CreateConnection(Connection),
183    DropConnection(ConnectionId, DropMode),
184    CreateSecret(Secret),
185    AlterSecret(Secret),
186    DropSecret(SecretId, DropMode),
187    CommentOn(Comment),
188    CreateSubscription(Subscription),
189    DropSubscription(SubscriptionId, DropMode),
190    AlterSubscriptionRetention {
191        subscription_id: SubscriptionId,
192        retention_seconds: u64,
193        definition: String,
194    },
195    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
196    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
197}
198
199impl DdlCommand {
200    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
201    fn object(&self) -> Either<String, ObjectId> {
202        use Either::*;
203        match self {
204            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
205            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
206            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
207            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
208            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
209            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
210            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
211            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
212            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
213            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
214            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
215            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
216            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
217            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
218            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
219            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
220            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
221            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
222            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
223            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
224            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
225            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
226            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
227            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
228            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
229            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
230            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
231            DdlCommand::AlterSubscriptionRetention {
232                subscription_id, ..
233            } => Right(subscription_id.as_object_id()),
234            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
235            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
236        }
237    }
238
239    fn allow_in_recovery(&self) -> bool {
240        match self {
241            DdlCommand::DropDatabase(_)
242            | DdlCommand::DropSchema(_, _)
243            | DdlCommand::DropSource(_, _)
244            | DdlCommand::DropFunction(_, _)
245            | DdlCommand::DropView(_, _)
246            | DdlCommand::DropStreamingJob { .. }
247            | DdlCommand::DropConnection(_, _)
248            | DdlCommand::DropSecret(_, _)
249            | DdlCommand::DropSubscription(_, _)
250            | DdlCommand::AlterName(_, _)
251            | DdlCommand::AlterObjectOwner(_, _)
252            | DdlCommand::AlterSetSchema(_, _)
253            | DdlCommand::CreateDatabase(_)
254            | DdlCommand::CreateSchema(_)
255            | DdlCommand::CreateFunction(_)
256            | DdlCommand::CreateView(_, _)
257            | DdlCommand::CreateConnection(_)
258            | DdlCommand::CommentOn(_)
259            | DdlCommand::CreateSecret(_)
260            | DdlCommand::AlterSecret(_)
261            | DdlCommand::AlterSwapRename(_)
262            | DdlCommand::AlterDatabaseParam(_, _)
263            | DdlCommand::AlterStreamingJobConfig(_, _, _)
264            | DdlCommand::AlterSubscriptionRetention { .. } => true,
265            DdlCommand::CreateStreamingJob { .. }
266            | DdlCommand::CreateNonSharedSource(_)
267            | DdlCommand::ReplaceStreamJob(_)
268            | DdlCommand::AlterNonSharedSource(_)
269            | DdlCommand::ResetSource(_)
270            | DdlCommand::CreateSubscription(_) => false,
271        }
272    }
273}
274
275#[derive(Clone)]
276pub struct DdlController {
277    pub(crate) env: MetaSrvEnv,
278
279    pub(crate) metadata_manager: MetadataManager,
280    pub(crate) stream_manager: GlobalStreamManagerRef,
281    pub(crate) source_manager: SourceManagerRef,
282    barrier_manager: BarrierManagerRef,
283    sink_manager: SinkCoordinatorManager,
284    iceberg_compaction_manager: IcebergCompactionManagerRef,
285
286    // The semaphore is used to limit the number of concurrent streaming job creation.
287    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
288
289    /// Sequence number for DDL commands, used for observability and debugging.
290    seq: Arc<AtomicU64>,
291}
292
293#[derive(Clone)]
294pub struct CreatingStreamingJobPermit {
295    pub(crate) semaphore: Arc<Semaphore>,
296}
297
298impl CreatingStreamingJobPermit {
299    async fn new(env: &MetaSrvEnv) -> Self {
300        let mut permits = env
301            .system_params_reader()
302            .await
303            .max_concurrent_creating_streaming_jobs() as usize;
304        if permits == 0 {
305            // if the system parameter is set to zero, use the max permitted value.
306            permits = Semaphore::MAX_PERMITS;
307        }
308        let semaphore = Arc::new(Semaphore::new(permits));
309
310        let (local_notification_tx, mut local_notification_rx) =
311            tokio::sync::mpsc::unbounded_channel();
312        env.notification_manager()
313            .insert_local_sender(local_notification_tx);
314        let semaphore_clone = semaphore.clone();
315        tokio::spawn(async move {
316            while let Some(notification) = local_notification_rx.recv().await {
317                let LocalNotification::SystemParamsChange(p) = &notification else {
318                    continue;
319                };
320                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
321                if new_permits == 0 {
322                    new_permits = Semaphore::MAX_PERMITS;
323                }
324                match permits.cmp(&new_permits) {
325                    Ordering::Less => {
326                        semaphore_clone.add_permits(new_permits - permits);
327                    }
328                    Ordering::Equal => continue,
329                    Ordering::Greater => {
330                        let to_release = permits - new_permits;
331                        let reduced = semaphore_clone.forget_permits(to_release);
332                        // TODO: implement dynamic semaphore with limits by ourself.
333                        if reduced != to_release {
334                            tracing::warn!(
335                                "no enough permits to release, expected {}, but reduced {}",
336                                to_release,
337                                reduced
338                            );
339                        }
340                    }
341                }
342                tracing::info!(
343                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
344                    permits,
345                    new_permits
346                );
347                permits = new_permits;
348            }
349        });
350
351        Self { semaphore }
352    }
353}
354
355impl DdlController {
356    pub async fn new(
357        env: MetaSrvEnv,
358        metadata_manager: MetadataManager,
359        stream_manager: GlobalStreamManagerRef,
360        source_manager: SourceManagerRef,
361        barrier_manager: BarrierManagerRef,
362        sink_manager: SinkCoordinatorManager,
363        iceberg_compaction_manager: IcebergCompactionManagerRef,
364    ) -> Self {
365        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
366        Self {
367            env,
368            metadata_manager,
369            stream_manager,
370            source_manager,
371            barrier_manager,
372            sink_manager,
373            iceberg_compaction_manager,
374            creating_streaming_job_permits,
375            seq: Arc::new(AtomicU64::new(0)),
376        }
377    }
378
379    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
380    pub fn next_seq(&self) -> u64 {
381        // This is a simple atomic increment operation.
382        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
383    }
384
385    /// `run_command` spawns a tokio coroutine to execute the target ddl command. When the client
386    /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
387    /// a lot of logic for revert, status management, notification and so on, ensuring consistency
388    /// would be a huge hassle and pain if we don't spawn here.
389    ///
390    /// Though returning `Option`, it's always `Some`, to simplify the handling logic
391    #[allow(clippy::large_stack_frames)]
392    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
393        if !command.allow_in_recovery() {
394            self.barrier_manager.check_status_running()?;
395        }
396
397        let await_tree_key = format!("DDL Command {}", self.next_seq());
398        let await_tree_span = await_tree::span!("{command}({})", command.object());
399
400        let ctrl = self.clone();
401        let fut = Box::pin(async move {
402            match command {
403                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
404                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
405                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
406                DdlCommand::DropSchema(schema_id, drop_mode) => {
407                    ctrl.drop_schema(schema_id, drop_mode).await
408                }
409                DdlCommand::CreateNonSharedSource(source) => {
410                    ctrl.create_non_shared_source(source).await
411                }
412                DdlCommand::DropSource(source_id, drop_mode) => {
413                    ctrl.drop_source(source_id, drop_mode).await
414                }
415                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
416                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
417                DdlCommand::DropFunction(function_id, drop_mode) => {
418                    ctrl.drop_function(function_id, drop_mode).await
419                }
420                DdlCommand::CreateView(view, dependencies) => {
421                    ctrl.create_view(view, dependencies).await
422                }
423                DdlCommand::DropView(view_id, drop_mode) => {
424                    ctrl.drop_view(view_id, drop_mode).await
425                }
426                DdlCommand::CreateStreamingJob {
427                    stream_job,
428                    fragment_graph,
429                    dependencies,
430                    resource_type,
431                    if_not_exists,
432                } => {
433                    ctrl.create_streaming_job(
434                        stream_job,
435                        fragment_graph,
436                        dependencies,
437                        resource_type,
438                        if_not_exists,
439                    )
440                    .await
441                }
442                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
443                    ctrl.drop_streaming_job(job_id, drop_mode).await
444                }
445                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
446                    streaming_job,
447                    fragment_graph,
448                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
449                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
450                DdlCommand::AlterObjectOwner(object, owner_id) => {
451                    ctrl.alter_owner(object, owner_id).await
452                }
453                DdlCommand::AlterSetSchema(object, new_schema_id) => {
454                    ctrl.alter_set_schema(object, new_schema_id).await
455                }
456                DdlCommand::CreateConnection(connection) => {
457                    ctrl.create_connection(connection).await
458                }
459                DdlCommand::DropConnection(connection_id, drop_mode) => {
460                    ctrl.drop_connection(connection_id, drop_mode).await
461                }
462                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
463                DdlCommand::DropSecret(secret_id, drop_mode) => {
464                    ctrl.drop_secret(secret_id, drop_mode).await
465                }
466                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
467                DdlCommand::AlterNonSharedSource(source) => {
468                    ctrl.alter_non_shared_source(source).await
469                }
470                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
471                DdlCommand::CreateSubscription(subscription) => {
472                    ctrl.create_subscription(subscription).await
473                }
474                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
475                    ctrl.drop_subscription(subscription_id, drop_mode).await
476                }
477                DdlCommand::AlterSubscriptionRetention {
478                    subscription_id,
479                    retention_seconds,
480                    definition,
481                } => {
482                    ctrl.alter_subscription_retention(
483                        subscription_id,
484                        retention_seconds,
485                        definition,
486                    )
487                    .await
488                }
489                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
490                DdlCommand::AlterDatabaseParam(database_id, param) => {
491                    ctrl.alter_database_param(database_id, param).await
492                }
493                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
494                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
495                        .await
496                }
497            }
498        })
499        .in_current_span();
500        let fut = (self.env.await_tree_reg())
501            .register(await_tree_key, await_tree_span)
502            .instrument(Box::pin(fut));
503        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
504        Ok(Some(WaitVersion {
505            catalog_version: notification_version,
506            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
507        }))
508    }
509
510    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
511        self.barrier_manager.get_ddl_progress().await
512    }
513
514    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
515        let (version, updated_db) = self
516            .metadata_manager
517            .catalog_controller
518            .create_database(database)
519            .await?;
520        // If persistent successfully, notify `GlobalBarrierManager` to create database asynchronously.
521        self.barrier_manager
522            .update_database_barrier(
523                updated_db.database_id,
524                updated_db.barrier_interval_ms.map(|v| v as u32),
525                updated_db.checkpoint_frequency.map(|v| v as u64),
526            )
527            .await?;
528        Ok(version)
529    }
530
531    #[tracing::instrument(skip(self), level = "debug")]
532    pub async fn reschedule_streaming_job(
533        &self,
534        job_id: JobId,
535        target: ReschedulePolicy,
536        mut deferred: bool,
537    ) -> MetaResult<()> {
538        tracing::info!("altering parallelism for job {}", job_id);
539        if self.barrier_manager.check_status_running().is_err() {
540            tracing::info!(
541                "alter parallelism is set to deferred mode because the system is in recovery state"
542            );
543            deferred = true;
544        }
545
546        self.stream_manager
547            .reschedule_streaming_job(job_id, target, deferred)
548            .await
549    }
550
551    pub async fn reschedule_streaming_job_backfill_parallelism(
552        &self,
553        job_id: JobId,
554        parallelism: Option<StreamingParallelism>,
555        mut deferred: bool,
556    ) -> MetaResult<()> {
557        tracing::info!("altering backfill parallelism for job {}", job_id);
558        if self.barrier_manager.check_status_running().is_err() {
559            tracing::info!(
560                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
561            );
562            deferred = true;
563        }
564
565        self.stream_manager
566            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
567            .await
568    }
569
570    pub async fn reschedule_cdc_table_backfill(
571        &self,
572        job_id: JobId,
573        target: ReschedulePolicy,
574    ) -> MetaResult<()> {
575        tracing::info!("alter CDC table backfill parallelism");
576        if self.barrier_manager.check_status_running().is_err() {
577            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
578        }
579        self.stream_manager
580            .reschedule_cdc_table_backfill(job_id, target)
581            .await
582    }
583
584    pub async fn reschedule_fragments(
585        &self,
586        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
587    ) -> MetaResult<()> {
588        tracing::info!(
589            "altering parallelism for fragments {:?}",
590            fragment_targets.keys()
591        );
592        let fragment_targets = fragment_targets
593            .into_iter()
594            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
595            .collect();
596
597        self.stream_manager
598            .reschedule_fragments(fragment_targets)
599            .await
600    }
601
602    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
603        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
604            .await
605    }
606
607    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
608        self.metadata_manager
609            .catalog_controller
610            .create_schema(schema)
611            .await
612    }
613
614    async fn drop_schema(
615        &self,
616        schema_id: SchemaId,
617        drop_mode: DropMode,
618    ) -> MetaResult<NotificationVersion> {
619        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
620            .await
621    }
622
623    /// Shared source is handled in [`Self::create_streaming_job`]
624    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
625        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
626            .await
627            .context("failed to create source worker")?;
628
629        let (source_id, version) = self
630            .metadata_manager
631            .catalog_controller
632            .create_source(source)
633            .await?;
634        self.source_manager
635            .register_source_with_handle(source_id, handle)
636            .await;
637        Ok(version)
638    }
639
640    async fn drop_source(
641        &self,
642        source_id: SourceId,
643        drop_mode: DropMode,
644    ) -> MetaResult<NotificationVersion> {
645        self.drop_object(ObjectType::Source, source_id, drop_mode)
646            .await
647    }
648
649    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
650        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
651
652        // Get database_id for the source
653        let database_id = self
654            .metadata_manager
655            .catalog_controller
656            .get_object_database_id(source_id)
657            .await?;
658
659        self.stream_manager
660            .barrier_scheduler
661            .run_command(database_id, Command::ResetSource { source_id })
662            .await?;
663
664        // RESET SOURCE doesn't modify catalog, so return the current catalog version
665        let version = self
666            .metadata_manager
667            .catalog_controller
668            .notify_frontend_trivial()
669            .await;
670        Ok(version)
671    }
672
673    /// This replaces the source in the catalog.
674    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
675    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
676        self.metadata_manager
677            .catalog_controller
678            .alter_non_shared_source(source)
679            .await
680    }
681
682    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
683        self.metadata_manager
684            .catalog_controller
685            .create_function(function)
686            .await
687    }
688
689    async fn drop_function(
690        &self,
691        function_id: FunctionId,
692        drop_mode: DropMode,
693    ) -> MetaResult<NotificationVersion> {
694        self.drop_object(ObjectType::Function, function_id, drop_mode)
695            .await
696    }
697
698    async fn create_view(
699        &self,
700        view: View,
701        dependencies: HashSet<ObjectId>,
702    ) -> MetaResult<NotificationVersion> {
703        self.metadata_manager
704            .catalog_controller
705            .create_view(view, dependencies)
706            .await
707    }
708
709    async fn drop_view(
710        &self,
711        view_id: ViewId,
712        drop_mode: DropMode,
713    ) -> MetaResult<NotificationVersion> {
714        self.drop_object(ObjectType::View, view_id, drop_mode).await
715    }
716
717    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
718        validate_connection(&connection).await?;
719        self.metadata_manager
720            .catalog_controller
721            .create_connection(connection)
722            .await
723    }
724
725    async fn drop_connection(
726        &self,
727        connection_id: ConnectionId,
728        drop_mode: DropMode,
729    ) -> MetaResult<NotificationVersion> {
730        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
731            .await
732    }
733
734    async fn alter_database_param(
735        &self,
736        database_id: DatabaseId,
737        param: AlterDatabaseParam,
738    ) -> MetaResult<NotificationVersion> {
739        let (version, updated_db) = self
740            .metadata_manager
741            .catalog_controller
742            .alter_database_param(database_id, param)
743            .await?;
744        // If persistent successfully, notify `GlobalBarrierManager` to update param asynchronously.
745        self.barrier_manager
746            .update_database_barrier(
747                database_id,
748                updated_db.barrier_interval_ms.map(|v| v as u32),
749                updated_db.checkpoint_frequency.map(|v| v as u64),
750            )
751            .await?;
752        Ok(version)
753    }
754
755    // The 'secret' part of the request we receive from the frontend is in plaintext;
756    // here, we need to encrypt it before storing it in the catalog.
757    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
758        let secret_store_private_key = self
759            .env
760            .opts
761            .secret_store_private_key
762            .clone()
763            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
764
765        let encrypted_payload = SecretEncryption::encrypt(
766            secret_store_private_key.as_slice(),
767            secret.get_value().as_slice(),
768        )
769        .context(format!("failed to encrypt secret {}", secret.name))?;
770        Ok(encrypted_payload
771            .serialize()
772            .context(format!("failed to serialize secret {}", secret.name))?)
773    }
774
775    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
776        // The 'secret' part of the request we receive from the frontend is in plaintext;
777        // here, we need to encrypt it before storing it in the catalog.
778        let secret_plain_payload = secret.value.clone();
779        let encrypted_payload = self.get_encrypted_payload(&secret)?;
780        secret.value = encrypted_payload;
781
782        self.metadata_manager
783            .catalog_controller
784            .create_secret(secret, secret_plain_payload)
785            .await
786    }
787
788    async fn drop_secret(
789        &self,
790        secret_id: SecretId,
791        drop_mode: DropMode,
792    ) -> MetaResult<NotificationVersion> {
793        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
794            .await
795    }
796
797    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
798        let secret_plain_payload = secret.value.clone();
799        let encrypted_payload = self.get_encrypted_payload(&secret)?;
800        secret.value = encrypted_payload;
801        self.metadata_manager
802            .catalog_controller
803            .alter_secret(secret, secret_plain_payload)
804            .await
805    }
806
807    async fn create_subscription(
808        &self,
809        mut subscription: Subscription,
810    ) -> MetaResult<NotificationVersion> {
811        tracing::debug!("create subscription");
812        let _permit = self
813            .creating_streaming_job_permits
814            .semaphore
815            .acquire()
816            .await
817            .unwrap();
818        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
819        self.metadata_manager
820            .catalog_controller
821            .create_subscription_catalog(&mut subscription)
822            .await?;
823        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
824            tracing::debug!(error = %err.as_report(), "failed to create subscription");
825            let _ = self
826                .metadata_manager
827                .catalog_controller
828                .try_abort_creating_subscription(subscription.id)
829                .await
830                .inspect_err(|e| {
831                    tracing::error!(
832                        error = %e.as_report(),
833                        "failed to abort create subscription after failure"
834                    );
835                });
836            return Err(err);
837        }
838
839        let version = self
840            .metadata_manager
841            .catalog_controller
842            .notify_create_subscription(subscription.id)
843            .await?;
844        tracing::debug!("finish create subscription");
845        Ok(version)
846    }
847
848    async fn drop_subscription(
849        &self,
850        subscription_id: SubscriptionId,
851        drop_mode: DropMode,
852    ) -> MetaResult<NotificationVersion> {
853        tracing::debug!("preparing drop subscription");
854        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
855        let subscription = self
856            .metadata_manager
857            .catalog_controller
858            .get_subscription_by_id(subscription_id)
859            .await?;
860        let table_id = subscription.dependent_table_id;
861        let database_id = subscription.database_id;
862        let (_, version) = self
863            .metadata_manager
864            .catalog_controller
865            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
866            .await?;
867        self.stream_manager
868            .drop_subscription(database_id, subscription_id, table_id)
869            .await;
870        tracing::debug!("finish drop subscription");
871        Ok(version)
872    }
873
874    async fn alter_subscription_retention(
875        &self,
876        subscription_id: SubscriptionId,
877        retention_seconds: u64,
878        definition: String,
879    ) -> MetaResult<NotificationVersion> {
880        tracing::debug!("alter subscription retention");
881        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
882        let (version, subscription) = self
883            .metadata_manager
884            .catalog_controller
885            .alter_subscription_retention(subscription_id, retention_seconds, definition)
886            .await?;
887        self.stream_manager
888            .alter_subscription_retention(
889                subscription.database_id,
890                subscription.id,
891                subscription.dependent_table_id,
892                subscription.retention_seconds,
893            )
894            .await?;
895        tracing::debug!("finish alter subscription retention");
896        Ok(version)
897    }
898
899    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
900    #[await_tree::instrument]
901    pub(crate) async fn validate_cdc_table(
902        &self,
903        table: &Table,
904        table_fragments: &StreamJobFragments,
905    ) -> MetaResult<()> {
906        let stream_scan_fragment = table_fragments
907            .fragments
908            .values()
909            .filter(|f| {
910                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
911                    || f.fragment_type_mask
912                        .contains(FragmentTypeFlag::StreamCdcScan)
913            })
914            .exactly_one()
915            .ok()
916            .with_context(|| {
917                format!(
918                    "expect exactly one stream scan fragment, got: {:?}",
919                    table_fragments.fragments
920                )
921            })?;
922        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
923            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
924                if let Some(o) = node.options
925                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
926                {
927                    // Use parallel CDC backfill.
928                } else {
929                    assert_eq!(
930                        stream_scan_fragment.actors.len(),
931                        1,
932                        "Stream scan fragment should have only one actor"
933                    );
934                }
935            }
936        }
937        let mut found_cdc_scan = false;
938        match &stream_scan_fragment.nodes.node_body {
939            Some(NodeBody::StreamCdcScan(_)) => {
940                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
941                if self
942                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
943                    .await?
944                {
945                    found_cdc_scan = true;
946                }
947            }
948            // When there's generated columns, the cdc scan node is wrapped in a project node
949            Some(NodeBody::Project(_)) => {
950                for input in &stream_scan_fragment.nodes.input {
951                    assert_parallelism(stream_scan_fragment, &input.node_body);
952                    if self
953                        .validate_cdc_table_inner(&input.node_body, table.id)
954                        .await?
955                    {
956                        found_cdc_scan = true;
957                    }
958                }
959            }
960            _ => {
961                bail!("Unexpected node body for stream cdc scan");
962            }
963        };
964        if !found_cdc_scan {
965            bail!("No stream cdc scan node found in stream scan fragment");
966        }
967        Ok(())
968    }
969
970    async fn validate_cdc_table_inner(
971        &self,
972        node_body: &Option<NodeBody>,
973        table_id: TableId,
974    ) -> MetaResult<bool> {
975        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
976            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
977        {
978            let options_with_secret = WithOptionsSecResolved::new(
979                cdc_table_desc.connect_properties.clone(),
980                cdc_table_desc.secret_refs.clone(),
981            );
982
983            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
984            props.init_from_pb_cdc_table_desc(cdc_table_desc);
985
986            // Try creating a split enumerator to validate
987            let _enumerator = props
988                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
989                .await?;
990
991            tracing::debug!(?table_id, "validate cdc table success");
992            Ok(true)
993        } else {
994            Ok(false)
995        }
996    }
997
998    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
999        let migrated = self
1000            .metadata_manager
1001            .catalog_controller
1002            .has_table_been_migrated(table_id)
1003            .await?;
1004        if !migrated {
1005            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
1006        } else {
1007            Ok(())
1008        }
1009    }
1010
1011    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
1012    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
1013    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
1014    pub async fn create_streaming_job(
1015        &self,
1016        mut streaming_job: StreamingJob,
1017        fragment_graph: StreamFragmentGraphProto,
1018        dependencies: HashSet<ObjectId>,
1019        resource_type: streaming_job_resource_type::ResourceType,
1020        if_not_exists: bool,
1021    ) -> MetaResult<NotificationVersion> {
1022        if let StreamingJob::Sink(sink) = &streaming_job
1023            && let Some(target_table) = sink.target_table
1024        {
1025            self.validate_table_for_sink(target_table).await?;
1026        }
1027        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1028        let check_ret = self
1029            .metadata_manager
1030            .catalog_controller
1031            .create_job_catalog(
1032                &mut streaming_job,
1033                &ctx,
1034                &fragment_graph.parallelism,
1035                fragment_graph.max_parallelism as _,
1036                dependencies,
1037                resource_type.clone(),
1038                &fragment_graph.backfill_parallelism,
1039            )
1040            .await;
1041        if let Err(meta_err) = check_ret {
1042            if !if_not_exists {
1043                return Err(meta_err);
1044            }
1045            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
1046                if streaming_job.create_type() == CreateType::Foreground {
1047                    let database_id = streaming_job.database_id();
1048                    self.metadata_manager
1049                        .wait_streaming_job_finished(database_id, *job_id)
1050                        .await
1051                } else {
1052                    Ok(IGNORED_NOTIFICATION_VERSION)
1053                }
1054            } else {
1055                Err(meta_err)
1056            };
1057        }
1058        let job_id = streaming_job.id();
1059        tracing::debug!(
1060            id = %job_id,
1061            definition = streaming_job.definition(),
1062            create_type = streaming_job.create_type().as_str_name(),
1063            job_type = ?streaming_job.job_type(),
1064            "starting streaming job",
1065        );
1066        // TODO: acquire permits for recovered background DDLs.
1067        let permit = self
1068            .creating_streaming_job_permits
1069            .semaphore
1070            .clone()
1071            .acquire_owned()
1072            .instrument_await("acquire_creating_streaming_job_permit")
1073            .await
1074            .unwrap();
1075        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1076
1077        let name = streaming_job.name();
1078        let definition = streaming_job.definition();
1079        let source_id = match &streaming_job {
1080            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1081            _ => None,
1082        };
1083
1084        // create streaming job.
1085        match self
1086            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1087            .await
1088        {
1089            Ok(version) => Ok(version),
1090            Err(err) => {
1091                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1092                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1093                    id: job_id,
1094                    name,
1095                    definition,
1096                    error: err.as_report().to_string(),
1097                };
1098                self.env.event_log_manager_ref().add_event_logs(vec![
1099                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1100                ]);
1101                let (aborted, _) = self
1102                    .metadata_manager
1103                    .catalog_controller
1104                    .try_abort_creating_streaming_job(job_id, false)
1105                    .await?;
1106                if aborted {
1107                    tracing::warn!(id = %job_id, "aborted streaming job");
1108                    // FIXME: might also need other cleanup here
1109                    if let Some(source_id) = source_id {
1110                        self.source_manager
1111                            .apply_source_change(SourceChange::DropSource {
1112                                dropped_source_ids: vec![source_id],
1113                            })
1114                            .await;
1115                    }
1116                }
1117                Err(err)
1118            }
1119        }
1120    }
1121
1122    #[await_tree::instrument(boxed)]
1123    async fn create_streaming_job_inner(
1124        &self,
1125        ctx: StreamContext,
1126        mut streaming_job: StreamingJob,
1127        fragment_graph: StreamFragmentGraphProto,
1128        resource_type: streaming_job_resource_type::ResourceType,
1129        permit: OwnedSemaphorePermit,
1130    ) -> MetaResult<NotificationVersion> {
1131        let mut fragment_graph =
1132            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1133        streaming_job.set_info_from_graph(&fragment_graph);
1134
1135        // create internal table catalogs and refill table id.
1136        let incomplete_internal_tables = fragment_graph
1137            .incomplete_internal_tables()
1138            .into_values()
1139            .collect_vec();
1140        let table_id_map = self
1141            .metadata_manager
1142            .catalog_controller
1143            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1144            .await?;
1145        fragment_graph.refill_internal_table_ids(table_id_map);
1146
1147        // create fragment and actor catalogs.
1148        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1149        let (ctx, stream_job_fragments) = self
1150            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1151            .await?;
1152
1153        let streaming_job = &ctx.streaming_job;
1154
1155        match streaming_job {
1156            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1157                self.validate_cdc_table(table, &stream_job_fragments)
1158                    .await?;
1159            }
1160            StreamingJob::Table(Some(source), ..) => {
1161                // Register the source on the connector node.
1162                self.source_manager.register_source(source).await?;
1163                let connector_name = source
1164                    .get_with_properties()
1165                    .get(UPSTREAM_SOURCE_KEY)
1166                    .cloned();
1167                let attr = source.info.as_ref().map(|source_info| {
1168                    jsonbb::json!({
1169                            "format": source_info.format().as_str_name(),
1170                            "encode": source_info.row_encode().as_str_name(),
1171                    })
1172                });
1173                report_create_object(
1174                    streaming_job.id(),
1175                    "source",
1176                    PbTelemetryDatabaseObject::Source,
1177                    connector_name,
1178                    attr,
1179                );
1180            }
1181            StreamingJob::Sink(sink) => {
1182                if sink.auto_refresh_schema_from_table.is_some() {
1183                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1184                }
1185                // Validate the sink on the connector node.
1186                validate_sink(sink).await?;
1187                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1188                let attr = sink.format_desc.as_ref().map(|sink_info| {
1189                    jsonbb::json!({
1190                        "format": sink_info.format().as_str_name(),
1191                        "encode": sink_info.encode().as_str_name(),
1192                    })
1193                });
1194                report_create_object(
1195                    streaming_job.id(),
1196                    "sink",
1197                    PbTelemetryDatabaseObject::Sink,
1198                    connector_name,
1199                    attr,
1200                );
1201            }
1202            StreamingJob::Source(source) => {
1203                // Register the source on the connector node.
1204                self.source_manager.register_source(source).await?;
1205                let connector_name = source
1206                    .get_with_properties()
1207                    .get(UPSTREAM_SOURCE_KEY)
1208                    .cloned();
1209                let attr = source.info.as_ref().map(|source_info| {
1210                    jsonbb::json!({
1211                            "format": source_info.format().as_str_name(),
1212                            "encode": source_info.row_encode().as_str_name(),
1213                    })
1214                });
1215                report_create_object(
1216                    streaming_job.id(),
1217                    "source",
1218                    PbTelemetryDatabaseObject::Source,
1219                    connector_name,
1220                    attr,
1221                );
1222            }
1223            _ => {}
1224        }
1225
1226        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
1227        self.metadata_manager
1228            .catalog_controller
1229            .prepare_stream_job_fragments(
1230                &stream_job_fragments,
1231                streaming_job,
1232                false,
1233                Some(backfill_orders),
1234            )
1235            .await?;
1236
1237        // create streaming jobs.
1238        let version = self
1239            .stream_manager
1240            .create_streaming_job(stream_job_fragments, ctx, permit)
1241            .await?;
1242
1243        Ok(version)
1244    }
1245
1246    /// `target_replace_info`: when dropping a sink into table, we need to replace the table.
1247    pub async fn drop_object(
1248        &self,
1249        object_type: ObjectType,
1250        object_id: impl Into<ObjectId>,
1251        drop_mode: DropMode,
1252    ) -> MetaResult<NotificationVersion> {
1253        let object_id = object_id.into();
1254        // Fence reschedule and source tick before catalog deletion so post-collect split updates
1255        // cannot race with dropped fragments.
1256        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1257        let _source_tick_pause_guard = self.source_manager.pause_tick().await;
1258
1259        let (release_ctx, version) = self
1260            .metadata_manager
1261            .catalog_controller
1262            .drop_object(object_type, object_id, drop_mode)
1263            .await?;
1264
1265        if object_type == ObjectType::Source {
1266            self.env
1267                .notification_manager_ref()
1268                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1269        }
1270
1271        let ReleaseContext {
1272            database_id,
1273            removed_streaming_job_ids,
1274            removed_state_table_ids,
1275            removed_source_ids,
1276            removed_secret_ids: secret_ids,
1277            removed_source_fragments,
1278            removed_actors,
1279            removed_fragments,
1280            removed_sink_fragment_by_targets,
1281            removed_iceberg_table_sinks,
1282        } = release_ctx;
1283
1284        self.stream_manager
1285            .drop_streaming_jobs(
1286                database_id,
1287                removed_actors.iter().map(|id| *id as _).collect(),
1288                removed_streaming_job_ids,
1289                removed_state_table_ids,
1290                removed_fragments.iter().map(|id| *id as _).collect(),
1291                removed_sink_fragment_by_targets
1292                    .into_iter()
1293                    .map(|(target, sinks)| {
1294                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1295                    })
1296                    .collect(),
1297            )
1298            .await;
1299
1300        // clean up sources after dropping streaming jobs.
1301        // Otherwise, e.g., Kafka consumer groups might be recreated after deleted.
1302        self.source_manager
1303            .apply_source_change(SourceChange::DropSource {
1304                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1305            })
1306            .await;
1307
1308        // unregister fragments and actors from source manager.
1309        // FIXME: need also unregister source backfill fragments.
1310        let dropped_source_fragments = removed_source_fragments;
1311        self.source_manager
1312            .apply_source_change(SourceChange::DropMv {
1313                dropped_source_fragments,
1314            })
1315            .await;
1316
1317        // clean up iceberg table sinks
1318        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1319            .iter()
1320            .map(|sink| sink.id)
1321            .collect();
1322
1323        for sink in removed_iceberg_table_sinks {
1324            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1325                .expect("Iceberg sink should be valid");
1326            let iceberg_sink =
1327                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1328            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1329                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1330                tracing::info!(
1331                    "dropping iceberg table {} for dropped sink",
1332                    table_identifier
1333                );
1334
1335                let _ = iceberg_catalog
1336                    .drop_table(&table_identifier)
1337                    .await
1338                    .inspect_err(|err| {
1339                        tracing::error!(
1340                            "failed to drop iceberg table {} during cleanup: {}",
1341                            table_identifier,
1342                            err.as_report()
1343                        );
1344                    });
1345            }
1346        }
1347
1348        // stop sink coordinators for iceberg table sinks
1349        if !iceberg_sink_ids.is_empty() {
1350            self.sink_manager
1351                .stop_sink_coordinator(iceberg_sink_ids.clone())
1352                .await;
1353
1354            for sink_id in iceberg_sink_ids {
1355                self.iceberg_compaction_manager
1356                    .clear_iceberg_commits_by_sink_id(sink_id);
1357            }
1358        }
1359
1360        // remove secrets.
1361        for secret in secret_ids {
1362            LocalSecretManager::global().remove_secret(secret);
1363        }
1364        Ok(version)
1365    }
1366
1367    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1368    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1369    pub async fn replace_job(
1370        &self,
1371        mut streaming_job: StreamingJob,
1372        fragment_graph: StreamFragmentGraphProto,
1373    ) -> MetaResult<NotificationVersion> {
1374        match &streaming_job {
1375            StreamingJob::Table(..)
1376            | StreamingJob::Source(..)
1377            | StreamingJob::MaterializedView(..) => {}
1378            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1379                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1380            }
1381        }
1382
1383        let job_id = streaming_job.id();
1384
1385        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1386        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1387
1388        // Ensure the max parallelism is unchanged before replacing the table.
1389        let original_max_parallelism = self
1390            .metadata_manager
1391            .get_job_max_parallelism(streaming_job.id())
1392            .await?;
1393        let fragment_graph = PbStreamFragmentGraph {
1394            max_parallelism: original_max_parallelism as _,
1395            ..fragment_graph
1396        };
1397
1398        // 1. build fragment graph.
1399        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1400        streaming_job.set_info_from_graph(&fragment_graph);
1401
1402        // make it immutable
1403        let streaming_job = streaming_job;
1404
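        // If the table has sinks created with auto schema refresh, rewrite each such sink's
        // fragment to carry the newly added columns and stage a temporary sink catalog for
        // the replacement.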
1405        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1406            let auto_refresh_schema_sinks = self
1407                .metadata_manager
1408                .catalog_controller
1409                .get_sink_auto_refresh_schema_from(table.id)
1410                .await?;
1411            if !auto_refresh_schema_sinks.is_empty() {
1412                let original_table_columns = self
1413                    .metadata_manager
1414                    .catalog_controller
1415                    .get_table_columns(table.id)
1416                    .await?;
1417                // compare column ids to find newly added columns
1418                let mut original_table_column_ids: HashSet<_> = original_table_columns
1419                    .iter()
1420                    .map(|col| col.column_id())
1421                    .collect();
1422                let newly_added_columns = table
1423                    .columns
1424                    .iter()
1425                    .filter(|col| {
1426                        !original_table_column_ids.remove(&ColumnId::new(
1427                            col.column_desc.as_ref().unwrap().column_id as _,
1428                        ))
1429                    })
1430                    .map(|col| ColumnCatalog::from(col.clone()))
1431                    .collect_vec();
1432                if !original_table_column_ids.is_empty() {
1433                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1434                }
1435                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1436                for sink in auto_refresh_schema_sinks {
1437                    let sink_job_fragments = self
1438                        .metadata_manager
1439                        .get_job_fragments_by_id(sink.id.as_job_id())
1440                        .await?;
1441                    if sink_job_fragments.fragments.len() != 1 {
1442                        return Err(anyhow!(
1443                            "auto schema refresh sink must have only one fragment, but got {}",
1444                            sink_job_fragments.fragments.len()
1445                        )
1446                        .into());
1447                    }
1448                    let original_sink_fragment =
1449                        sink_job_fragments.fragments.into_values().next().unwrap();
1450                    let (new_sink_fragment, new_schema, new_log_store_table) =
1451                        rewrite_refresh_schema_sink_fragment(
1452                            &original_sink_fragment,
1453                            &sink,
1454                            &newly_added_columns,
1455                            table,
1456                            fragment_graph.table_fragment_id(),
1457                            self.env.id_gen_manager(),
1458                            self.env.actor_id_generator(),
1459                        )?;
1460
1461                    assert_eq!(
1462                        original_sink_fragment.actors.len(),
1463                        new_sink_fragment.actors.len()
1464                    );
1465                    let actor_status = (0..original_sink_fragment.actors.len())
1466                        .map(|i| {
1467                            let worker_node_id = sink_job_fragments.actor_status
1468                                [&original_sink_fragment.actors[i].actor_id]
1469                                .location
1470                                .as_ref()
1471                                .unwrap()
1472                                .worker_node_id;
1473                            (
1474                                new_sink_fragment.actors[i].actor_id,
1475                                PbActorStatus {
1476                                    location: Some(PbActorLocation { worker_node_id }),
1477                                },
1478                            )
1479                        })
1480                        .collect();
1481
1482                    let streaming_job = StreamingJob::Sink(sink);
1483
1484                    let tmp_sink_id = self
1485                        .metadata_manager
1486                        .catalog_controller
1487                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1488                        .await?
1489                        .as_sink_id();
1490                    let StreamingJob::Sink(sink) = streaming_job else {
1491                        unreachable!()
1492                    };
1493
1494                    sinks.push(AutoRefreshSchemaSinkContext {
1495                        tmp_sink_id,
1496                        original_sink: sink,
1497                        original_fragment: original_sink_fragment,
1498                        new_schema,
1499                        newly_add_fields: newly_added_columns
1500                            .iter()
1501                            .map(|col| Field::from(&col.column_desc))
1502                            .collect(),
1503                        new_fragment: new_sink_fragment,
1504                        new_log_store_table,
1505                        actor_status,
1506                    });
1507                }
1508                Some(sinks)
1509            } else {
1510                None
1511            }
1512        } else {
1513            None
1514        };
1515
1516        let tmp_id = self
1517            .metadata_manager
1518            .catalog_controller
1519            .create_job_catalog_for_replace(
1520                &streaming_job,
1521                Some(&ctx),
1522                fragment_graph.specified_parallelism().as_ref(),
1523                Some(fragment_graph.max_parallelism()),
1524            )
1525            .await?;
1526
1527        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1528            sinks
1529                .iter()
1530                .map(|sink| sink.tmp_sink_id.as_object_id())
1531                .collect_vec()
1532        });
1533
1534        tracing::debug!(id = %job_id, "building replace streaming job");
1535        let mut updated_sink_catalogs = vec![];
1536
1537        let mut drop_table_connector_ctx = None;
1538        let result: MetaResult<_> = try {
1539            let (mut ctx, mut stream_job_fragments) = self
1540                .build_replace_job(
1541                    ctx,
1542                    &streaming_job,
1543                    fragment_graph,
1544                    tmp_id,
1545                    auto_refresh_schema_sinks,
1546                )
1547                .await?;
1548            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1549            let auto_refresh_schema_sink_finish_ctx =
1550                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1551                    sinks
1552                        .iter()
1553                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1554                            tmp_sink_id: sink.tmp_sink_id,
1555                            original_sink_id: sink.original_sink.id,
1556                            columns: sink.new_schema.clone(),
1557                            new_log_store_table: sink
1558                                .new_log_store_table
1559                                .as_ref()
1560                                .map(|table| (table.id, table.columns.clone())),
1561                        })
1562                        .collect()
1563                });
1564
1565            // Handle a table that has incoming sinks.
1566            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1567                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1568                let upstream_infos = self
1569                    .metadata_manager
1570                    .catalog_controller
1571                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1572                    .await?;
1573                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1574
1575                for upstream_info in &upstream_infos {
1576                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1577                    ctx.upstream_fragment_downstreams
1578                        .entry(upstream_fragment_id)
1579                        .or_default()
1580                        .push(upstream_info.new_sink_downstream.clone());
1581                    if upstream_info.sink_original_target_columns.is_empty() {
1582                        updated_sink_catalogs.push(upstream_info.sink_id);
1583                    }
1584                }
1585            }
1586
1587            let replace_upstream = ctx.replace_upstream.clone();
1588
1589            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1590                let empty_downstreams = FragmentDownstreamRelation::default();
1591                for sink in sinks {
1592                    self.metadata_manager
1593                        .catalog_controller
1594                        .prepare_streaming_job(
1595                            sink.tmp_sink_id.as_job_id(),
1596                            || [&sink.new_fragment].into_iter(),
1597                            &empty_downstreams,
1598                            true,
1599                            None,
1600                            None,
1601                        )
1602                        .await?;
1603                }
1604            }
1605
1606            self.metadata_manager
1607                .catalog_controller
1608                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1609                .await?;
1610
1611            self.stream_manager
1612                .replace_stream_job(stream_job_fragments, ctx)
1613                .await?;
1614            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1615        };
1616
1617        match result {
1618            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1619                let version = self
1620                    .metadata_manager
1621                    .catalog_controller
1622                    .finish_replace_streaming_job(
1623                        tmp_id,
1624                        streaming_job,
1625                        replace_upstream,
1626                        SinkIntoTableContext {
1627                            updated_sink_catalogs,
1628                        },
1629                        drop_table_connector_ctx.as_ref(),
1630                        auto_refresh_schema_sink_finish_ctx,
1631                    )
1632                    .await?;
1633                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1634                    self.source_manager
1635                        .apply_source_change(SourceChange::DropSource {
1636                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1637                        })
1638                        .await;
1639                }
1640                Ok(version)
1641            }
1642            Err(err) => {
1643                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1644                let _ = self.metadata_manager
1645                    .catalog_controller
1646                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1647                    .await.inspect_err(|err| {
1648                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1649                });
1650                Err(err)
1651            }
1652        }
1653    }
1654
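    /// Drops a streaming job. A job still in `Creating` status is cancelled through the
    /// stream manager, while a `Created` job goes through the regular `drop_object` path.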
1655    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1656    )]
1657    async fn drop_streaming_job(
1658        &self,
1659        job_id: StreamingJobId,
1660        drop_mode: DropMode,
1661    ) -> MetaResult<NotificationVersion> {
1662        let (object_id, object_type) = match job_id {
1663            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1664            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1665            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1666            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1667        };
1668
1669        let job_status = self
1670            .metadata_manager
1671            .catalog_controller
1672            .get_streaming_job_status(job_id.id())
1673            .await?;
1674        let version = match job_status {
1675            JobStatus::Initial => {
1676                unreachable!(
1677                    "Job with Initial status should not notify frontend and therefore should not arrive here"
1678                );
1679            }
1680            JobStatus::Creating => {
1681                self.stream_manager
1682                    .cancel_streaming_jobs(vec![job_id.id()])
1683                    .await?;
1684                IGNORED_NOTIFICATION_VERSION
1685            }
1686            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1687        };
1688
1689        Ok(version)
1690    }
1691
1692    /// Resolve the parallelism of the stream job based on the given information.
1693    ///
1694    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
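    ///
    /// Resolution order: a user-specified parallelism wins (it must not exceed the max
    /// parallelism; exceeding the available slots only emits a warning); otherwise the
    /// configured default parallelism is used, capped to the max parallelism, with
    /// `DefaultParallelism::Full` meaning "all available slots".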
1695    fn resolve_stream_parallelism(
1696        &self,
1697        specified: Option<NonZeroUsize>,
1698        max: NonZeroUsize,
1699        cluster_info: &StreamingClusterInfo,
1700        resource_group: String,
1701    ) -> MetaResult<NonZeroUsize> {
1702        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1703        DdlController::resolve_stream_parallelism_inner(
1704            specified,
1705            max,
1706            available,
1707            &self.env.opts.default_parallelism,
1708            &resource_group,
1709        )
1710    }
1711
1712    fn resolve_stream_parallelism_inner(
1713        specified: Option<NonZeroUsize>,
1714        max: NonZeroUsize,
1715        available: Option<NonZeroUsize>,
1716        default_parallelism: &DefaultParallelism,
1717        resource_group: &str,
1718    ) -> MetaResult<NonZeroUsize> {
1719        let Some(available) = available else {
1720            bail_unavailable!(
1721                "no available slots to schedule in resource group \"{}\", \
1722                 have you allocated any compute nodes within this resource group?",
1723                resource_group
1724            );
1725        };
1726
1727        if let Some(specified) = specified {
1728            if specified > max {
1729                bail_invalid_parameter!(
1730                    "specified parallelism {} should not exceed max parallelism {}",
1731                    specified,
1732                    max,
1733                );
1734            }
1735            if specified > available {
1736                tracing::warn!(
1737                    resource_group,
1738                    specified_parallelism = specified.get(),
1739                    available_parallelism = available.get(),
1740                    "specified parallelism exceeds available slots, scheduling with specified value",
1741                );
1742            }
1743            return Ok(specified);
1744        }
1745
1746        // Use default parallelism when no specific parallelism is provided by the user.
1747        let default_parallelism = match default_parallelism {
1748            DefaultParallelism::Full => available,
1749            DefaultParallelism::Default(num) => {
1750                if *num > available {
1751                    tracing::warn!(
1752                        resource_group,
1753                        configured_parallelism = num.get(),
1754                        available_parallelism = available.get(),
1755                        "default parallelism exceeds available slots, scheduling with configured value",
1756                    );
1757                }
1758                *num
1759            }
1760        };
1761
1762        if default_parallelism > max {
1763            tracing::warn!(
1764                max_parallelism = max.get(),
1765                resource_group,
1766                "default parallelism exceeds max parallelism, capping to max",
1767            );
1768        }
1769        Ok(default_parallelism.min(max))
1770    }
1771
1772    /// Builds the actor graph:
1773    /// - Add the upstream fragments to the fragment graph
1774    /// - Schedule the fragments based on their distribution
1775    /// - Expand each fragment into one or several actors
1776    /// - Construct the fragment level backfill order control.
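    ///
    /// Returns the [`CreateStreamingJobContext`] together with the fragments to create.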
1777    #[await_tree::instrument]
1778    pub(crate) async fn build_stream_job(
1779        &self,
1780        stream_ctx: StreamContext,
1781        mut stream_job: StreamingJob,
1782        fragment_graph: StreamFragmentGraph,
1783        resource_type: streaming_job_resource_type::ResourceType,
1784    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1785        let id = stream_job.id();
1786        let specified_parallelism = fragment_graph.specified_parallelism();
1787        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1788        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1789
1790        // 1. Build the fragment-level backfill ordering graph.
1791        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1792
1793        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1794        // contains all information needed for building the actor graph.
1795
1796        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1797            fragment_graph.collect_snapshot_backfill_info()?;
1798        assert!(
1799            snapshot_backfill_info
1800                .iter()
1801                .chain([&cross_db_snapshot_backfill_info])
1802                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1803                .all(|backfill_epoch| backfill_epoch.is_none()),
1804            "should not set backfill epoch when initially build the job: {:?} {:?}",
1805            snapshot_backfill_info,
1806            cross_db_snapshot_backfill_info
1807        );
1808
1809        let locality_fragment_state_table_mapping =
1810            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1811
1812        // check if log store exists for all cross-db upstreams
1813        self.metadata_manager
1814            .catalog_controller
1815            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1816            .await?;
1817
1818        let upstream_table_ids = fragment_graph
1819            .dependent_table_ids()
1820            .iter()
1821            .filter(|id| {
1822                !cross_db_snapshot_backfill_info
1823                    .upstream_mv_table_id_to_backfill_epoch
1824                    .contains_key(*id)
1825            })
1826            .cloned()
1827            .collect();
1828
1829        let (upstream_root_fragments, existing_actor_location) = self
1830            .metadata_manager
1831            .get_upstream_root_fragments(&upstream_table_ids)
1832            .await?;
1833
1834        if snapshot_backfill_info.is_some() {
1835            match stream_job {
1836                StreamingJob::MaterializedView(_)
1837                | StreamingJob::Sink(_)
1838                | StreamingJob::Index(_, _) => {}
1839                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1840                    return Err(
1841                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1842                    );
1843                }
1844            }
1845        }
1846
1847        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1848            fragment_graph,
1849            FragmentGraphUpstreamContext {
1850                upstream_root_fragments,
1851                upstream_actor_location: existing_actor_location,
1852            },
1853            (&stream_job).into(),
1854        )?;
1855        let resource_group = if let Some(group) = resource_type.resource_group() {
1856            group
1857        } else {
1858            self.metadata_manager
1859                .get_database_resource_group(stream_job.database_id())
1860                .await?
1861        };
1862        let is_serverless_backfill = matches!(
1863            &resource_type,
1864            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1865        );
1866
1867        // 3. Build the actor graph.
1868        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1869
1870        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1871        let parallelism = self.resolve_stream_parallelism(
1872            initial_parallelism,
1873            max_parallelism,
1874            &cluster_info,
1875            resource_group.clone(),
1876        )?;
1877
1878        let parallelism = if initial_parallelism.is_some() {
1879            parallelism.get()
1880        } else {
1881            // Prefer the job-level override for initial scheduling, falling back to the system-wide strategy.
1882            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1883                Some(strategy) => strategy,
1884                None => self
1885                    .env
1886                    .system_params_reader()
1887                    .await
1888                    .adaptive_parallelism_strategy(),
1889            };
1890            adaptive_strategy.compute_target_parallelism(parallelism.get())
1891        };
1892
1893        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1894        let actor_graph_builder = ActorGraphBuilder::new(
1895            id,
1896            resource_group,
1897            complete_graph,
1898            cluster_info,
1899            parallelism,
1900        )?;
1901
1902        let ActorGraphBuildResult {
1903            graph,
1904            downstream_fragment_relations,
1905            building_locations,
1906            upstream_fragment_downstreams,
1907            replace_upstream,
1908            ..
1909        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1910        assert!(replace_upstream.is_empty());
1911
1912        // 4. Build the table fragments structure that will be persisted in the stream manager,
1913        // and the context that contains all information needed for building the
1914        // actors on the compute nodes.
1915
1916        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, set it to ADAPTIVE.
1917        // Otherwise, it defaults to FIXED with the deduced parallelism.
1918        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1919            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1920            _ => TableParallelism::Fixed(parallelism.get()),
1921        };
1922
1923        let stream_job_fragments = StreamJobFragments::new(
1924            id,
1925            graph,
1926            &building_locations.actor_locations,
1927            stream_ctx.clone(),
1928            table_parallelism,
1929            max_parallelism.get(),
1930        );
1931
1932        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1933            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1934        }
1935
1936        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1937            && let Ok(table_id) = sink.get_target_table()
1938        {
1939            let tables = self
1940                .metadata_manager
1941                .get_table_catalog_by_ids(&[*table_id])
1942                .await?;
1943            let target_table = tables
1944                .first()
1945                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1946            let sink_fragment = stream_job_fragments
1947                .sink_fragment()
1948                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1949            let mview_fragment_id = self
1950                .metadata_manager
1951                .catalog_controller
1952                .get_mview_fragment_by_id(table_id.as_job_id())
1953                .await?;
1954            let upstream_sink_info = build_upstream_sink_info(
1955                sink.id,
1956                sink.original_target_columns.clone(),
1957                sink_fragment.fragment_id as _,
1958                target_table,
1959                mview_fragment_id,
1960            )?;
1961            Some(upstream_sink_info)
1962        } else {
1963            None
1964        };
1965
1966        let mut cdc_table_snapshot_splits = None;
1967        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1968            && let Some((_, stream_cdc_scan)) =
1969                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1970        {
1971            {
1972                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1973                let splits = try_init_parallel_cdc_table_snapshot_splits(
1974                    table.id,
1975                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1976                    self.env.meta_store_ref(),
1977                    stream_cdc_scan.options.as_ref().unwrap(),
1978                    self.env.opts.cdc_table_split_init_insert_batch_size,
1979                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1980                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1981                )
1982                .await?;
1983                cdc_table_snapshot_splits = Some(splits);
1984            }
1985        }
1986
1987        let ctx = CreateStreamingJobContext {
1988            upstream_fragment_downstreams,
1989            building_locations,
1990            definition: stream_job.definition(),
1991            create_type: stream_job.create_type(),
1992            job_type: (&stream_job).into(),
1993            streaming_job: stream_job,
1994            new_upstream_sink,
1995            option: CreateStreamingJobOption {},
1996            snapshot_backfill_info,
1997            cross_db_snapshot_backfill_info,
1998            fragment_backfill_ordering,
1999            locality_fragment_state_table_mapping,
2000            cdc_table_snapshot_splits,
2001            is_serverless_backfill,
2002        };
2003
2004        Ok((
2005            ctx,
2006            StreamJobFragmentsToCreate {
2007                inner: stream_job_fragments,
2008                downstreams: downstream_fragment_relations,
2009            },
2010        ))
2011    }
2012
2013    /// `build_replace_job` builds a job replacement and returns the context and new job
2014    /// fragments.
2015    ///
2016    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
2017    /// replacement is finished.
2018    pub(crate) async fn build_replace_job(
2019        &self,
2020        stream_ctx: StreamContext,
2021        stream_job: &StreamingJob,
2022        mut fragment_graph: StreamFragmentGraph,
2023        tmp_job_id: JobId,
2024        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
2025    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2026        match &stream_job {
2027            StreamingJob::Table(..)
2028            | StreamingJob::Source(..)
2029            | StreamingJob::MaterializedView(..) => {}
2030            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2031                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2032            }
2033        }
2034
2035        let id = stream_job.id();
2036
2037        // check whether we are dropping the table's connector
2038        let mut drop_table_associated_source_id = None;
2039        if let StreamingJob::Table(None, _, _) = &stream_job {
2040            drop_table_associated_source_id = self
2041                .metadata_manager
2042                .get_table_associated_source_id(id.as_mv_table_id())
2043                .await?;
2044        }
2045
2046        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
2047        let old_internal_table_ids = old_fragments.internal_table_ids();
2048
2049        // handle drop table's associated source
2050        let mut drop_table_connector_ctx = None;
2051        if let Some(to_remove_source_id) = drop_table_associated_source_id {
2052            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
2053            debug_assert!(old_internal_table_ids.len() == 1);
2054
2055            drop_table_connector_ctx = Some(DropTableConnectorContext {
2056                // We do not remove the original table catalog as it's still needed for the streaming job;
2057                // we just need to remove the reference to the state table.
2058                to_change_streaming_job_id: id,
2059                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
2060                to_remove_source_id,
2061            });
2062        } else if stream_job.is_materialized_view() {
2063            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
2064            // but more robust.
2065            let old_fragments_upstreams = self
2066                .metadata_manager
2067                .catalog_controller
2068                .upstream_fragments(old_fragments.fragment_ids())
2069                .await?;
2070
2071            let old_state_graph =
2072                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2073            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2074            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2075                .context("incompatible altering on the streaming job states")?;
2076
2077            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2078            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2079        } else {
2080            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2081            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2082            let old_internal_tables = self
2083                .metadata_manager
2084                .get_table_catalog_by_ids(&old_internal_table_ids)
2085                .await?;
2086            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2087        }
2088
2089        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2090        // graph that contains all information needed for building the actor graph.
2091        let original_root_fragment = old_fragments
2092            .root_fragment()
2093            .expect("root fragment not found");
2094
2095        let job_type = StreamingJobType::from(stream_job);
2096
2097        // Fetch the fragments downstream of the original job from the metadata manager.
2098        let (mut downstream_fragments, mut downstream_actor_location) =
2099            self.metadata_manager.get_downstream_fragments(id).await?;
2100
2101        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2102            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2103                .iter()
2104                .map(|sink| sink.original_fragment.fragment_id)
2105                .collect();
2106            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2107                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2108                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2109                }) {
2110                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2111                    for actor_id in downstream_fragment.actors.keys() {
2112                        downstream_actor_location.remove(actor_id);
2113                    }
2114                    for (actor_id, status) in &sink.actor_status {
2115                        downstream_actor_location
2116                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2117                    }
2118
2119                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2120                    *nodes = sink.new_fragment.nodes.clone();
2121                }
2122            }
2123            assert!(remaining_fragment.is_empty());
2124        }
2125
2126        // build the complete graph based on the job type
2127        let complete_graph = match &job_type {
2128            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2129                CompleteStreamFragmentGraph::with_downstreams(
2130                    fragment_graph,
2131                    FragmentGraphDownstreamContext {
2132                        original_root_fragment_id: original_root_fragment.fragment_id,
2133                        downstream_fragments,
2134                        downstream_actor_location,
2135                    },
2136                    job_type,
2137                )?
2138            }
2139            StreamingJobType::Table(TableJobType::SharedCdcSource)
2140            | StreamingJobType::MaterializedView => {
2141                // CDC tables or materialized views can have upstream jobs as well.
2142                let (upstream_root_fragments, upstream_actor_location) = self
2143                    .metadata_manager
2144                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2145                    .await?;
2146
2147                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2148                    fragment_graph,
2149                    FragmentGraphUpstreamContext {
2150                        upstream_root_fragments,
2151                        upstream_actor_location,
2152                    },
2153                    FragmentGraphDownstreamContext {
2154                        original_root_fragment_id: original_root_fragment.fragment_id,
2155                        downstream_fragments,
2156                        downstream_actor_location,
2157                    },
2158                    job_type,
2159                )?
2160            }
2161            _ => unreachable!(),
2162        };
2163
2164        let resource_group = self
2165            .metadata_manager
2166            .get_existing_job_resource_group(id)
2167            .await?;
2168
2169        // 2. Build the actor graph.
2170        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2171
2172        // XXX: what is this parallelism?
2173        // Is it "assigned parallelism"?
2174        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2175            .expect("The number of actors in the original table fragment should be greater than 0");
2176
2177        let actor_graph_builder = ActorGraphBuilder::new(
2178            id,
2179            resource_group,
2180            complete_graph,
2181            cluster_info,
2182            parallelism,
2183        )?;
2184
2185        let ActorGraphBuildResult {
2186            graph,
2187            downstream_fragment_relations,
2188            building_locations,
2189            upstream_fragment_downstreams,
2190            mut replace_upstream,
2191            ..
2192        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2193
2194        // A general table or source does not have upstream jobs, so the dispatchers should be empty.
2195        if matches!(
2196            job_type,
2197            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2198        ) {
2199            assert!(upstream_fragment_downstreams.is_empty());
2200        }
2201
2202        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2203        // the context that contains all information needed for building the actors on the compute
2204        // nodes.
2205        let stream_job_fragments = StreamJobFragments::new(
2206            tmp_job_id,
2207            graph,
2208            &building_locations.actor_locations,
2209            stream_ctx,
2210            old_fragments.assigned_parallelism,
2211            old_fragments.max_parallelism,
2212        );
2213
2214        if let Some(sinks) = &auto_refresh_schema_sinks {
2215            for sink in sinks {
2216                replace_upstream
2217                    .remove(&sink.new_fragment.fragment_id)
2218                    .expect("should exist");
2219            }
2220        }
2221
2222        // Note: no need to set `vnode_count` as it's already set by the frontend.
2223        // See `get_replace_table_plan`.
2224
2225        let ctx = ReplaceStreamJobContext {
2226            old_fragments,
2227            replace_upstream,
2228            upstream_fragment_downstreams,
2229            building_locations,
2230            streaming_job: stream_job.clone(),
2231            tmp_id: tmp_job_id,
2232            drop_table_connector_ctx,
2233            auto_refresh_schema_sinks,
2234        };
2235
2236        Ok((
2237            ctx,
2238            StreamJobFragmentsToCreate {
2239                inner: stream_job_fragments,
2240                downstreams: downstream_fragment_relations,
2241            },
2242        ))
2243    }
2244
2245    async fn alter_name(
2246        &self,
2247        relation: alter_name_request::Object,
2248        new_name: &str,
2249    ) -> MetaResult<NotificationVersion> {
2250        let (obj_type, id): (ObjectType, ObjectId) = match relation {
2251            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2252            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2253            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2254            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2255            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2256            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2257            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2258            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2259        };
2260        self.metadata_manager
2261            .catalog_controller
2262            .alter_name(obj_type, id, new_name)
2263            .await
2264    }
2265
2266    async fn alter_swap_rename(
2267        &self,
2268        object: alter_swap_rename_request::Object,
2269    ) -> MetaResult<NotificationVersion> {
2270        let (obj_type, src_id, dst_id) = match object {
2271            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2272            alter_swap_rename_request::Object::Table(objs) => {
2273                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2274                (ObjectType::Table, src_id, dst_id)
2275            }
2276            alter_swap_rename_request::Object::View(objs) => {
2277                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2278                (ObjectType::View, src_id, dst_id)
2279            }
2280            alter_swap_rename_request::Object::Source(objs) => {
2281                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2282                (ObjectType::Source, src_id, dst_id)
2283            }
2284            alter_swap_rename_request::Object::Sink(objs) => {
2285                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2286                (ObjectType::Sink, src_id, dst_id)
2287            }
2288            alter_swap_rename_request::Object::Subscription(objs) => {
2289                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2290                (ObjectType::Subscription, src_id, dst_id)
2291            }
2292        };
2293
2294        self.metadata_manager
2295            .catalog_controller
2296            .alter_swap_rename(obj_type, src_id, dst_id)
2297            .await
2298    }
2299
2300    async fn alter_owner(
2301        &self,
2302        object: Object,
2303        owner_id: UserId,
2304    ) -> MetaResult<NotificationVersion> {
2305        let (obj_type, id): (ObjectType, ObjectId) = match object {
2306            Object::TableId(id) => (ObjectType::Table, id.into()),
2307            Object::ViewId(id) => (ObjectType::View, id.into()),
2308            Object::SourceId(id) => (ObjectType::Source, id.into()),
2309            Object::SinkId(id) => (ObjectType::Sink, id.into()),
2310            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2311            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2312            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2313            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2314            Object::FunctionId(id) => (ObjectType::Function, id.into()),
2315            Object::SecretId(id) => (ObjectType::Secret, id.into()),
2316        };
2317        self.metadata_manager
2318            .catalog_controller
2319            .alter_owner(obj_type, id, owner_id as _)
2320            .await
2321    }
2322
2323    async fn alter_set_schema(
2324        &self,
2325        object: alter_set_schema_request::Object,
2326        new_schema_id: SchemaId,
2327    ) -> MetaResult<NotificationVersion> {
2328        let (obj_type, id): (ObjectType, ObjectId) = match object {
2329            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2330            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2331            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2332            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2333            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2334            alter_set_schema_request::Object::ConnectionId(id) => {
2335                (ObjectType::Connection, id.into())
2336            }
2337            alter_set_schema_request::Object::SubscriptionId(id) => {
2338                (ObjectType::Subscription, id.into())
2339            }
2340        };
2341        self.metadata_manager
2342            .catalog_controller
2343            .alter_schema(obj_type, id, new_schema_id)
2344            .await
2345    }
2346
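    /// Waits until there is no background creating job left, polling roughly once per
    /// millisecond for up to 30 minutes, and returns the catalog and hummock versions that
    /// the frontend should wait for.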
2347    pub async fn wait(&self) -> MetaResult<WaitVersion> {
2348        let timeout_ms = 30 * 60 * 1000;
2349        for _ in 0..timeout_ms {
2350            if self
2351                .metadata_manager
2352                .catalog_controller
2353                .list_background_creating_jobs(true, None)
2354                .await?
2355                .is_empty()
2356            {
2357                let catalog_version = self
2358                    .metadata_manager
2359                    .catalog_controller
2360                    .notify_frontend_trivial()
2361                    .await;
2362                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2363                return Ok(WaitVersion {
2364                    catalog_version,
2365                    hummock_version_id,
2366                });
2367            }
2368
2369            sleep(Duration::from_millis(1)).await;
2370        }
2371        Err(MetaError::cancelled(format!(
2372            "timeout after {timeout_ms}ms"
2373        )))
2374    }
2375
2376    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2377        self.metadata_manager
2378            .catalog_controller
2379            .comment_on(comment)
2380            .await
2381    }
2382
2383    async fn alter_streaming_job_config(
2384        &self,
2385        job_id: JobId,
2386        entries_to_add: HashMap<String, String>,
2387        keys_to_remove: Vec<String>,
2388    ) -> MetaResult<NotificationVersion> {
2389        self.metadata_manager
2390            .catalog_controller
2391            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2392            .await
2393    }
2394}
2395
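/// Reports a `CreateStreamJob`-stage telemetry event for the given job and object type.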
2396fn report_create_object(
2397    job_id: JobId,
2398    event_name: &str,
2399    obj_type: PbTelemetryDatabaseObject,
2400    connector_name: Option<String>,
2401    attr_info: Option<jsonbb::Value>,
2402) {
2403    report_event(
2404        PbTelemetryEventStage::CreateStreamJob,
2405        event_name,
2406        job_id.as_raw_id() as _,
2407        connector_name,
2408        Some(obj_type),
2409        attr_info,
2410    );
2411}
2412
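/// Builds the [`UpstreamSinkInfo`] that connects a sink fragment to its target table: it
/// derives the sink output schema (falling back to the table's columns when
/// `original_target_columns` is empty), maps the table's distribution keys onto sink column
/// indices, and prepares the projection from the sink schema onto the table's current columns.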
2413pub fn build_upstream_sink_info(
2414    sink_id: SinkId,
2415    original_target_columns: Vec<PbColumnCatalog>,
2416    sink_fragment_id: FragmentId,
2417    target_table: &PbTable,
2418    target_fragment_id: FragmentId,
2419) -> MetaResult<UpstreamSinkInfo> {
2420    let sink_columns = if !original_target_columns.is_empty() {
2421        original_target_columns.clone()
2422    } else {
2423        // `original_target_columns` may be empty because the field did not exist in earlier versions,
2424        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
2425        // Therefore the table's current columns are exactly the original target columns,
2426        // and the sink's value will be filled in on the meta node.
2427        target_table.columns.clone()
2428    };
2429
2430    let sink_output_fields = sink_columns
2431        .iter()
2432        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2433        .collect_vec();
2434    let output_indices = (0..sink_output_fields.len())
2435        .map(|i| i as u32)
2436        .collect_vec();
2437
2438    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2439        let sink_idx_by_col_id = sink_columns
2440            .iter()
2441            .enumerate()
2442            .map(|(idx, col)| {
2443                let column_id = col.column_desc.as_ref().unwrap().column_id;
2444                (column_id, idx as u32)
2445            })
2446            .collect::<HashMap<_, _>>();
2447        target_table
2448            .distribution_key
2449            .iter()
2450            .map(|dist_idx| {
2451                let column_id = target_table.columns[*dist_idx as usize]
2452                    .column_desc
2453                    .as_ref()
2454                    .unwrap()
2455                    .column_id;
2456                let sink_idx = sink_idx_by_col_id
2457                    .get(&column_id)
2458                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2459                Ok(*sink_idx)
2460            })
2461            .collect::<anyhow::Result<Vec<_>>>()?
2462    };
2463    let dist_key_indices =
2464        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2465    let downstream_fragment_id = target_fragment_id as _;
2466    let new_downstream_relation = DownstreamFragmentRelation {
2467        downstream_fragment_id,
2468        dispatcher_type: DispatcherType::Hash,
2469        dist_key_indices,
2470        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2471    };
2472    let current_target_columns = target_table.get_columns();
2473    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2474    Ok(UpstreamSinkInfo {
2475        sink_id,
2476        sink_fragment_id: sink_fragment_id as _,
2477        sink_output_fields,
2478        sink_original_target_columns: original_target_columns,
2479        project_exprs,
2480        new_sink_downstream: new_downstream_relation,
2481    })
2482}
2483
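/// Rewrites the `UpstreamSinkUnion` node inside the table's union fragment so that its
/// `init_upstreams` lists the given upstream sink infos.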
2484pub fn refill_upstream_sink_union_in_table(
2485    union_fragment_root: &mut PbStreamNode,
2486    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2487) {
2488    visit_stream_node_cont_mut(union_fragment_root, |node| {
2489        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2490            let init_upstreams = upstream_sink_infos
2491                .iter()
2492                .map(|info| PbUpstreamSinkInfo {
2493                    upstream_fragment_id: info.sink_fragment_id,
2494                    sink_output_schema: info.sink_output_fields.clone(),
2495                    project_exprs: info.project_exprs.clone(),
2496                })
2497                .collect();
2498            upstream_sink_union.init_upstreams = init_upstreams;
2499            false
2500        } else {
2501            true
2502        }
2503    });
2504}
2505
2506#[cfg(test)]
2507mod tests {
2508    use std::num::NonZeroUsize;
2509
2510    use super::*;
2511
2512    #[test]
2513    fn test_specified_parallelism_exceeds_available() {
2514        let result = DdlController::resolve_stream_parallelism_inner(
2515            Some(NonZeroUsize::new(100).unwrap()),
2516            NonZeroUsize::new(256).unwrap(),
2517            Some(NonZeroUsize::new(4).unwrap()),
2518            &DefaultParallelism::Full,
2519            "default",
2520        )
2521        .unwrap();
2522        assert_eq!(result.get(), 100);
2523    }
2524
2525    #[test]
2526    fn test_allows_default_parallelism_over_available() {
2527        let result = DdlController::resolve_stream_parallelism_inner(
2528            None,
2529            NonZeroUsize::new(256).unwrap(),
2530            Some(NonZeroUsize::new(4).unwrap()),
2531            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2532            "default",
2533        )
2534        .unwrap();
2535        assert_eq!(result.get(), 50);
2536    }
2537
2538    #[test]
2539    fn test_full_parallelism_capped_by_max() {
2540        let result = DdlController::resolve_stream_parallelism_inner(
2541            None,
2542            NonZeroUsize::new(6).unwrap(),
2543            Some(NonZeroUsize::new(10).unwrap()),
2544            &DefaultParallelism::Full,
2545            "default",
2546        )
2547        .unwrap();
2548        assert_eq!(result.get(), 6);
2549    }
2550
2551    #[test]
2552    fn test_no_available_slots_returns_error() {
2553        let result = DdlController::resolve_stream_parallelism_inner(
2554            None,
2555            NonZeroUsize::new(4).unwrap(),
2556            None,
2557            &DefaultParallelism::Full,
2558            "default",
2559        );
2560        assert!(matches!(
2561            result,
2562            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2563        ));
2564    }
2565
2566    #[test]
2567    fn test_specified_over_max_returns_error() {
2568        let result = DdlController::resolve_stream_parallelism_inner(
2569            Some(NonZeroUsize::new(8).unwrap()),
2570            NonZeroUsize::new(4).unwrap(),
2571            Some(NonZeroUsize::new(10).unwrap()),
2572            &DefaultParallelism::Full,
2573            "default",
2574        );
2575        assert!(matches!(
2576            result,
2577            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2578        ));
2579    }
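
    // One more illustrative case: a configured default parallelism larger than the job's
    // max parallelism is capped to the max.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(16).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }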
2580}