risingwave_meta/rpc/ddl_controller.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request, streaming_job_resource_type,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

/// Describes the job that needs to be replaced, together with its new fragment graph.
/// Used when replacing a table and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

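    /// Returns whether this command is allowed to run while the cluster is still recovering.
    /// Commands that create or mutate streaming jobs are rejected during recovery.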
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    // The semaphore is used to limit the number of concurrent streaming job creations.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
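    /// Initializes the semaphore from the `max_concurrent_creating_streaming_jobs` system
    /// parameter and spawns a background task that resizes it whenever the parameter changes.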
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with adjustable limits ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}
345
346impl DdlController {
347    pub async fn new(
348        env: MetaSrvEnv,
349        metadata_manager: MetadataManager,
350        stream_manager: GlobalStreamManagerRef,
351        source_manager: SourceManagerRef,
352        barrier_manager: BarrierManagerRef,
353        sink_manager: SinkCoordinatorManager,
354        iceberg_compaction_manager: IcebergCompactionManagerRef,
355    ) -> Self {
356        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
357        Self {
358            env,
359            metadata_manager,
360            stream_manager,
361            source_manager,
362            barrier_manager,
363            sink_manager,
364            iceberg_compaction_manager,
365            creating_streaming_job_permits,
366            seq: Arc::new(AtomicU64::new(0)),
367        }
368    }
369
370    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
371    pub fn next_seq(&self) -> u64 {
372        // This is a simple atomic increment operation.
373        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
374    }
375
    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, the request would be cancelled by tonic. Since we have
    /// a lot of logic for revert, status management, notification and so on, ensuring consistency
    /// would be a huge hassle and pain if we didn't spawn here.
    ///
    /// Though the return type is `Option`, it's always `Some`, to simplify the handling logic.
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

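    /// Reschedules the parallelism of the CDC table backfill fragment for a job.
    /// Unlike `reschedule_streaming_job`, this is rejected rather than deferred while the
    /// cluster is recovering.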
    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

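    /// Resets a CDC source's offset to the latest position by issuing a `ResetSource` barrier
    /// command. The catalog is not modified, so the current notification version is returned.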
    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        // Get database_id for the source
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // RESET SOURCE doesn't modify catalog, so return the current catalog version
        let version = self
            .metadata_manager
            .catalog_controller
            .current_notification_version()
            .await;
        Ok(version)
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

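    /// Encrypts the new secret value before updating the secret in the catalog; the plaintext
    /// payload is passed along separately.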
    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

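    /// Returns `Ok(true)` if the node is a `StreamCdcScan` whose connector properties can be
    /// used to create a split enumerator (i.e. the external CDC source is reachable), and
    /// `Ok(false)` if the node is not a CDC scan.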
    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

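    /// Rejects `CREATE SINK ... INTO table` when the target table has not been migrated yet.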
    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
        } else {
            Ok(())
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

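    /// Builds the stream fragment graph and internal table catalogs for the job, runs
    /// connector-specific validation (CDC tables, sources, sinks), persists the fragments,
    /// and finally hands the job over to the stream manager for actual creation.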
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        resource_type: streaming_job_resource_type::ResourceType,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        // create fragment and actor catalogs.
        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(
                &stream_job_fragments,
                streaming_job,
                false,
                Some(backfill_orders),
            )
            .await?;

        // create streaming jobs.
        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

    /// Note: when dropping a sink into a table, we need to replace the target table as well.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // unregister fragments and actors from source manager.
        // FIXME: also need to unregister source backfill fragments.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // clean up iceberg table sinks
        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        // stop sink coordinators for iceberg table sinks
        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_commits_by_sink_id(sink_id);
            }
        }

        // remove secrets.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // Ensure the max parallelism is unchanged before replacing the table.
1313        let original_max_parallelism = self
1314            .metadata_manager
1315            .get_job_max_parallelism(streaming_job.id())
1316            .await?;
1317        let fragment_graph = PbStreamFragmentGraph {
1318            max_parallelism: original_max_parallelism as _,
1319            ..fragment_graph
1320        };
1321
1322        // 1. build fragment graph.
1323        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1324        streaming_job.set_info_from_graph(&fragment_graph);
1325
1326        // make it immutable
1327        let streaming_job = streaming_job;
1328
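        // For `ALTER TABLE` on a table with auto-schema-refresh sinks, rewrite each such sink's
        // fragment to carry the newly added columns and create a temporary sink catalog for the
        // replacement.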
1329        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1330            let auto_refresh_schema_sinks = self
1331                .metadata_manager
1332                .catalog_controller
1333                .get_sink_auto_refresh_schema_from(table.id)
1334                .await?;
1335            if !auto_refresh_schema_sinks.is_empty() {
1336                let original_table_columns = self
1337                    .metadata_manager
1338                    .catalog_controller
1339                    .get_table_columns(table.id)
1340                    .await?;
                // Compare column IDs to find the newly added columns.
1342                let mut original_table_column_ids: HashSet<_> = original_table_columns
1343                    .iter()
1344                    .map(|col| col.column_id())
1345                    .collect();
1346                let newly_added_columns = table
1347                    .columns
1348                    .iter()
1349                    .filter(|col| {
1350                        !original_table_column_ids.remove(&ColumnId::new(
1351                            col.column_desc.as_ref().unwrap().column_id as _,
1352                        ))
1353                    })
1354                    .map(|col| ColumnCatalog::from(col.clone()))
1355                    .collect_vec();
1356                if !original_table_column_ids.is_empty() {
1357                    return Err(anyhow!("new table columns does not contains all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1358                }
1359                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1360                for sink in auto_refresh_schema_sinks {
1361                    let sink_job_fragments = self
1362                        .metadata_manager
1363                        .get_job_fragments_by_id(sink.id.as_job_id())
1364                        .await?;
1365                    if sink_job_fragments.fragments.len() != 1 {
1366                        return Err(anyhow!(
1367                            "auto schema refresh sink must have only one fragment, but got {}",
1368                            sink_job_fragments.fragments.len()
1369                        )
1370                        .into());
1371                    }
1372                    let original_sink_fragment =
1373                        sink_job_fragments.fragments.into_values().next().unwrap();
1374                    let (new_sink_fragment, new_schema, new_log_store_table) =
1375                        rewrite_refresh_schema_sink_fragment(
1376                            &original_sink_fragment,
1377                            &sink,
1378                            &newly_added_columns,
1379                            table,
1380                            fragment_graph.table_fragment_id(),
1381                            self.env.id_gen_manager(),
1382                            self.env.actor_id_generator(),
1383                        )?;
1384
1385                    assert_eq!(
1386                        original_sink_fragment.actors.len(),
1387                        new_sink_fragment.actors.len()
1388                    );
1389                    let actor_status = (0..original_sink_fragment.actors.len())
1390                        .map(|i| {
1391                            let worker_node_id = sink_job_fragments.actor_status
1392                                [&original_sink_fragment.actors[i].actor_id]
1393                                .location
1394                                .as_ref()
1395                                .unwrap()
1396                                .worker_node_id;
1397                            (
1398                                new_sink_fragment.actors[i].actor_id,
1399                                PbActorStatus {
1400                                    location: Some(PbActorLocation { worker_node_id }),
1401                                },
1402                            )
1403                        })
1404                        .collect();
1405
1406                    let streaming_job = StreamingJob::Sink(sink);
1407
1408                    let tmp_sink_id = self
1409                        .metadata_manager
1410                        .catalog_controller
1411                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1412                        .await?
1413                        .as_sink_id();
1414                    let StreamingJob::Sink(sink) = streaming_job else {
1415                        unreachable!()
1416                    };
1417
1418                    sinks.push(AutoRefreshSchemaSinkContext {
1419                        tmp_sink_id,
1420                        original_sink: sink,
1421                        original_fragment: original_sink_fragment,
1422                        new_schema,
1423                        newly_add_fields: newly_added_columns
1424                            .iter()
1425                            .map(|col| Field::from(&col.column_desc))
1426                            .collect(),
1427                        new_fragment: new_sink_fragment,
1428                        new_log_store_table,
1429                        actor_status,
1430                    });
1431                }
1432                Some(sinks)
1433            } else {
1434                None
1435            }
1436        } else {
1437            None
1438        };
1439
1440        let tmp_id = self
1441            .metadata_manager
1442            .catalog_controller
1443            .create_job_catalog_for_replace(
1444                &streaming_job,
1445                Some(&ctx),
1446                fragment_graph.specified_parallelism().as_ref(),
1447                Some(fragment_graph.max_parallelism()),
1448            )
1449            .await?;
1450
1451        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1452            sinks
1453                .iter()
1454                .map(|sink| sink.tmp_sink_id.as_object_id())
1455                .collect_vec()
1456        });
1457
1458        tracing::debug!(id = %job_id, "building replace streaming job");
1459        let mut updated_sink_catalogs = vec![];
1460
1461        let mut drop_table_connector_ctx = None;
1462        let result: MetaResult<_> = try {
1463            let (mut ctx, mut stream_job_fragments) = self
1464                .build_replace_job(
1465                    ctx,
1466                    &streaming_job,
1467                    fragment_graph,
1468                    tmp_id,
1469                    auto_refresh_schema_sinks,
1470                )
1471                .await?;
1472            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1473            let auto_refresh_schema_sink_finish_ctx =
1474                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1475                    sinks
1476                        .iter()
1477                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1478                            tmp_sink_id: sink.tmp_sink_id,
1479                            original_sink_id: sink.original_sink.id,
1480                            columns: sink.new_schema.clone(),
1481                            new_log_store_table: sink
1482                                .new_log_store_table
1483                                .as_ref()
1484                                .map(|table| (table.id, table.columns.clone())),
1485                        })
1486                        .collect()
1487                });
1488
            // Handle a table that has incoming sinks.
1490            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1491                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1492                let upstream_infos = self
1493                    .metadata_manager
1494                    .catalog_controller
1495                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1496                    .await?;
1497                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1498
1499                for upstream_info in &upstream_infos {
1500                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1501                    ctx.upstream_fragment_downstreams
1502                        .entry(upstream_fragment_id)
1503                        .or_default()
1504                        .push(upstream_info.new_sink_downstream.clone());
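                    // Sinks created before `original_target_columns` was recorded will have their
                    // catalogs updated when the replacement finishes.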
1505                    if upstream_info.sink_original_target_columns.is_empty() {
1506                        updated_sink_catalogs.push(upstream_info.sink_id);
1507                    }
1508                }
1509            }
1510
1511            let replace_upstream = ctx.replace_upstream.clone();
1512
1513            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1514                let empty_downstreams = FragmentDownstreamRelation::default();
1515                for sink in sinks {
1516                    self.metadata_manager
1517                        .catalog_controller
1518                        .prepare_streaming_job(
1519                            sink.tmp_sink_id.as_job_id(),
1520                            || [&sink.new_fragment].into_iter(),
1521                            &empty_downstreams,
1522                            true,
1523                            None,
1524                            None,
1525                        )
1526                        .await?;
1527                }
1528            }
1529
1530            self.metadata_manager
1531                .catalog_controller
1532                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1533                .await?;
1534
1535            self.stream_manager
1536                .replace_stream_job(stream_job_fragments, ctx)
1537                .await?;
1538            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1539        };
1540
1541        match result {
1542            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1543                let version = self
1544                    .metadata_manager
1545                    .catalog_controller
1546                    .finish_replace_streaming_job(
1547                        tmp_id,
1548                        streaming_job,
1549                        replace_upstream,
1550                        SinkIntoTableContext {
1551                            updated_sink_catalogs,
1552                        },
1553                        drop_table_connector_ctx.as_ref(),
1554                        auto_refresh_schema_sink_finish_ctx,
1555                    )
1556                    .await?;
1557                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1558                    self.source_manager
1559                        .apply_source_change(SourceChange::DropSource {
1560                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1561                        })
1562                        .await;
1563                }
1564                Ok(version)
1565            }
1566            Err(err) => {
1567                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1568                let _ = self.metadata_manager
1569                    .catalog_controller
1570                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1571                    .await.inspect_err(|err| {
1572                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1573                });
1574                Err(err)
1575            }
1576        }
1577    }
1578
    #[await_tree::instrument(
        boxed,
        "drop_streaming_job{}({job_id})",
        if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
    )]
1581    async fn drop_streaming_job(
1582        &self,
1583        job_id: StreamingJobId,
1584        drop_mode: DropMode,
1585    ) -> MetaResult<NotificationVersion> {
1586        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1587
1588        let (object_id, object_type) = match job_id {
1589            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1590            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1591            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1592            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1593        };
1594
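        // A job that is still being created is cancelled in place, while a created job goes through
        // the regular drop path.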
1595        let job_status = self
1596            .metadata_manager
1597            .catalog_controller
1598            .get_streaming_job_status(job_id.id())
1599            .await?;
1600        let version = match job_status {
1601            JobStatus::Initial => {
1602                unreachable!(
1603                    "Job with Initial status should not notify frontend and therefore should not arrive here"
1604                );
1605            }
1606            JobStatus::Creating => {
1607                self.stream_manager
1608                    .cancel_streaming_jobs(vec![job_id.id()])
1609                    .await?;
1610                IGNORED_NOTIFICATION_VERSION
1611            }
1612            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1613        };
1614
1615        Ok(version)
1616    }
1617
1618    /// Resolve the parallelism of the stream job based on the given information.
1619    ///
    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1621    fn resolve_stream_parallelism(
1622        &self,
1623        specified: Option<NonZeroUsize>,
1624        max: NonZeroUsize,
1625        cluster_info: &StreamingClusterInfo,
1626        resource_group: String,
1627    ) -> MetaResult<NonZeroUsize> {
1628        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1629        DdlController::resolve_stream_parallelism_inner(
1630            specified,
1631            max,
1632            available,
1633            &self.env.opts.default_parallelism,
1634            &resource_group,
1635        )
1636    }
1637
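    /// Core logic of [`Self::resolve_stream_parallelism`], kept free of `&self` for unit testing:
    /// use the specified parallelism if given (erroring when it exceeds `max`), otherwise fall back
    /// to the configured default parallelism capped by `max`; error out when the resource group has
    /// no available slots.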
1638    fn resolve_stream_parallelism_inner(
1639        specified: Option<NonZeroUsize>,
1640        max: NonZeroUsize,
1641        available: Option<NonZeroUsize>,
1642        default_parallelism: &DefaultParallelism,
1643        resource_group: &str,
1644    ) -> MetaResult<NonZeroUsize> {
1645        let Some(available) = available else {
1646            bail_unavailable!(
1647                "no available slots to schedule in resource group \"{}\", \
1648                 have you allocated any compute nodes within this resource group?",
1649                resource_group
1650            );
1651        };
1652
1653        if let Some(specified) = specified {
1654            if specified > max {
1655                bail_invalid_parameter!(
1656                    "specified parallelism {} should not exceed max parallelism {}",
1657                    specified,
1658                    max,
1659                );
1660            }
1661            if specified > available {
1662                tracing::warn!(
1663                    resource_group,
1664                    specified_parallelism = specified.get(),
1665                    available_parallelism = available.get(),
1666                    "specified parallelism exceeds available slots, scheduling with specified value",
1667                );
1668            }
1669            return Ok(specified);
1670        }
1671
1672        // Use default parallelism when no specific parallelism is provided by the user.
1673        let default_parallelism = match default_parallelism {
1674            DefaultParallelism::Full => available,
1675            DefaultParallelism::Default(num) => {
1676                if *num > available {
1677                    tracing::warn!(
1678                        resource_group,
1679                        configured_parallelism = num.get(),
1680                        available_parallelism = available.get(),
1681                        "default parallelism exceeds available slots, scheduling with configured value",
1682                    );
1683                }
1684                *num
1685            }
1686        };
1687
1688        if default_parallelism > max {
1689            tracing::warn!(
1690                max_parallelism = max.get(),
1691                resource_group,
1692                "default parallelism exceeds max parallelism, capping to max",
1693            );
1694        }
1695        Ok(default_parallelism.min(max))
1696    }
1697
    /// Builds the actor graph:
    /// - Add the upstream fragments to the fragment graph.
    /// - Schedule the fragments based on their distribution.
    /// - Expand each fragment into one or several actors.
    /// - Construct the fragment-level backfill order control.
1703    #[await_tree::instrument]
1704    pub(crate) async fn build_stream_job(
1705        &self,
1706        stream_ctx: StreamContext,
1707        mut stream_job: StreamingJob,
1708        fragment_graph: StreamFragmentGraph,
1709        resource_type: streaming_job_resource_type::ResourceType,
1710    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1711        let id = stream_job.id();
1712        let specified_parallelism = fragment_graph.specified_parallelism();
1713        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1714        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1715
1716        // 1. Fragment Level ordering graph
1717        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1718
1719        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1720        // contains all information needed for building the actor graph.
1721
1722        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1723            fragment_graph.collect_snapshot_backfill_info()?;
1724        assert!(
1725            snapshot_backfill_info
1726                .iter()
1727                .chain([&cross_db_snapshot_backfill_info])
1728                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1729                .all(|backfill_epoch| backfill_epoch.is_none()),
1730            "should not set backfill epoch when initially build the job: {:?} {:?}",
1731            snapshot_backfill_info,
1732            cross_db_snapshot_backfill_info
1733        );
1734
1735        let locality_fragment_state_table_mapping =
1736            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1737
1738        // check if log store exists for all cross-db upstreams
1739        self.metadata_manager
1740            .catalog_controller
1741            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1742            .await?;
1743
1744        let upstream_table_ids = fragment_graph
1745            .dependent_table_ids()
1746            .iter()
1747            .filter(|id| {
1748                !cross_db_snapshot_backfill_info
1749                    .upstream_mv_table_id_to_backfill_epoch
1750                    .contains_key(*id)
1751            })
1752            .cloned()
1753            .collect();
1754
1755        let (upstream_root_fragments, existing_actor_location) = self
1756            .metadata_manager
1757            .get_upstream_root_fragments(&upstream_table_ids)
1758            .await?;
1759
1760        if snapshot_backfill_info.is_some() {
1761            match stream_job {
1762                StreamingJob::MaterializedView(_)
1763                | StreamingJob::Sink(_)
1764                | StreamingJob::Index(_, _) => {}
1765                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1766                    return Err(
1767                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1768                    );
1769                }
1770            }
1771        }
1772
1773        let upstream_actors = upstream_root_fragments
1774            .values()
1775            .map(|(fragment, _)| {
1776                (
1777                    fragment.fragment_id,
1778                    fragment.actors.keys().copied().collect(),
1779                )
1780            })
1781            .collect();
1782
1783        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1784            fragment_graph,
1785            FragmentGraphUpstreamContext {
1786                upstream_root_fragments,
1787                upstream_actor_location: existing_actor_location,
1788            },
1789            (&stream_job).into(),
1790        )?;
1791        let resource_group = if let Some(group) = resource_type.resource_group() {
1792            group
1793        } else {
1794            self.metadata_manager
1795                .get_database_resource_group(stream_job.database_id())
1796                .await?
1797        };
1798        let is_serverless_backfill = matches!(
1799            &resource_type,
1800            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1801        );
1802
1803        // 3. Build the actor graph.
1804        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1805
1806        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1807        let parallelism = self.resolve_stream_parallelism(
1808            initial_parallelism,
1809            max_parallelism,
1810            &cluster_info,
1811            resource_group.clone(),
1812        )?;
1813
1814        let parallelism = if initial_parallelism.is_some() {
1815            parallelism.get()
1816        } else {
            // Prefer the job-level override for initial scheduling, falling back to the system strategy.
1818            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1819                Some(strategy) => strategy,
1820                None => self
1821                    .env
1822                    .system_params_reader()
1823                    .await
1824                    .adaptive_parallelism_strategy(),
1825            };
1826            adaptive_strategy.compute_target_parallelism(parallelism.get())
1827        };
1828
1829        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1830        let actor_graph_builder = ActorGraphBuilder::new(
1831            id,
1832            resource_group,
1833            complete_graph,
1834            cluster_info,
1835            parallelism,
1836        )?;
1837
1838        let ActorGraphBuildResult {
1839            graph,
1840            downstream_fragment_relations,
1841            building_locations,
1842            upstream_fragment_downstreams,
1843            new_no_shuffle,
1844            replace_upstream,
1845            ..
1846        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1847        assert!(replace_upstream.is_empty());
1848
1849        // 4. Build the table fragments structure that will be persisted in the stream manager,
1850        // and the context that contains all information needed for building the
1851        // actors on the compute nodes.
1852
        // If the frontend does not specify a degree of parallelism and `default_parallelism` is set to full, then set it to ADAPTIVE.
        // Otherwise, it is deduced to be FIXED.
1855        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1856            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1857            _ => TableParallelism::Fixed(parallelism.get()),
1858        };
1859
1860        let stream_job_fragments = StreamJobFragments::new(
1861            id,
1862            graph,
1863            &building_locations.actor_locations,
1864            stream_ctx.clone(),
1865            table_parallelism,
1866            max_parallelism.get(),
1867        );
1868
1869        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1870            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1871        }
1872
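        // For a sink into a table, build the upstream sink info that wires the new sink fragment to
        // the target table's mview (root) fragment.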
1873        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1874            && let Ok(table_id) = sink.get_target_table()
1875        {
1876            let tables = self
1877                .metadata_manager
1878                .get_table_catalog_by_ids(&[*table_id])
1879                .await?;
1880            let target_table = tables
1881                .first()
1882                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1883            let sink_fragment = stream_job_fragments
1884                .sink_fragment()
1885                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1886            let mview_fragment_id = self
1887                .metadata_manager
1888                .catalog_controller
1889                .get_mview_fragment_by_id(table_id.as_job_id())
1890                .await?;
1891            let upstream_sink_info = build_upstream_sink_info(
1892                sink.id,
1893                sink.original_target_columns.clone(),
1894                sink_fragment.fragment_id as _,
1895                target_table,
1896                mview_fragment_id,
1897            )?;
1898            Some(upstream_sink_info)
1899        } else {
1900            None
1901        };
1902
1903        let mut cdc_table_snapshot_splits = None;
1904        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1905            && let Some((_, stream_cdc_scan)) =
1906                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1907        {
1908            {
1909                // Create parallel splits for a CDC table. The resulted split assignments are persisted and immutable.
1910                let splits = try_init_parallel_cdc_table_snapshot_splits(
1911                    table.id,
1912                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1913                    self.env.meta_store_ref(),
1914                    stream_cdc_scan.options.as_ref().unwrap(),
1915                    self.env.opts.cdc_table_split_init_insert_batch_size,
1916                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1917                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1918                )
1919                .await?;
1920                cdc_table_snapshot_splits = Some(splits);
1921            }
1922        }
1923
1924        let ctx = CreateStreamingJobContext {
1925            upstream_fragment_downstreams,
1926            new_no_shuffle,
1927            upstream_actors,
1928            building_locations,
1929            definition: stream_job.definition(),
1930            create_type: stream_job.create_type(),
1931            job_type: (&stream_job).into(),
1932            streaming_job: stream_job,
1933            new_upstream_sink,
1934            option: CreateStreamingJobOption {},
1935            snapshot_backfill_info,
1936            cross_db_snapshot_backfill_info,
1937            fragment_backfill_ordering,
1938            locality_fragment_state_table_mapping,
1939            cdc_table_snapshot_splits,
1940            is_serverless_backfill,
1941        };
1942
1943        Ok((
1944            ctx,
1945            StreamJobFragmentsToCreate {
1946                inner: stream_job_fragments,
1947                downstreams: downstream_fragment_relations,
1948            },
1949        ))
1950    }
1951
    /// `build_replace_job` builds a job replacement and returns the context and new job
    /// fragments.
1954    ///
1955    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1956    /// replacement is finished.
1957    pub(crate) async fn build_replace_job(
1958        &self,
1959        stream_ctx: StreamContext,
1960        stream_job: &StreamingJob,
1961        mut fragment_graph: StreamFragmentGraph,
1962        tmp_job_id: JobId,
1963        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1964    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1965        match &stream_job {
1966            StreamingJob::Table(..)
1967            | StreamingJob::Source(..)
1968            | StreamingJob::MaterializedView(..) => {}
1969            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1970                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1971            }
1972        }
1973
1974        let id = stream_job.id();
1975
1976        // check if performing drop table connector
1977        let mut drop_table_associated_source_id = None;
1978        if let StreamingJob::Table(None, _, _) = &stream_job {
1979            drop_table_associated_source_id = self
1980                .metadata_manager
1981                .get_table_associated_source_id(id.as_mv_table_id())
1982                .await?;
1983        }
1984
1985        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1986        let old_internal_table_ids = old_fragments.internal_table_ids();
1987
1988        // handle drop table's associated source
1989        let mut drop_table_connector_ctx = None;
1990        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            // Dropping a table's associated source implies the fragment containing the table has exactly one internal table (the associated source's state table).
1992            debug_assert!(old_internal_table_ids.len() == 1);
1993
1994            drop_table_connector_ctx = Some(DropTableConnectorContext {
                // We do not remove the original table catalog as it's still needed for the streaming job;
                // we just need to remove the reference to the state table.
1997                to_change_streaming_job_id: id,
1998                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1999                to_remove_source_id,
2000            });
2001        } else if stream_job.is_materialized_view() {
            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
2003            // but more robust.
2004            let old_fragments_upstreams = self
2005                .metadata_manager
2006                .catalog_controller
2007                .upstream_fragments(old_fragments.fragment_ids())
2008                .await?;
2009
2010            let old_state_graph =
2011                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2012            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2013            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2014                .context("incompatible altering on the streaming job states")?;
2015
2016            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2017            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2018        } else {
2019            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2020            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2021            let old_internal_tables = self
2022                .metadata_manager
2023                .get_table_catalog_by_ids(&old_internal_table_ids)
2024                .await?;
2025            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2026        }
2027
2028        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2029        // graph that contains all information needed for building the actor graph.
2030        let original_root_fragment = old_fragments
2031            .root_fragment()
2032            .expect("root fragment not found");
2033
2034        let job_type = StreamingJobType::from(stream_job);
2035
2036        // Extract the downstream fragments from the fragment graph.
2037        let (mut downstream_fragments, mut downstream_actor_location) =
2038            self.metadata_manager.get_downstream_fragments(id).await?;
2039
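        // Swap the downstream fragments that belong to auto-schema-refresh sinks with their
        // rewritten counterparts, reusing the original actors' worker locations.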
2040        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2041            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2042                .iter()
2043                .map(|sink| sink.original_fragment.fragment_id)
2044                .collect();
2045            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2046                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2047                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2048                }) {
2049                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2050                    for actor_id in downstream_fragment.actors.keys() {
2051                        downstream_actor_location.remove(actor_id);
2052                    }
2053                    for (actor_id, status) in &sink.actor_status {
2054                        downstream_actor_location
2055                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2056                    }
2057
2058                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2059                    *nodes = sink.new_fragment.nodes.clone();
2060                }
2061            }
2062            assert!(remaining_fragment.is_empty());
2063        }
2064
2065        // build complete graph based on the table job type
2066        let complete_graph = match &job_type {
2067            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2068                CompleteStreamFragmentGraph::with_downstreams(
2069                    fragment_graph,
2070                    FragmentGraphDownstreamContext {
2071                        original_root_fragment_id: original_root_fragment.fragment_id,
2072                        downstream_fragments,
2073                        downstream_actor_location,
2074                    },
2075                    job_type,
2076                )?
2077            }
2078            StreamingJobType::Table(TableJobType::SharedCdcSource)
2079            | StreamingJobType::MaterializedView => {
2080                // CDC tables or materialized views can have upstream jobs as well.
2081                let (upstream_root_fragments, upstream_actor_location) = self
2082                    .metadata_manager
2083                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2084                    .await?;
2085
2086                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2087                    fragment_graph,
2088                    FragmentGraphUpstreamContext {
2089                        upstream_root_fragments,
2090                        upstream_actor_location,
2091                    },
2092                    FragmentGraphDownstreamContext {
2093                        original_root_fragment_id: original_root_fragment.fragment_id,
2094                        downstream_fragments,
2095                        downstream_actor_location,
2096                    },
2097                    job_type,
2098                )?
2099            }
2100            _ => unreachable!(),
2101        };
2102
2103        let resource_group = self
2104            .metadata_manager
2105            .get_existing_job_resource_group(id)
2106            .await?;
2107
2108        // 2. Build the actor graph.
2109        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2110
2111        // XXX: what is this parallelism?
2112        // Is it "assigned parallelism"?
2113        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2114            .expect("The number of actors in the original table fragment should be greater than 0");
2115
2116        let actor_graph_builder = ActorGraphBuilder::new(
2117            id,
2118            resource_group,
2119            complete_graph,
2120            cluster_info,
2121            parallelism,
2122        )?;
2123
2124        let ActorGraphBuildResult {
2125            graph,
2126            downstream_fragment_relations,
2127            building_locations,
2128            upstream_fragment_downstreams,
2129            mut replace_upstream,
2130            new_no_shuffle,
2131            ..
2132        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2133
2134        // general table & source does not have upstream job, so the dispatchers should be empty
2135        if matches!(
2136            job_type,
2137            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2138        ) {
2139            assert!(upstream_fragment_downstreams.is_empty());
2140        }
2141
2142        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2143        // the context that contains all information needed for building the actors on the compute
2144        // nodes.
2145        let stream_job_fragments = StreamJobFragments::new(
2146            tmp_job_id,
2147            graph,
2148            &building_locations.actor_locations,
2149            stream_ctx,
2150            old_fragments.assigned_parallelism,
2151            old_fragments.max_parallelism,
2152        );
2153
2154        if let Some(sinks) = &auto_refresh_schema_sinks {
2155            for sink in sinks {
2156                replace_upstream
2157                    .remove(&sink.new_fragment.fragment_id)
2158                    .expect("should exist");
2159            }
2160        }
2161
2162        // Note: no need to set `vnode_count` as it's already set by the frontend.
2163        // See `get_replace_table_plan`.
2164
2165        let ctx = ReplaceStreamJobContext {
2166            old_fragments,
2167            replace_upstream,
2168            new_no_shuffle,
2169            upstream_fragment_downstreams,
2170            building_locations,
2171            streaming_job: stream_job.clone(),
2172            tmp_id: tmp_job_id,
2173            drop_table_connector_ctx,
2174            auto_refresh_schema_sinks,
2175        };
2176
2177        Ok((
2178            ctx,
2179            StreamJobFragmentsToCreate {
2180                inner: stream_job_fragments,
2181                downstreams: downstream_fragment_relations,
2182            },
2183        ))
2184    }
2185
2186    async fn alter_name(
2187        &self,
2188        relation: alter_name_request::Object,
2189        new_name: &str,
2190    ) -> MetaResult<NotificationVersion> {
2191        let (obj_type, id) = match relation {
2192            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2193            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2194            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2195            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2196            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2197            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2198            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2199            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2200        };
2201        self.metadata_manager
2202            .catalog_controller
2203            .alter_name(obj_type, id, new_name)
2204            .await
2205    }
2206
2207    async fn alter_swap_rename(
2208        &self,
2209        object: alter_swap_rename_request::Object,
2210    ) -> MetaResult<NotificationVersion> {
2211        let (obj_type, src_id, dst_id) = match object {
2212            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2213            alter_swap_rename_request::Object::Table(objs) => {
2214                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2215                (ObjectType::Table, src_id, dst_id)
2216            }
2217            alter_swap_rename_request::Object::View(objs) => {
2218                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2219                (ObjectType::View, src_id, dst_id)
2220            }
2221            alter_swap_rename_request::Object::Source(objs) => {
2222                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2223                (ObjectType::Source, src_id, dst_id)
2224            }
2225            alter_swap_rename_request::Object::Sink(objs) => {
2226                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2227                (ObjectType::Sink, src_id, dst_id)
2228            }
2229            alter_swap_rename_request::Object::Subscription(objs) => {
2230                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2231                (ObjectType::Subscription, src_id, dst_id)
2232            }
2233        };
2234
2235        self.metadata_manager
2236            .catalog_controller
2237            .alter_swap_rename(obj_type, src_id, dst_id)
2238            .await
2239    }
2240
2241    async fn alter_owner(
2242        &self,
2243        object: Object,
2244        owner_id: UserId,
2245    ) -> MetaResult<NotificationVersion> {
2246        let (obj_type, id) = match object {
2247            Object::TableId(id) => (ObjectType::Table, id),
2248            Object::ViewId(id) => (ObjectType::View, id),
2249            Object::SourceId(id) => (ObjectType::Source, id),
2250            Object::SinkId(id) => (ObjectType::Sink, id),
2251            Object::SchemaId(id) => (ObjectType::Schema, id),
2252            Object::DatabaseId(id) => (ObjectType::Database, id),
2253            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2254            Object::ConnectionId(id) => (ObjectType::Connection, id),
2255            Object::FunctionId(id) => (ObjectType::Function, id),
2256            Object::SecretId(id) => (ObjectType::Secret, id),
2257        };
2258        self.metadata_manager
2259            .catalog_controller
2260            .alter_owner(obj_type, id.into(), owner_id as _)
2261            .await
2262    }
2263
2264    async fn alter_set_schema(
2265        &self,
2266        object: alter_set_schema_request::Object,
2267        new_schema_id: SchemaId,
2268    ) -> MetaResult<NotificationVersion> {
2269        let (obj_type, id) = match object {
2270            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2271            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2272            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2273            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2274            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2275            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2276            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2277        };
2278        self.metadata_manager
2279            .catalog_controller
2280            .alter_schema(obj_type, id.into(), new_schema_id as _)
2281            .await
2282    }
2283
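    /// Wait until no background streaming job is being created, polling roughly once per
    /// millisecond for up to 30 minutes (excluding query time). Returns a cancellation error on
    /// timeout.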
2284    pub async fn wait(&self) -> MetaResult<()> {
2285        let timeout_ms = 30 * 60 * 1000;
2286        for _ in 0..timeout_ms {
2287            if self
2288                .metadata_manager
2289                .catalog_controller
2290                .list_background_creating_jobs(true, None)
2291                .await?
2292                .is_empty()
2293            {
2294                return Ok(());
2295            }
2296
2297            sleep(Duration::from_millis(1)).await;
2298        }
2299        Err(MetaError::cancelled(format!(
2300            "timeout after {timeout_ms}ms"
2301        )))
2302    }
2303
2304    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2305        self.metadata_manager
2306            .catalog_controller
2307            .comment_on(comment)
2308            .await
2309    }
2310
2311    async fn alter_streaming_job_config(
2312        &self,
2313        job_id: JobId,
2314        entries_to_add: HashMap<String, String>,
2315        keys_to_remove: Vec<String>,
2316    ) -> MetaResult<NotificationVersion> {
2317        self.metadata_manager
2318            .catalog_controller
2319            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2320            .await
2321    }
2322}
2323
2324fn report_create_object(
2325    job_id: JobId,
2326    event_name: &str,
2327    obj_type: PbTelemetryDatabaseObject,
2328    connector_name: Option<String>,
2329    attr_info: Option<jsonbb::Value>,
2330) {
2331    report_event(
2332        PbTelemetryEventStage::CreateStreamJob,
2333        event_name,
2334        job_id.as_raw_id() as _,
2335        connector_name,
2336        Some(obj_type),
2337        attr_info,
2338    );
2339}
2340
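/// Build the [`UpstreamSinkInfo`] for a sink that writes into `target_table`: derive the sink's
/// output schema (falling back to the table's current columns when `original_target_columns` is
/// empty), map the table's distribution key onto the sink's output columns, and construct the hash
/// dispatching relation and projection towards `target_fragment_id`.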
2341pub fn build_upstream_sink_info(
2342    sink_id: SinkId,
2343    original_target_columns: Vec<PbColumnCatalog>,
2344    sink_fragment_id: FragmentId,
2345    target_table: &PbTable,
2346    target_fragment_id: FragmentId,
2347) -> MetaResult<UpstreamSinkInfo> {
2348    let sink_columns = if !original_target_columns.is_empty() {
2349        original_target_columns.clone()
2350    } else {
        // This value did not exist in earlier versions, which means no schema changes such as
        // `ADD/DROP COLUMN` have been made to the table. Therefore, the columns of the table at
        // this point are the `original_target_columns`, and the sink's value will be filled in on
        // the meta node.
2355        target_table.columns.clone()
2356    };
2357
2358    let sink_output_fields = sink_columns
2359        .iter()
2360        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2361        .collect_vec();
2362    let output_indices = (0..sink_output_fields.len())
2363        .map(|i| i as u32)
2364        .collect_vec();
2365
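    // Map the target table's distribution key onto the sink's output columns by column id, so that
    // the hash dispatcher shuffles sink rows consistently with the table's distribution.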
2366    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2367        let sink_idx_by_col_id = sink_columns
2368            .iter()
2369            .enumerate()
2370            .map(|(idx, col)| {
2371                let column_id = col.column_desc.as_ref().unwrap().column_id;
2372                (column_id, idx as u32)
2373            })
2374            .collect::<HashMap<_, _>>();
2375        target_table
2376            .distribution_key
2377            .iter()
2378            .map(|dist_idx| {
2379                let column_id = target_table.columns[*dist_idx as usize]
2380                    .column_desc
2381                    .as_ref()
2382                    .unwrap()
2383                    .column_id;
2384                let sink_idx = sink_idx_by_col_id
2385                    .get(&column_id)
2386                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2387                Ok(*sink_idx)
2388            })
2389            .collect::<anyhow::Result<Vec<_>>>()?
2390    };
2391    let dist_key_indices =
2392        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2393    let downstream_fragment_id = target_fragment_id as _;
2394    let new_downstream_relation = DownstreamFragmentRelation {
2395        downstream_fragment_id,
2396        dispatcher_type: DispatcherType::Hash,
2397        dist_key_indices,
2398        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2399    };
2400    let current_target_columns = target_table.get_columns();
2401    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2402    Ok(UpstreamSinkInfo {
2403        sink_id,
2404        sink_fragment_id: sink_fragment_id as _,
2405        sink_output_fields,
2406        sink_original_target_columns: original_target_columns,
2407        project_exprs,
2408        new_sink_downstream: new_downstream_relation,
2409    })
2410}
2411
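/// Fill `init_upstreams` of the `UpstreamSinkUnion` node within the table's union fragment with the
/// given upstream sink infos.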
2412pub fn refill_upstream_sink_union_in_table(
2413    union_fragment_root: &mut PbStreamNode,
2414    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2415) {
2416    visit_stream_node_cont_mut(union_fragment_root, |node| {
2417        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2418            let init_upstreams = upstream_sink_infos
2419                .iter()
2420                .map(|info| PbUpstreamSinkInfo {
2421                    upstream_fragment_id: info.sink_fragment_id,
2422                    sink_output_schema: info.sink_output_fields.clone(),
2423                    project_exprs: info.project_exprs.clone(),
2424                })
2425                .collect();
2426            upstream_sink_union.init_upstreams = init_upstreams;
2427            false
2428        } else {
2429            true
2430        }
2431    });
2432}
2433
2434#[cfg(test)]
2435mod tests {
2436    use std::num::NonZeroUsize;
2437
2438    use super::*;
2439
2440    #[test]
2441    fn test_specified_parallelism_exceeds_available() {
2442        let result = DdlController::resolve_stream_parallelism_inner(
2443            Some(NonZeroUsize::new(100).unwrap()),
2444            NonZeroUsize::new(256).unwrap(),
2445            Some(NonZeroUsize::new(4).unwrap()),
2446            &DefaultParallelism::Full,
2447            "default",
2448        )
2449        .unwrap();
2450        assert_eq!(result.get(), 100);
2451    }
2452
2453    #[test]
2454    fn test_allows_default_parallelism_over_available() {
2455        let result = DdlController::resolve_stream_parallelism_inner(
2456            None,
2457            NonZeroUsize::new(256).unwrap(),
2458            Some(NonZeroUsize::new(4).unwrap()),
2459            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2460            "default",
2461        )
2462        .unwrap();
2463        assert_eq!(result.get(), 50);
2464    }
2465
2466    #[test]
2467    fn test_full_parallelism_capped_by_max() {
2468        let result = DdlController::resolve_stream_parallelism_inner(
2469            None,
2470            NonZeroUsize::new(6).unwrap(),
2471            Some(NonZeroUsize::new(10).unwrap()),
2472            &DefaultParallelism::Full,
2473            "default",
2474        )
2475        .unwrap();
2476        assert_eq!(result.get(), 6);
2477    }
2478
2479    #[test]
2480    fn test_no_available_slots_returns_error() {
2481        let result = DdlController::resolve_stream_parallelism_inner(
2482            None,
2483            NonZeroUsize::new(4).unwrap(),
2484            None,
2485            &DefaultParallelism::Full,
2486            "default",
2487        );
2488        assert!(matches!(
2489            result,
2490            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2491        ));
2492    }
2493
2494    #[test]
2495    fn test_specified_over_max_returns_error() {
2496        let result = DdlController::resolve_stream_parallelism_inner(
2497            Some(NonZeroUsize::new(8).unwrap()),
2498            NonZeroUsize::new(4).unwrap(),
2499            Some(NonZeroUsize::new(10).unwrap()),
2500            &DefaultParallelism::Full,
2501            "default",
2502        );
2503        assert!(matches!(
2504            result,
2505            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2506        ));
2507    }
2508}