risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64    StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::{OwnedSemaphorePermit, Semaphore};
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::cluster::StreamingClusterInfo;
76use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
77use crate::controller::utils::build_select_node_list;
78use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
79use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
80use crate::manager::sink_coordination::SinkCoordinatorManager;
81use crate::manager::{
82    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
83    NotificationVersion, StreamingJob, StreamingJobType,
84};
85use crate::model::{
86    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
87    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
88    TableParallelism,
89};
90use crate::stream::cdc::{
91    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
92};
93use crate::stream::{
94    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
95    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
96    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
97    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
98    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
99    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
100};
101use crate::telemetry::report_event;
102use crate::{MetaError, MetaResult};
103
104#[derive(PartialEq)]
105pub enum DropMode {
106    Restrict,
107    Cascade,
108}
109
110impl DropMode {
111    pub fn from_request_setting(cascade: bool) -> DropMode {
112        if cascade {
113            DropMode::Cascade
114        } else {
115            DropMode::Restrict
116        }
117    }
118}
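// Illustrative note: a statement such as `DROP TABLE t CASCADE` reaches the meta node
// with `cascade == true` and maps to `DropMode::Cascade`; without the keyword it maps
// to `DropMode::Restrict`.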
119
120#[derive(strum::AsRefStr)]
121pub enum StreamingJobId {
122    MaterializedView(TableId),
123    Sink(SinkId),
124    Table(Option<SourceId>, TableId),
125    Index(IndexId),
126}
127
128impl std::fmt::Display for StreamingJobId {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        write!(f, "{}", self.as_ref())?;
131        write!(f, "({})", self.id())
132    }
133}
134
135impl StreamingJobId {
136    fn id(&self) -> JobId {
137        match self {
138            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
139            StreamingJobId::Index(id) => id.as_job_id(),
140            StreamingJobId::Sink(id) => id.as_job_id(),
141        }
142    }
143}
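// Illustrative note: the `Display` impl combines the `AsRefStr` variant name with the
// job id, so (assuming `JobId` displays as its numeric value) a sink job with id 42
// renders as `Sink(42)` in logs and await-tree spans.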
144
145/// Describes the job that needs to be replaced. It is used when replacing a table
146/// and when creating a sink into a table.
147pub struct ReplaceStreamJobInfo {
148    pub streaming_job: StreamingJob,
149    pub fragment_graph: StreamFragmentGraphProto,
150}
151
152#[derive(Display)]
153pub enum DdlCommand {
154    CreateDatabase(Database),
155    DropDatabase(DatabaseId),
156    CreateSchema(Schema),
157    DropSchema(SchemaId, DropMode),
158    CreateNonSharedSource(Source),
159    DropSource(SourceId, DropMode),
160    ResetSource(SourceId),
161    CreateFunction(Function),
162    DropFunction(FunctionId, DropMode),
163    CreateView(View, HashSet<ObjectId>),
164    DropView(ViewId, DropMode),
165    CreateStreamingJob {
166        stream_job: StreamingJob,
167        fragment_graph: StreamFragmentGraphProto,
168        dependencies: HashSet<ObjectId>,
169        resource_type: streaming_job_resource_type::ResourceType,
170        if_not_exists: bool,
171    },
172    DropStreamingJob {
173        job_id: StreamingJobId,
174        drop_mode: DropMode,
175    },
176    AlterName(alter_name_request::Object, String),
177    AlterSwapRename(alter_swap_rename_request::Object),
178    ReplaceStreamJob(ReplaceStreamJobInfo),
179    AlterNonSharedSource(Source),
180    AlterObjectOwner(Object, UserId),
181    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
182    CreateConnection(Connection),
183    DropConnection(ConnectionId, DropMode),
184    CreateSecret(Secret),
185    AlterSecret(Secret),
186    DropSecret(SecretId, DropMode),
187    CommentOn(Comment),
188    CreateSubscription(Subscription),
189    DropSubscription(SubscriptionId, DropMode),
190    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
191    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
192}
193
194impl DdlCommand {
195    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
196    fn object(&self) -> Either<String, ObjectId> {
197        use Either::*;
198        match self {
199            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
200            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
201            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
202            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
203            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
204            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
205            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
206            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
207            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
208            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
209            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
210            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
211            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
212            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
213            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
214            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
215            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
216            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
217            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
218            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
219            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
220            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
221            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
222            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
223            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
224            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
225            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
226            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
227            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
228        }
229    }
230
231    fn allow_in_recovery(&self) -> bool {
232        match self {
233            DdlCommand::DropDatabase(_)
234            | DdlCommand::DropSchema(_, _)
235            | DdlCommand::DropSource(_, _)
236            | DdlCommand::DropFunction(_, _)
237            | DdlCommand::DropView(_, _)
238            | DdlCommand::DropStreamingJob { .. }
239            | DdlCommand::DropConnection(_, _)
240            | DdlCommand::DropSecret(_, _)
241            | DdlCommand::DropSubscription(_, _)
242            | DdlCommand::AlterName(_, _)
243            | DdlCommand::AlterObjectOwner(_, _)
244            | DdlCommand::AlterSetSchema(_, _)
245            | DdlCommand::CreateDatabase(_)
246            | DdlCommand::CreateSchema(_)
247            | DdlCommand::CreateFunction(_)
248            | DdlCommand::CreateView(_, _)
249            | DdlCommand::CreateConnection(_)
250            | DdlCommand::CommentOn(_)
251            | DdlCommand::CreateSecret(_)
252            | DdlCommand::AlterSecret(_)
253            | DdlCommand::AlterSwapRename(_)
254            | DdlCommand::AlterDatabaseParam(_, _)
255            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
256            DdlCommand::CreateStreamingJob { .. }
257            | DdlCommand::CreateNonSharedSource(_)
258            | DdlCommand::ReplaceStreamJob(_)
259            | DdlCommand::AlterNonSharedSource(_)
260            | DdlCommand::ResetSource(_)
261            | DdlCommand::CreateSubscription(_) => false,
262        }
263    }
264}
265
266#[derive(Clone)]
267pub struct DdlController {
268    pub(crate) env: MetaSrvEnv,
269
270    pub(crate) metadata_manager: MetadataManager,
271    pub(crate) stream_manager: GlobalStreamManagerRef,
272    pub(crate) source_manager: SourceManagerRef,
273    barrier_manager: BarrierManagerRef,
274    sink_manager: SinkCoordinatorManager,
275    iceberg_compaction_manager: IcebergCompactionManagerRef,
276
277    /// The semaphore is used to limit the number of concurrent streaming job creations.
278    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
279
280    /// Sequence number for DDL commands, used for observability and debugging.
281    seq: Arc<AtomicU64>,
282}
283
284#[derive(Clone)]
285pub struct CreatingStreamingJobPermit {
286    pub(crate) semaphore: Arc<Semaphore>,
287}
288
289impl CreatingStreamingJobPermit {
290    async fn new(env: &MetaSrvEnv) -> Self {
291        let mut permits = env
292            .system_params_reader()
293            .await
294            .max_concurrent_creating_streaming_jobs() as usize;
295        if permits == 0 {
296            // if the system parameter is set to zero, use the max permitted value.
297            permits = Semaphore::MAX_PERMITS;
298        }
299        let semaphore = Arc::new(Semaphore::new(permits));
300
301        let (local_notification_tx, mut local_notification_rx) =
302            tokio::sync::mpsc::unbounded_channel();
303        env.notification_manager()
304            .insert_local_sender(local_notification_tx);
305        let semaphore_clone = semaphore.clone();
306        tokio::spawn(async move {
307            while let Some(notification) = local_notification_rx.recv().await {
308                let LocalNotification::SystemParamsChange(p) = &notification else {
309                    continue;
310                };
311                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
312                if new_permits == 0 {
313                    new_permits = Semaphore::MAX_PERMITS;
314                }
315                match permits.cmp(&new_permits) {
316                    Ordering::Less => {
317                        semaphore_clone.add_permits(new_permits - permits);
318                    }
319                    Ordering::Equal => continue,
320                    Ordering::Greater => {
321                        let to_release = permits - new_permits;
322                        let reduced = semaphore_clone.forget_permits(to_release);
323                        // TODO: implement a dynamic semaphore with limits ourselves.
324                        if reduced != to_release {
325                            tracing::warn!(
326                                "not enough permits to release, expected {}, but reduced {}",
327                                to_release,
328                                reduced
329                            );
330                        }
331                    }
332                }
333                tracing::info!(
334                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
335                    permits,
336                    new_permits
337                );
338                permits = new_permits;
339            }
340        });
341
342        Self { semaphore }
343    }
344}
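// Hedged sketch (not part of the controller) of the semaphore-resizing semantics relied
// on above: `add_permits` grows the pool immediately, while `forget_permits` can only
// remove permits that are currently available, which is why shrinking may fall short and
// emit the warning when many creating jobs still hold permits.
//
//     let sem = Arc::new(Semaphore::new(4));
//     let _held = sem.clone().acquire_owned().await.unwrap(); // 3 permits left available
//     assert_eq!(sem.forget_permits(4), 3); // only the 3 available permits are removed
//     sem.add_permits(2);                   // the pool grows back by 2 immediately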
345
346impl DdlController {
347    pub async fn new(
348        env: MetaSrvEnv,
349        metadata_manager: MetadataManager,
350        stream_manager: GlobalStreamManagerRef,
351        source_manager: SourceManagerRef,
352        barrier_manager: BarrierManagerRef,
353        sink_manager: SinkCoordinatorManager,
354        iceberg_compaction_manager: IcebergCompactionManagerRef,
355    ) -> Self {
356        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
357        Self {
358            env,
359            metadata_manager,
360            stream_manager,
361            source_manager,
362            barrier_manager,
363            sink_manager,
364            iceberg_compaction_manager,
365            creating_streaming_job_permits,
366            seq: Arc::new(AtomicU64::new(0)),
367        }
368    }
369
370    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
371    pub fn next_seq(&self) -> u64 {
372        // This is a simple atomic increment operation.
373        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
374    }
375
376    /// `run_command` spawns a tokio task to execute the target DDL command. If the client
377    /// is interrupted during execution, the request is cancelled by tonic. Since there is a lot
378    /// of logic for reverting, status management, notifications and so on, ensuring consistency
379    /// would be a huge hassle if we did not spawn here.
380    ///
381    /// Although the return type is `Option`, it is always `Some`, to simplify the handling logic.
382    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
383        if !command.allow_in_recovery() {
384            self.barrier_manager.check_status_running()?;
385        }
386
387        let await_tree_key = format!("DDL Command {}", self.next_seq());
388        let await_tree_span = await_tree::span!("{command}({})", command.object());
389
390        let ctrl = self.clone();
391        let fut = async move {
392            match command {
393                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
394                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
395                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
396                DdlCommand::DropSchema(schema_id, drop_mode) => {
397                    ctrl.drop_schema(schema_id, drop_mode).await
398                }
399                DdlCommand::CreateNonSharedSource(source) => {
400                    ctrl.create_non_shared_source(source).await
401                }
402                DdlCommand::DropSource(source_id, drop_mode) => {
403                    ctrl.drop_source(source_id, drop_mode).await
404                }
405                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
406                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
407                DdlCommand::DropFunction(function_id, drop_mode) => {
408                    ctrl.drop_function(function_id, drop_mode).await
409                }
410                DdlCommand::CreateView(view, dependencies) => {
411                    ctrl.create_view(view, dependencies).await
412                }
413                DdlCommand::DropView(view_id, drop_mode) => {
414                    ctrl.drop_view(view_id, drop_mode).await
415                }
416                DdlCommand::CreateStreamingJob {
417                    stream_job,
418                    fragment_graph,
419                    dependencies,
420                    resource_type,
421                    if_not_exists,
422                } => {
423                    ctrl.create_streaming_job(
424                        stream_job,
425                        fragment_graph,
426                        dependencies,
427                        resource_type,
428                        if_not_exists,
429                    )
430                    .await
431                }
432                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
433                    ctrl.drop_streaming_job(job_id, drop_mode).await
434                }
435                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
436                    streaming_job,
437                    fragment_graph,
438                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
439                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
440                DdlCommand::AlterObjectOwner(object, owner_id) => {
441                    ctrl.alter_owner(object, owner_id).await
442                }
443                DdlCommand::AlterSetSchema(object, new_schema_id) => {
444                    ctrl.alter_set_schema(object, new_schema_id).await
445                }
446                DdlCommand::CreateConnection(connection) => {
447                    ctrl.create_connection(connection).await
448                }
449                DdlCommand::DropConnection(connection_id, drop_mode) => {
450                    ctrl.drop_connection(connection_id, drop_mode).await
451                }
452                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
453                DdlCommand::DropSecret(secret_id, drop_mode) => {
454                    ctrl.drop_secret(secret_id, drop_mode).await
455                }
456                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
457                DdlCommand::AlterNonSharedSource(source) => {
458                    ctrl.alter_non_shared_source(source).await
459                }
460                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
461                DdlCommand::CreateSubscription(subscription) => {
462                    ctrl.create_subscription(subscription).await
463                }
464                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
465                    ctrl.drop_subscription(subscription_id, drop_mode).await
466                }
467                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
468                DdlCommand::AlterDatabaseParam(database_id, param) => {
469                    ctrl.alter_database_param(database_id, param).await
470                }
471                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
472                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
473                        .await
474                }
475            }
476        }
477        .in_current_span();
478        let fut = (self.env.await_tree_reg())
479            .register(await_tree_key, await_tree_span)
480            .instrument(Box::pin(fut));
481        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
482        Ok(Some(WaitVersion {
483            catalog_version: notification_version,
484            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
485        }))
486    }
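    // Hedged usage sketch (illustrative only): how an RPC handler might drive the
    // controller. `ddl_controller` and `database` are assumed to be provided by the
    // surrounding gRPC service.
    //
    //     let version = ddl_controller
    //         .run_command(DdlCommand::CreateDatabase(database))
    //         .await?
    //         .expect("run_command always returns Some");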
487
488    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
489        self.barrier_manager.get_ddl_progress().await
490    }
491
492    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
493        let (version, updated_db) = self
494            .metadata_manager
495            .catalog_controller
496            .create_database(database)
497            .await?;
498        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
499        self.barrier_manager
500            .update_database_barrier(
501                updated_db.database_id,
502                updated_db.barrier_interval_ms.map(|v| v as u32),
503                updated_db.checkpoint_frequency.map(|v| v as u64),
504            )
505            .await?;
506        Ok(version)
507    }
508
509    #[tracing::instrument(skip(self), level = "debug")]
510    pub async fn reschedule_streaming_job(
511        &self,
512        job_id: JobId,
513        target: ReschedulePolicy,
514        mut deferred: bool,
515    ) -> MetaResult<()> {
516        tracing::info!("altering parallelism for job {}", job_id);
517        if self.barrier_manager.check_status_running().is_err() {
518            tracing::info!(
519                "alter parallelism is set to deferred mode because the system is in recovery state"
520            );
521            deferred = true;
522        }
523
524        self.stream_manager
525            .reschedule_streaming_job(job_id, target, deferred)
526            .await
527    }
528
529    pub async fn reschedule_cdc_table_backfill(
530        &self,
531        job_id: JobId,
532        target: ReschedulePolicy,
533    ) -> MetaResult<()> {
534        tracing::info!("alter CDC table backfill parallelism");
535        if self.barrier_manager.check_status_running().is_err() {
536            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
537        }
538        self.stream_manager
539            .reschedule_cdc_table_backfill(job_id, target)
540            .await
541    }
542
543    pub async fn reschedule_fragments(
544        &self,
545        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
546    ) -> MetaResult<()> {
547        tracing::info!(
548            "altering parallelism for fragments {:?}",
549            fragment_targets.keys()
550        );
551        let fragment_targets = fragment_targets
552            .into_iter()
553            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
554            .collect();
555
556        self.stream_manager
557            .reschedule_fragments(fragment_targets)
558            .await
559    }
560
561    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
562        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
563            .await
564    }
565
566    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
567        self.metadata_manager
568            .catalog_controller
569            .create_schema(schema)
570            .await
571    }
572
573    async fn drop_schema(
574        &self,
575        schema_id: SchemaId,
576        drop_mode: DropMode,
577    ) -> MetaResult<NotificationVersion> {
578        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
579            .await
580    }
581
582    /// Shared sources are handled in [`Self::create_streaming_job`].
583    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
584        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
585            .await
586            .context("failed to create source worker")?;
587
588        let (source_id, version) = self
589            .metadata_manager
590            .catalog_controller
591            .create_source(source)
592            .await?;
593        self.source_manager
594            .register_source_with_handle(source_id, handle)
595            .await;
596        Ok(version)
597    }
598
599    async fn drop_source(
600        &self,
601        source_id: SourceId,
602        drop_mode: DropMode,
603    ) -> MetaResult<NotificationVersion> {
604        self.drop_object(ObjectType::Source, source_id, drop_mode)
605            .await
606    }
607
608    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
609        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
610
611        // Get database_id for the source
612        let database_id = self
613            .metadata_manager
614            .catalog_controller
615            .get_object_database_id(source_id)
616            .await?;
617
618        self.stream_manager
619            .barrier_scheduler
620            .run_command(database_id, Command::ResetSource { source_id })
621            .await?;
622
623        // RESET SOURCE doesn't modify the catalog, so return the current catalog version.
624        let version = self
625            .metadata_manager
626            .catalog_controller
627            .notify_frontend_trivial()
628            .await;
629        Ok(version)
630    }
631
632    /// This replaces the source in the catalog.
633    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
634    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
635        self.metadata_manager
636            .catalog_controller
637            .alter_non_shared_source(source)
638            .await
639    }
640
641    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
642        self.metadata_manager
643            .catalog_controller
644            .create_function(function)
645            .await
646    }
647
648    async fn drop_function(
649        &self,
650        function_id: FunctionId,
651        drop_mode: DropMode,
652    ) -> MetaResult<NotificationVersion> {
653        self.drop_object(ObjectType::Function, function_id, drop_mode)
654            .await
655    }
656
657    async fn create_view(
658        &self,
659        view: View,
660        dependencies: HashSet<ObjectId>,
661    ) -> MetaResult<NotificationVersion> {
662        self.metadata_manager
663            .catalog_controller
664            .create_view(view, dependencies)
665            .await
666    }
667
668    async fn drop_view(
669        &self,
670        view_id: ViewId,
671        drop_mode: DropMode,
672    ) -> MetaResult<NotificationVersion> {
673        self.drop_object(ObjectType::View, view_id, drop_mode).await
674    }
675
676    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
677        validate_connection(&connection).await?;
678        self.metadata_manager
679            .catalog_controller
680            .create_connection(connection)
681            .await
682    }
683
684    async fn drop_connection(
685        &self,
686        connection_id: ConnectionId,
687        drop_mode: DropMode,
688    ) -> MetaResult<NotificationVersion> {
689        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
690            .await
691    }
692
693    async fn alter_database_param(
694        &self,
695        database_id: DatabaseId,
696        param: AlterDatabaseParam,
697    ) -> MetaResult<NotificationVersion> {
698        let (version, updated_db) = self
699            .metadata_manager
700            .catalog_controller
701            .alter_database_param(database_id, param)
702            .await?;
703        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
704        self.barrier_manager
705            .update_database_barrier(
706                database_id,
707                updated_db.barrier_interval_ms.map(|v| v as u32),
708                updated_db.checkpoint_frequency.map(|v| v as u64),
709            )
710            .await?;
711        Ok(version)
712    }
713
714    // The 'secret' part of the request we receive from the frontend is in plaintext;
715    // here, we need to encrypt it before storing it in the catalog.
716    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
717        let secret_store_private_key = self
718            .env
719            .opts
720            .secret_store_private_key
721            .clone()
722            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
723
724        let encrypted_payload = SecretEncryption::encrypt(
725            secret_store_private_key.as_slice(),
726            secret.get_value().as_slice(),
727        )
728        .context(format!("failed to encrypt secret {}", secret.name))?;
729        Ok(encrypted_payload
730            .serialize()
731            .context(format!("failed to serialize secret {}", secret.name))?)
732    }
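    // Hedged sketch of the secret flow used by `create_secret` / `alter_secret` below:
    // the frontend sends the plaintext value, `get_encrypted_payload` turns it into a
    // serialized ciphertext for the catalog, and the plaintext copy is passed alongside
    // (presumably so in-memory consumers such as `LocalSecretManager` never need to
    // decrypt again).
    //
    //     let secret_plain_payload = secret.value.clone();      // keep a plaintext copy
    //     secret.value = self.get_encrypted_payload(&secret)?;  // store ciphertext only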
733
734    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
735        // The 'secret' part of the request we receive from the frontend is in plaintext;
736        // here, we need to encrypt it before storing it in the catalog.
737        let secret_plain_payload = secret.value.clone();
738        let encrypted_payload = self.get_encrypted_payload(&secret)?;
739        secret.value = encrypted_payload;
740
741        self.metadata_manager
742            .catalog_controller
743            .create_secret(secret, secret_plain_payload)
744            .await
745    }
746
747    async fn drop_secret(
748        &self,
749        secret_id: SecretId,
750        drop_mode: DropMode,
751    ) -> MetaResult<NotificationVersion> {
752        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
753            .await
754    }
755
756    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
757        let secret_plain_payload = secret.value.clone();
758        let encrypted_payload = self.get_encrypted_payload(&secret)?;
759        secret.value = encrypted_payload;
760        self.metadata_manager
761            .catalog_controller
762            .alter_secret(secret, secret_plain_payload)
763            .await
764    }
765
766    async fn create_subscription(
767        &self,
768        mut subscription: Subscription,
769    ) -> MetaResult<NotificationVersion> {
770        tracing::debug!("create subscription");
771        let _permit = self
772            .creating_streaming_job_permits
773            .semaphore
774            .acquire()
775            .await
776            .unwrap();
777        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
778        self.metadata_manager
779            .catalog_controller
780            .create_subscription_catalog(&mut subscription)
781            .await?;
782        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
783            tracing::debug!(error = %err.as_report(), "failed to create subscription");
784            let _ = self
785                .metadata_manager
786                .catalog_controller
787                .try_abort_creating_subscription(subscription.id)
788                .await
789                .inspect_err(|e| {
790                    tracing::error!(
791                        error = %e.as_report(),
792                        "failed to abort create subscription after failure"
793                    );
794                });
795            return Err(err);
796        }
797
798        let version = self
799            .metadata_manager
800            .catalog_controller
801            .notify_create_subscription(subscription.id)
802            .await?;
803        tracing::debug!("finish create subscription");
804        Ok(version)
805    }
806
807    async fn drop_subscription(
808        &self,
809        subscription_id: SubscriptionId,
810        drop_mode: DropMode,
811    ) -> MetaResult<NotificationVersion> {
812        tracing::debug!("preparing drop subscription");
813        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
814        let subscription = self
815            .metadata_manager
816            .catalog_controller
817            .get_subscription_by_id(subscription_id)
818            .await?;
819        let table_id = subscription.dependent_table_id;
820        let database_id = subscription.database_id;
821        let (_, version) = self
822            .metadata_manager
823            .catalog_controller
824            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
825            .await?;
826        self.stream_manager
827            .drop_subscription(database_id, subscription_id, table_id)
828            .await;
829        tracing::debug!("finish drop subscription");
830        Ok(version)
831    }
832
833    /// Validates the connection properties in the `cdc_table_desc` stored in the `StreamCdcScan` node.
834    #[await_tree::instrument]
835    pub(crate) async fn validate_cdc_table(
836        &self,
837        table: &Table,
838        table_fragments: &StreamJobFragments,
839    ) -> MetaResult<()> {
840        let stream_scan_fragment = table_fragments
841            .fragments
842            .values()
843            .filter(|f| {
844                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
845                    || f.fragment_type_mask
846                        .contains(FragmentTypeFlag::StreamCdcScan)
847            })
848            .exactly_one()
849            .ok()
850            .with_context(|| {
851                format!(
852                    "expect exactly one stream scan fragment, got: {:?}",
853                    table_fragments.fragments
854                )
855            })?;
856        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
857            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
858                if let Some(o) = node.options
859                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
860                {
861                    // Use parallel CDC backfill.
862                } else {
863                    assert_eq!(
864                        stream_scan_fragment.actors.len(),
865                        1,
866                        "Stream scan fragment should have only one actor"
867                    );
868                }
869            }
870        }
871        let mut found_cdc_scan = false;
872        match &stream_scan_fragment.nodes.node_body {
873            Some(NodeBody::StreamCdcScan(_)) => {
874                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
875                if self
876                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
877                    .await?
878                {
879                    found_cdc_scan = true;
880                }
881            }
882            // When there are generated columns, the CDC scan node is wrapped in a `Project` node.
883            Some(NodeBody::Project(_)) => {
884                for input in &stream_scan_fragment.nodes.input {
885                    assert_parallelism(stream_scan_fragment, &input.node_body);
886                    if self
887                        .validate_cdc_table_inner(&input.node_body, table.id)
888                        .await?
889                    {
890                        found_cdc_scan = true;
891                    }
892                }
893            }
894            _ => {
895                bail!("Unexpected node body for stream cdc scan");
896            }
897        };
898        if !found_cdc_scan {
899            bail!("No stream cdc scan node found in stream scan fragment");
900        }
901        Ok(())
902    }
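    // Hedged note: the validation above accepts exactly two fragment shapes, a
    // `StreamCdcScan` node at the fragment root, or a `Project` node (used for generated
    // columns) whose inputs contain the `StreamCdcScan`; any other node body is rejected.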
903
904    async fn validate_cdc_table_inner(
905        &self,
906        node_body: &Option<NodeBody>,
907        table_id: TableId,
908    ) -> MetaResult<bool> {
909        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
910            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
911        {
912            let options_with_secret = WithOptionsSecResolved::new(
913                cdc_table_desc.connect_properties.clone(),
914                cdc_table_desc.secret_refs.clone(),
915            );
916
917            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
918            props.init_from_pb_cdc_table_desc(cdc_table_desc);
919
920            // Try creating a split enumerator to validate
921            let _enumerator = props
922                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
923                .await?;
924
925            tracing::debug!(?table_id, "validate cdc table success");
926            Ok(true)
927        } else {
928            Ok(false)
929        }
930    }
931
932    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
933        let migrated = self
934            .metadata_manager
935            .catalog_controller
936            .has_table_been_migrated(table_id)
937            .await?;
938        if !migrated {
939            Err(anyhow::anyhow!("Creating a sink into unmigrated table {} is not allowed. Please migrate the table first.", table_id).into())
940        } else {
941            Ok(())
942        }
943    }
944
945    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
946    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
947    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
948    pub async fn create_streaming_job(
949        &self,
950        mut streaming_job: StreamingJob,
951        fragment_graph: StreamFragmentGraphProto,
952        dependencies: HashSet<ObjectId>,
953        resource_type: streaming_job_resource_type::ResourceType,
954        if_not_exists: bool,
955    ) -> MetaResult<NotificationVersion> {
956        if let StreamingJob::Sink(sink) = &streaming_job
957            && let Some(target_table) = sink.target_table
958        {
959            self.validate_table_for_sink(target_table).await?;
960        }
961        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
962        let check_ret = self
963            .metadata_manager
964            .catalog_controller
965            .create_job_catalog(
966                &mut streaming_job,
967                &ctx,
968                &fragment_graph.parallelism,
969                fragment_graph.max_parallelism as _,
970                dependencies,
971                resource_type.clone(),
972                &fragment_graph.backfill_parallelism,
973            )
974            .await;
975        if let Err(meta_err) = check_ret {
976            if !if_not_exists {
977                return Err(meta_err);
978            }
979            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
980                if streaming_job.create_type() == CreateType::Foreground {
981                    let database_id = streaming_job.database_id();
982                    self.metadata_manager
983                        .wait_streaming_job_finished(database_id, *job_id)
984                        .await
985                } else {
986                    Ok(IGNORED_NOTIFICATION_VERSION)
987                }
988            } else {
989                Err(meta_err)
990            };
991        }
992        let job_id = streaming_job.id();
993        tracing::debug!(
994            id = %job_id,
995            definition = streaming_job.definition(),
996            create_type = streaming_job.create_type().as_str_name(),
997            job_type = ?streaming_job.job_type(),
998            "starting streaming job",
999        );
1000        // TODO: acquire permits for recovered background DDLs.
1001        let permit = self
1002            .creating_streaming_job_permits
1003            .semaphore
1004            .clone()
1005            .acquire_owned()
1006            .instrument_await("acquire_creating_streaming_job_permit")
1007            .await
1008            .unwrap();
1009        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1010
1011        let name = streaming_job.name();
1012        let definition = streaming_job.definition();
1013        let source_id = match &streaming_job {
1014            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1015            _ => None,
1016        };
1017
1018        // create streaming job.
1019        match self
1020            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1021            .await
1022        {
1023            Ok(version) => Ok(version),
1024            Err(err) => {
1025                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1026                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1027                    id: job_id,
1028                    name,
1029                    definition,
1030                    error: err.as_report().to_string(),
1031                };
1032                self.env.event_log_manager_ref().add_event_logs(vec![
1033                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1034                ]);
1035                let (aborted, _) = self
1036                    .metadata_manager
1037                    .catalog_controller
1038                    .try_abort_creating_streaming_job(job_id, false)
1039                    .await?;
1040                if aborted {
1041                    tracing::warn!(id = %job_id, "aborted streaming job");
1042                    // FIXME: might also need other cleanup here
1043                    if let Some(source_id) = source_id {
1044                        self.source_manager
1045                            .apply_source_change(SourceChange::DropSource {
1046                                dropped_source_ids: vec![source_id],
1047                            })
1048                            .await;
1049                    }
1050                }
1051                Err(err)
1052            }
1053        }
1054    }
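    // Hedged summary of the flow above: the job is first pre-registered in the catalog
    // (so duplicate names are caught before any expensive work), then a creation permit
    // is acquired and the reschedule lock is held while building, and on failure the
    // catalog entry is aborted and the attached source, if any, is dropped from the
    // source manager.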
1055
1056    #[await_tree::instrument(boxed)]
1057    async fn create_streaming_job_inner(
1058        &self,
1059        ctx: StreamContext,
1060        mut streaming_job: StreamingJob,
1061        fragment_graph: StreamFragmentGraphProto,
1062        resource_type: streaming_job_resource_type::ResourceType,
1063        permit: OwnedSemaphorePermit,
1064    ) -> MetaResult<NotificationVersion> {
1065        let mut fragment_graph =
1066            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1067        streaming_job.set_info_from_graph(&fragment_graph);
1068
1069        // create internal table catalogs and refill table ids.
1070        let incomplete_internal_tables = fragment_graph
1071            .incomplete_internal_tables()
1072            .into_values()
1073            .collect_vec();
1074        let table_id_map = self
1075            .metadata_manager
1076            .catalog_controller
1077            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1078            .await?;
1079        fragment_graph.refill_internal_table_ids(table_id_map);
1080
1081        // create fragment and actor catalogs.
1082        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1083        let (ctx, stream_job_fragments) = self
1084            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1085            .await?;
1086
1087        let streaming_job = &ctx.streaming_job;
1088
1089        match streaming_job {
1090            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1091                self.validate_cdc_table(table, &stream_job_fragments)
1092                    .await?;
1093            }
1094            StreamingJob::Table(Some(source), ..) => {
1095                // Register the source on the connector node.
1096                self.source_manager.register_source(source).await?;
1097                let connector_name = source
1098                    .get_with_properties()
1099                    .get(UPSTREAM_SOURCE_KEY)
1100                    .cloned();
1101                let attr = source.info.as_ref().map(|source_info| {
1102                    jsonbb::json!({
1103                            "format": source_info.format().as_str_name(),
1104                            "encode": source_info.row_encode().as_str_name(),
1105                    })
1106                });
1107                report_create_object(
1108                    streaming_job.id(),
1109                    "source",
1110                    PbTelemetryDatabaseObject::Source,
1111                    connector_name,
1112                    attr,
1113                );
1114            }
1115            StreamingJob::Sink(sink) => {
1116                if sink.auto_refresh_schema_from_table.is_some() {
1117                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1118                }
1119                // Validate the sink on the connector node.
1120                validate_sink(sink).await?;
1121                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1122                let attr = sink.format_desc.as_ref().map(|sink_info| {
1123                    jsonbb::json!({
1124                        "format": sink_info.format().as_str_name(),
1125                        "encode": sink_info.encode().as_str_name(),
1126                    })
1127                });
1128                report_create_object(
1129                    streaming_job.id(),
1130                    "sink",
1131                    PbTelemetryDatabaseObject::Sink,
1132                    connector_name,
1133                    attr,
1134                );
1135            }
1136            StreamingJob::Source(source) => {
1137                // Register the source on the connector node.
1138                self.source_manager.register_source(source).await?;
1139                let connector_name = source
1140                    .get_with_properties()
1141                    .get(UPSTREAM_SOURCE_KEY)
1142                    .cloned();
1143                let attr = source.info.as_ref().map(|source_info| {
1144                    jsonbb::json!({
1145                            "format": source_info.format().as_str_name(),
1146                            "encode": source_info.row_encode().as_str_name(),
1147                    })
1148                });
1149                report_create_object(
1150                    streaming_job.id(),
1151                    "source",
1152                    PbTelemetryDatabaseObject::Source,
1153                    connector_name,
1154                    attr,
1155                );
1156            }
1157            _ => {}
1158        }
1159
1160        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
1161        self.metadata_manager
1162            .catalog_controller
1163            .prepare_stream_job_fragments(
1164                &stream_job_fragments,
1165                streaming_job,
1166                false,
1167                Some(backfill_orders),
1168            )
1169            .await?;
1170
1171        // create streaming jobs.
1172        let version = self
1173            .stream_manager
1174            .create_streaming_job(stream_job_fragments, ctx, permit)
1175            .await?;
1176
1177        Ok(version)
1178    }
1179
1180    /// Drops a catalog object of the given type and releases all resources that depend on it.
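    /// On success it also tears down everything released by the catalog: the dropped
    /// streaming jobs with their actors and fragments, the associated source states, the
    /// Iceberg tables behind dropped Iceberg table sinks (together with their sink
    /// coordinators and pending compaction commits), and the secrets held by the local
    /// secret manager.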
1181    pub async fn drop_object(
1182        &self,
1183        object_type: ObjectType,
1184        object_id: impl Into<ObjectId>,
1185        drop_mode: DropMode,
1186    ) -> MetaResult<NotificationVersion> {
1187        let object_id = object_id.into();
1188        let (release_ctx, version) = self
1189            .metadata_manager
1190            .catalog_controller
1191            .drop_object(object_type, object_id, drop_mode)
1192            .await?;
1193
1194        if object_type == ObjectType::Source {
1195            self.env
1196                .notification_manager_ref()
1197                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1198        }
1199
1200        let ReleaseContext {
1201            database_id,
1202            removed_streaming_job_ids,
1203            removed_state_table_ids,
1204            removed_source_ids,
1205            removed_secret_ids: secret_ids,
1206            removed_source_fragments,
1207            removed_actors,
1208            removed_fragments,
1209            removed_sink_fragment_by_targets,
1210            removed_iceberg_table_sinks,
1211        } = release_ctx;
1212
1213        let _guard = self.source_manager.pause_tick().await;
1214        self.stream_manager
1215            .drop_streaming_jobs(
1216                database_id,
1217                removed_actors.iter().map(|id| *id as _).collect(),
1218                removed_streaming_job_ids,
1219                removed_state_table_ids,
1220                removed_fragments.iter().map(|id| *id as _).collect(),
1221                removed_sink_fragment_by_targets
1222                    .into_iter()
1223                    .map(|(target, sinks)| {
1224                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1225                    })
1226                    .collect(),
1227            )
1228            .await;
1229
1230        // Clean up sources after dropping streaming jobs.
1231        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
1232        self.source_manager
1233            .apply_source_change(SourceChange::DropSource {
1234                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1235            })
1236            .await;
1237
1238        // unregister fragments and actors from source manager.
1239        // FIXME: also need to unregister source backfill fragments.
1240        let dropped_source_fragments = removed_source_fragments;
1241        self.source_manager
1242            .apply_source_change(SourceChange::DropMv {
1243                dropped_source_fragments,
1244            })
1245            .await;
1246
1247        // clean up iceberg table sinks
1248        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1249            .iter()
1250            .map(|sink| sink.id)
1251            .collect();
1252
1253        for sink in removed_iceberg_table_sinks {
1254            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1255                .expect("Iceberg sink should be valid");
1256            let iceberg_sink =
1257                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1258            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1259                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1260                tracing::info!(
1261                    "dropping iceberg table {} for dropped sink",
1262                    table_identifier
1263                );
1264
1265                let _ = iceberg_catalog
1266                    .drop_table(&table_identifier)
1267                    .await
1268                    .inspect_err(|err| {
1269                        tracing::error!(
1270                            "failed to drop iceberg table {} during cleanup: {}",
1271                            table_identifier,
1272                            err.as_report()
1273                        );
1274                    });
1275            }
1276        }
1277
1278        // stop sink coordinators for iceberg table sinks
1279        if !iceberg_sink_ids.is_empty() {
1280            self.sink_manager
1281                .stop_sink_coordinator(iceberg_sink_ids.clone())
1282                .await;
1283
1284            for sink_id in iceberg_sink_ids {
1285                self.iceberg_compaction_manager
1286                    .clear_iceberg_commits_by_sink_id(sink_id);
1287            }
1288        }
1289
1290        // remove secrets.
1291        for secret in secret_ids {
1292            LocalSecretManager::global().remove_secret(secret);
1293        }
1294        Ok(version)
1295    }
1296
1297    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1298    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1299    pub async fn replace_job(
1300        &self,
1301        mut streaming_job: StreamingJob,
1302        fragment_graph: StreamFragmentGraphProto,
1303    ) -> MetaResult<NotificationVersion> {
1304        match &streaming_job {
1305            StreamingJob::Table(..)
1306            | StreamingJob::Source(..)
1307            | StreamingJob::MaterializedView(..) => {}
1308            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1309                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1310            }
1311        }
1312
1313        let job_id = streaming_job.id();
1314
1315        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1316        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1317
1318        // Ensure the max parallelism is unchanged before replacing the table.
1319        let original_max_parallelism = self
1320            .metadata_manager
1321            .get_job_max_parallelism(streaming_job.id())
1322            .await?;
1323        let fragment_graph = PbStreamFragmentGraph {
1324            max_parallelism: original_max_parallelism as _,
1325            ..fragment_graph
1326        };
1327
1328        // 1. build fragment graph.
1329        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1330        streaming_job.set_info_from_graph(&fragment_graph);
1331
1332        // make it immutable
1333        let streaming_job = streaming_job;
1334
1335        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1336            let auto_refresh_schema_sinks = self
1337                .metadata_manager
1338                .catalog_controller
1339                .get_sink_auto_refresh_schema_from(table.id)
1340                .await?;
1341            if !auto_refresh_schema_sinks.is_empty() {
1342                let original_table_columns = self
1343                    .metadata_manager
1344                    .catalog_controller
1345                    .get_table_columns(table.id)
1346                    .await?;
                // compare column ids to find the newly added columns
1348                let mut original_table_column_ids: HashSet<_> = original_table_columns
1349                    .iter()
1350                    .map(|col| col.column_id())
1351                    .collect();
1352                let newly_added_columns = table
1353                    .columns
1354                    .iter()
1355                    .filter(|col| {
1356                        !original_table_column_ids.remove(&ColumnId::new(
1357                            col.column_desc.as_ref().unwrap().column_id as _,
1358                        ))
1359                    })
1360                    .map(|col| ColumnCatalog::from(col.clone()))
1361                    .collect_vec();
1362                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1364                }
1365                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1366                for sink in auto_refresh_schema_sinks {
1367                    let sink_job_fragments = self
1368                        .metadata_manager
1369                        .get_job_fragments_by_id(sink.id.as_job_id())
1370                        .await?;
1371                    if sink_job_fragments.fragments.len() != 1 {
1372                        return Err(anyhow!(
                            "auto schema refresh sink must have exactly one fragment, but got {}",
1374                            sink_job_fragments.fragments.len()
1375                        )
1376                        .into());
1377                    }
1378                    let original_sink_fragment =
1379                        sink_job_fragments.fragments.into_values().next().unwrap();
1380                    let (new_sink_fragment, new_schema, new_log_store_table) =
1381                        rewrite_refresh_schema_sink_fragment(
1382                            &original_sink_fragment,
1383                            &sink,
1384                            &newly_added_columns,
1385                            table,
1386                            fragment_graph.table_fragment_id(),
1387                            self.env.id_gen_manager(),
1388                            self.env.actor_id_generator(),
1389                        )?;
1390
1391                    assert_eq!(
1392                        original_sink_fragment.actors.len(),
1393                        new_sink_fragment.actors.len()
1394                    );
1395                    let actor_status = (0..original_sink_fragment.actors.len())
1396                        .map(|i| {
1397                            let worker_node_id = sink_job_fragments.actor_status
1398                                [&original_sink_fragment.actors[i].actor_id]
1399                                .location
1400                                .as_ref()
1401                                .unwrap()
1402                                .worker_node_id;
1403                            (
1404                                new_sink_fragment.actors[i].actor_id,
1405                                PbActorStatus {
1406                                    location: Some(PbActorLocation { worker_node_id }),
1407                                },
1408                            )
1409                        })
1410                        .collect();
1411
1412                    let streaming_job = StreamingJob::Sink(sink);
1413
1414                    let tmp_sink_id = self
1415                        .metadata_manager
1416                        .catalog_controller
1417                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1418                        .await?
1419                        .as_sink_id();
1420                    let StreamingJob::Sink(sink) = streaming_job else {
1421                        unreachable!()
1422                    };
1423
1424                    sinks.push(AutoRefreshSchemaSinkContext {
1425                        tmp_sink_id,
1426                        original_sink: sink,
1427                        original_fragment: original_sink_fragment,
1428                        new_schema,
1429                        newly_add_fields: newly_added_columns
1430                            .iter()
1431                            .map(|col| Field::from(&col.column_desc))
1432                            .collect(),
1433                        new_fragment: new_sink_fragment,
1434                        new_log_store_table,
1435                        actor_status,
1436                    });
1437                }
1438                Some(sinks)
1439            } else {
1440                None
1441            }
1442        } else {
1443            None
1444        };
1445
1446        let tmp_id = self
1447            .metadata_manager
1448            .catalog_controller
1449            .create_job_catalog_for_replace(
1450                &streaming_job,
1451                Some(&ctx),
1452                fragment_graph.specified_parallelism().as_ref(),
1453                Some(fragment_graph.max_parallelism()),
1454            )
1455            .await?;
1456
1457        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1458            sinks
1459                .iter()
1460                .map(|sink| sink.tmp_sink_id.as_object_id())
1461                .collect_vec()
1462        });
1463
1464        tracing::debug!(id = %job_id, "building replace streaming job");
1465        let mut updated_sink_catalogs = vec![];
1466
1467        let mut drop_table_connector_ctx = None;
1468        let result: MetaResult<_> = try {
1469            let (mut ctx, mut stream_job_fragments) = self
1470                .build_replace_job(
1471                    ctx,
1472                    &streaming_job,
1473                    fragment_graph,
1474                    tmp_id,
1475                    auto_refresh_schema_sinks,
1476                )
1477                .await?;
1478            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1479            let auto_refresh_schema_sink_finish_ctx =
1480                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1481                    sinks
1482                        .iter()
1483                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1484                            tmp_sink_id: sink.tmp_sink_id,
1485                            original_sink_id: sink.original_sink.id,
1486                            columns: sink.new_schema.clone(),
1487                            new_log_store_table: sink
1488                                .new_log_store_table
1489                                .as_ref()
1490                                .map(|table| (table.id, table.columns.clone())),
1491                        })
1492                        .collect()
1493                });
1494
            // Handle a table that has incoming sinks.
1496            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1497                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1498                let upstream_infos = self
1499                    .metadata_manager
1500                    .catalog_controller
1501                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1502                    .await?;
1503                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1504
1505                for upstream_info in &upstream_infos {
1506                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1507                    ctx.upstream_fragment_downstreams
1508                        .entry(upstream_fragment_id)
1509                        .or_default()
1510                        .push(upstream_info.new_sink_downstream.clone());
1511                    if upstream_info.sink_original_target_columns.is_empty() {
1512                        updated_sink_catalogs.push(upstream_info.sink_id);
1513                    }
1514                }
1515            }
1516
1517            let replace_upstream = ctx.replace_upstream.clone();
1518
1519            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1520                let empty_downstreams = FragmentDownstreamRelation::default();
1521                for sink in sinks {
1522                    self.metadata_manager
1523                        .catalog_controller
1524                        .prepare_streaming_job(
1525                            sink.tmp_sink_id.as_job_id(),
1526                            || [&sink.new_fragment].into_iter(),
1527                            &empty_downstreams,
1528                            true,
1529                            None,
1530                            None,
1531                        )
1532                        .await?;
1533                }
1534            }
1535
1536            self.metadata_manager
1537                .catalog_controller
1538                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1539                .await?;
1540
1541            self.stream_manager
1542                .replace_stream_job(stream_job_fragments, ctx)
1543                .await?;
1544            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1545        };
1546
1547        match result {
1548            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1549                let version = self
1550                    .metadata_manager
1551                    .catalog_controller
1552                    .finish_replace_streaming_job(
1553                        tmp_id,
1554                        streaming_job,
1555                        replace_upstream,
1556                        SinkIntoTableContext {
1557                            updated_sink_catalogs,
1558                        },
1559                        drop_table_connector_ctx.as_ref(),
1560                        auto_refresh_schema_sink_finish_ctx,
1561                    )
1562                    .await?;
1563                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1564                    self.source_manager
1565                        .apply_source_change(SourceChange::DropSource {
1566                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1567                        })
1568                        .await;
1569                }
1570                Ok(version)
1571            }
1572            Err(err) => {
1573                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1574                let _ = self.metadata_manager
1575                    .catalog_controller
1576                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1577                    .await.inspect_err(|err| {
1578                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1579                });
1580                Err(err)
1581            }
1582        }
1583    }
1584
1585    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1586    )]
1587    async fn drop_streaming_job(
1588        &self,
1589        job_id: StreamingJobId,
1590        drop_mode: DropMode,
1591    ) -> MetaResult<NotificationVersion> {
1592        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1593
1594        let (object_id, object_type) = match job_id {
1595            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1596            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1597            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1598            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1599        };
1600
1601        let job_status = self
1602            .metadata_manager
1603            .catalog_controller
1604            .get_streaming_job_status(job_id.id())
1605            .await?;
1606        let version = match job_status {
1607            JobStatus::Initial => {
1608                unreachable!(
                    "a job in Initial status has not been notified to the frontend, so a drop request should never arrive here"
1610                );
1611            }
1612            JobStatus::Creating => {
1613                self.stream_manager
1614                    .cancel_streaming_jobs(vec![job_id.id()])
1615                    .await?;
1616                IGNORED_NOTIFICATION_VERSION
1617            }
1618            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1619        };
1620
1621        Ok(version)
1622    }
1623
1624    /// Resolve the parallelism of the stream job based on the given information.
1625    ///
1626    /// Returns error if user specifies a parallelism that cannot be satisfied.
1627    fn resolve_stream_parallelism(
1628        &self,
1629        specified: Option<NonZeroUsize>,
1630        max: NonZeroUsize,
1631        cluster_info: &StreamingClusterInfo,
1632        resource_group: String,
1633    ) -> MetaResult<NonZeroUsize> {
1634        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1635        DdlController::resolve_stream_parallelism_inner(
1636            specified,
1637            max,
1638            available,
1639            &self.env.opts.default_parallelism,
1640            &resource_group,
1641        )
1642    }
1643
1644    fn resolve_stream_parallelism_inner(
1645        specified: Option<NonZeroUsize>,
1646        max: NonZeroUsize,
1647        available: Option<NonZeroUsize>,
1648        default_parallelism: &DefaultParallelism,
1649        resource_group: &str,
1650    ) -> MetaResult<NonZeroUsize> {
1651        let Some(available) = available else {
1652            bail_unavailable!(
1653                "no available slots to schedule in resource group \"{}\", \
1654                 have you allocated any compute nodes within this resource group?",
1655                resource_group
1656            );
1657        };
1658
1659        if let Some(specified) = specified {
1660            if specified > max {
1661                bail_invalid_parameter!(
1662                    "specified parallelism {} should not exceed max parallelism {}",
1663                    specified,
1664                    max,
1665                );
1666            }
1667            if specified > available {
1668                tracing::warn!(
1669                    resource_group,
1670                    specified_parallelism = specified.get(),
1671                    available_parallelism = available.get(),
1672                    "specified parallelism exceeds available slots, scheduling with specified value",
1673                );
1674            }
1675            return Ok(specified);
1676        }
1677
1678        // Use default parallelism when no specific parallelism is provided by the user.
1679        let default_parallelism = match default_parallelism {
1680            DefaultParallelism::Full => available,
1681            DefaultParallelism::Default(num) => {
1682                if *num > available {
1683                    tracing::warn!(
1684                        resource_group,
1685                        configured_parallelism = num.get(),
1686                        available_parallelism = available.get(),
1687                        "default parallelism exceeds available slots, scheduling with configured value",
1688                    );
1689                }
1690                *num
1691            }
1692        };
1693
1694        if default_parallelism > max {
1695            tracing::warn!(
1696                max_parallelism = max.get(),
1697                resource_group,
1698                "default parallelism exceeds max parallelism, capping to max",
1699            );
1700        }
1701        Ok(default_parallelism.min(max))
1702    }
1703
1704    /// Builds the actor graph:
1705    /// - Add the upstream fragments to the fragment graph
1706    /// - Schedule the fragments based on their distribution
1707    /// - Expand each fragment into one or several actors
1708    /// - Construct the fragment level backfill order control.
1709    #[await_tree::instrument]
1710    pub(crate) async fn build_stream_job(
1711        &self,
1712        stream_ctx: StreamContext,
1713        mut stream_job: StreamingJob,
1714        fragment_graph: StreamFragmentGraph,
1715        resource_type: streaming_job_resource_type::ResourceType,
1716    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1717        let id = stream_job.id();
1718        let specified_parallelism = fragment_graph.specified_parallelism();
1719        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1720        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1721
        // 1. Build the fragment-level backfill ordering graph.
1723        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1724
1725        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1726        // contains all information needed for building the actor graph.
1727
1728        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1729            fragment_graph.collect_snapshot_backfill_info()?;
1730        assert!(
1731            snapshot_backfill_info
1732                .iter()
1733                .chain([&cross_db_snapshot_backfill_info])
1734                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1735                .all(|backfill_epoch| backfill_epoch.is_none()),
            "should not set backfill epoch when initially building the job: {:?} {:?}",
1737            snapshot_backfill_info,
1738            cross_db_snapshot_backfill_info
1739        );
1740
1741        let locality_fragment_state_table_mapping =
1742            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1743
1744        // check if log store exists for all cross-db upstreams
1745        self.metadata_manager
1746            .catalog_controller
1747            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1748            .await?;
1749
1750        let upstream_table_ids = fragment_graph
1751            .dependent_table_ids()
1752            .iter()
1753            .filter(|id| {
1754                !cross_db_snapshot_backfill_info
1755                    .upstream_mv_table_id_to_backfill_epoch
1756                    .contains_key(*id)
1757            })
1758            .cloned()
1759            .collect();
1760
1761        let (upstream_root_fragments, existing_actor_location) = self
1762            .metadata_manager
1763            .get_upstream_root_fragments(&upstream_table_ids)
1764            .await?;
1765
1766        if snapshot_backfill_info.is_some() {
1767            match stream_job {
1768                StreamingJob::MaterializedView(_)
1769                | StreamingJob::Sink(_)
1770                | StreamingJob::Index(_, _) => {}
1771                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1772                    return Err(
                        anyhow!("snapshot_backfill is not supported for tables and sources").into(),
1774                    );
1775                }
1776            }
1777        }
1778
1779        let upstream_actors = upstream_root_fragments
1780            .values()
1781            .map(|(fragment, _)| {
1782                (
1783                    fragment.fragment_id,
1784                    fragment.actors.keys().copied().collect(),
1785                )
1786            })
1787            .collect();
1788
1789        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1790            fragment_graph,
1791            FragmentGraphUpstreamContext {
1792                upstream_root_fragments,
1793                upstream_actor_location: existing_actor_location,
1794            },
1795            (&stream_job).into(),
1796        )?;
1797        let resource_group = if let Some(group) = resource_type.resource_group() {
1798            group
1799        } else {
1800            self.metadata_manager
1801                .get_database_resource_group(stream_job.database_id())
1802                .await?
1803        };
1804        let is_serverless_backfill = matches!(
1805            &resource_type,
1806            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1807        );
1808
1809        // 3. Build the actor graph.
1810        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1811
1812        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1813        let parallelism = self.resolve_stream_parallelism(
1814            initial_parallelism,
1815            max_parallelism,
1816            &cluster_info,
1817            resource_group.clone(),
1818        )?;
1819
1820        let parallelism = if initial_parallelism.is_some() {
1821            parallelism.get()
1822        } else {
            // Prefer the job-level override for initial scheduling; fall back to the system strategy.
1824            let adaptive_strategy = match stream_ctx.adaptive_parallelism_strategy {
1825                Some(strategy) => strategy,
1826                None => self
1827                    .env
1828                    .system_params_reader()
1829                    .await
1830                    .adaptive_parallelism_strategy(),
1831            };
1832            adaptive_strategy.compute_target_parallelism(parallelism.get())
1833        };
1834
1835        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1836        let actor_graph_builder = ActorGraphBuilder::new(
1837            id,
1838            resource_group,
1839            complete_graph,
1840            cluster_info,
1841            parallelism,
1842        )?;
1843
1844        let ActorGraphBuildResult {
1845            graph,
1846            downstream_fragment_relations,
1847            building_locations,
1848            upstream_fragment_downstreams,
1849            new_no_shuffle,
1850            replace_upstream,
1851            ..
1852        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1853        assert!(replace_upstream.is_empty());
1854
1855        // 4. Build the table fragments structure that will be persisted in the stream manager,
1856        // and the context that contains all information needed for building the
1857        // actors on the compute nodes.
1858
        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, set it to ADAPTIVE.
        // Otherwise, it is deduced to be FIXED.
1861        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1862            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1863            _ => TableParallelism::Fixed(parallelism.get()),
1864        };
1865
1866        let stream_job_fragments = StreamJobFragments::new(
1867            id,
1868            graph,
1869            &building_locations.actor_locations,
1870            stream_ctx.clone(),
1871            table_parallelism,
1872            max_parallelism.get(),
1873        );
1874
1875        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1876            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1877        }
1878
1879        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1880            && let Ok(table_id) = sink.get_target_table()
1881        {
1882            let tables = self
1883                .metadata_manager
1884                .get_table_catalog_by_ids(&[*table_id])
1885                .await?;
1886            let target_table = tables
1887                .first()
1888                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1889            let sink_fragment = stream_job_fragments
1890                .sink_fragment()
1891                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1892            let mview_fragment_id = self
1893                .metadata_manager
1894                .catalog_controller
1895                .get_mview_fragment_by_id(table_id.as_job_id())
1896                .await?;
1897            let upstream_sink_info = build_upstream_sink_info(
1898                sink.id,
1899                sink.original_target_columns.clone(),
1900                sink_fragment.fragment_id as _,
1901                target_table,
1902                mview_fragment_id,
1903            )?;
1904            Some(upstream_sink_info)
1905        } else {
1906            None
1907        };
1908
1909        let mut cdc_table_snapshot_splits = None;
1910        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1911            && let Some((_, stream_cdc_scan)) =
1912                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1913        {
            // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
            let splits = try_init_parallel_cdc_table_snapshot_splits(
                table.id,
                stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
                self.env.meta_store_ref(),
                stream_cdc_scan.options.as_ref().unwrap(),
                self.env.opts.cdc_table_split_init_insert_batch_size,
                self.env.opts.cdc_table_split_init_sleep_interval_splits,
                self.env.opts.cdc_table_split_init_sleep_duration_millis,
            )
            .await?;
            cdc_table_snapshot_splits = Some(splits);
1928        }
1929
1930        let ctx = CreateStreamingJobContext {
1931            upstream_fragment_downstreams,
1932            new_no_shuffle,
1933            upstream_actors,
1934            building_locations,
1935            definition: stream_job.definition(),
1936            create_type: stream_job.create_type(),
1937            job_type: (&stream_job).into(),
1938            streaming_job: stream_job,
1939            new_upstream_sink,
1940            option: CreateStreamingJobOption {},
1941            snapshot_backfill_info,
1942            cross_db_snapshot_backfill_info,
1943            fragment_backfill_ordering,
1944            locality_fragment_state_table_mapping,
1945            cdc_table_snapshot_splits,
1946            is_serverless_backfill,
1947        };
1948
1949        Ok((
1950            ctx,
1951            StreamJobFragmentsToCreate {
1952                inner: stream_job_fragments,
1953                downstreams: downstream_fragment_relations,
1954            },
1955        ))
1956    }
1957
    /// `build_replace_job` builds a job replacement and returns the context and new job
1959    /// fragments.
1960    ///
1961    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1962    /// replacement is finished.
1963    pub(crate) async fn build_replace_job(
1964        &self,
1965        stream_ctx: StreamContext,
1966        stream_job: &StreamingJob,
1967        mut fragment_graph: StreamFragmentGraph,
1968        tmp_job_id: JobId,
1969        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1970    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1971        match &stream_job {
1972            StreamingJob::Table(..)
1973            | StreamingJob::Source(..)
1974            | StreamingJob::MaterializedView(..) => {}
1975            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1976                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1977            }
1978        }
1979
1980        let id = stream_job.id();
1981
1982        // check if performing drop table connector
1983        let mut drop_table_associated_source_id = None;
1984        if let StreamingJob::Table(None, _, _) = &stream_job {
1985            drop_table_associated_source_id = self
1986                .metadata_manager
1987                .get_table_associated_source_id(id.as_mv_table_id())
1988                .await?;
1989        }
1990
1991        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1992        let old_internal_table_ids = old_fragments.internal_table_ids();
1993
1994        // handle drop table's associated source
1995        let mut drop_table_connector_ctx = None;
1996        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            // dropping the table's associated source implies the fragment containing the table has exactly one internal table (the associated source's state table)
1998            debug_assert!(old_internal_table_ids.len() == 1);
1999
2000            drop_table_connector_ctx = Some(DropTableConnectorContext {
2001                // we do not remove the original table catalog as it's still needed for the streaming job
2002                // just need to remove the ref to the state table
2003                to_change_streaming_job_id: id,
2004                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
2005                to_remove_source_id,
2006            });
2007        } else if stream_job.is_materialized_view() {
            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
2009            // but more robust.
2010            let old_fragments_upstreams = self
2011                .metadata_manager
2012                .catalog_controller
2013                .upstream_fragments(old_fragments.fragment_ids())
2014                .await?;
2015
2016            let old_state_graph =
2017                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2018            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2019            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
2020                .context("incompatible altering on the streaming job states")?;
2021
2022            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2023            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2024        } else {
2025            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2026            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2027            let old_internal_tables = self
2028                .metadata_manager
2029                .get_table_catalog_by_ids(&old_internal_table_ids)
2030                .await?;
2031            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2032        }
2033
2034        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2035        // graph that contains all information needed for building the actor graph.
2036        let original_root_fragment = old_fragments
2037            .root_fragment()
2038            .expect("root fragment not found");
2039
2040        let job_type = StreamingJobType::from(stream_job);
2041
2042        // Extract the downstream fragments from the fragment graph.
2043        let (mut downstream_fragments, mut downstream_actor_location) =
2044            self.metadata_manager.get_downstream_fragments(id).await?;
2045
2046        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2047            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2048                .iter()
2049                .map(|sink| sink.original_fragment.fragment_id)
2050                .collect();
2051            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2052                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2053                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2054                }) {
2055                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2056                    for actor_id in downstream_fragment.actors.keys() {
2057                        downstream_actor_location.remove(actor_id);
2058                    }
2059                    for (actor_id, status) in &sink.actor_status {
2060                        downstream_actor_location
2061                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2062                    }
2063
2064                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2065                    *nodes = sink.new_fragment.nodes.clone();
2066                }
2067            }
2068            assert!(remaining_fragment.is_empty());
2069        }
2070
2071        // build complete graph based on the table job type
2072        let complete_graph = match &job_type {
2073            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2074                CompleteStreamFragmentGraph::with_downstreams(
2075                    fragment_graph,
2076                    FragmentGraphDownstreamContext {
2077                        original_root_fragment_id: original_root_fragment.fragment_id,
2078                        downstream_fragments,
2079                        downstream_actor_location,
2080                    },
2081                    job_type,
2082                )?
2083            }
2084            StreamingJobType::Table(TableJobType::SharedCdcSource)
2085            | StreamingJobType::MaterializedView => {
2086                // CDC tables or materialized views can have upstream jobs as well.
2087                let (upstream_root_fragments, upstream_actor_location) = self
2088                    .metadata_manager
2089                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2090                    .await?;
2091
2092                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2093                    fragment_graph,
2094                    FragmentGraphUpstreamContext {
2095                        upstream_root_fragments,
2096                        upstream_actor_location,
2097                    },
2098                    FragmentGraphDownstreamContext {
2099                        original_root_fragment_id: original_root_fragment.fragment_id,
2100                        downstream_fragments,
2101                        downstream_actor_location,
2102                    },
2103                    job_type,
2104                )?
2105            }
2106            _ => unreachable!(),
2107        };
2108
2109        let resource_group = self
2110            .metadata_manager
2111            .get_existing_job_resource_group(id)
2112            .await?;
2113
2114        // 2. Build the actor graph.
2115        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2116
2117        // XXX: what is this parallelism?
2118        // Is it "assigned parallelism"?
2119        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2120            .expect("The number of actors in the original table fragment should be greater than 0");
2121
2122        let actor_graph_builder = ActorGraphBuilder::new(
2123            id,
2124            resource_group,
2125            complete_graph,
2126            cluster_info,
2127            parallelism,
2128        )?;
2129
2130        let ActorGraphBuildResult {
2131            graph,
2132            downstream_fragment_relations,
2133            building_locations,
2134            upstream_fragment_downstreams,
2135            mut replace_upstream,
2136            new_no_shuffle,
2137            ..
2138        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2139
        // A general table or source does not have an upstream job, so the dispatchers should be empty.
2141        if matches!(
2142            job_type,
2143            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2144        ) {
2145            assert!(upstream_fragment_downstreams.is_empty());
2146        }
2147
2148        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2149        // the context that contains all information needed for building the actors on the compute
2150        // nodes.
2151        let stream_job_fragments = StreamJobFragments::new(
2152            tmp_job_id,
2153            graph,
2154            &building_locations.actor_locations,
2155            stream_ctx,
2156            old_fragments.assigned_parallelism,
2157            old_fragments.max_parallelism,
2158        );
2159
2160        if let Some(sinks) = &auto_refresh_schema_sinks {
2161            for sink in sinks {
2162                replace_upstream
2163                    .remove(&sink.new_fragment.fragment_id)
2164                    .expect("should exist");
2165            }
2166        }
2167
2168        // Note: no need to set `vnode_count` as it's already set by the frontend.
2169        // See `get_replace_table_plan`.
2170
2171        let ctx = ReplaceStreamJobContext {
2172            old_fragments,
2173            replace_upstream,
2174            new_no_shuffle,
2175            upstream_fragment_downstreams,
2176            building_locations,
2177            streaming_job: stream_job.clone(),
2178            tmp_id: tmp_job_id,
2179            drop_table_connector_ctx,
2180            auto_refresh_schema_sinks,
2181        };
2182
2183        Ok((
2184            ctx,
2185            StreamJobFragmentsToCreate {
2186                inner: stream_job_fragments,
2187                downstreams: downstream_fragment_relations,
2188            },
2189        ))
2190    }
2191
2192    async fn alter_name(
2193        &self,
2194        relation: alter_name_request::Object,
2195        new_name: &str,
2196    ) -> MetaResult<NotificationVersion> {
2197        let (obj_type, id): (ObjectType, ObjectId) = match relation {
2198            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2199            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2200            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2201            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2202            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2203            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2204            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2205            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2206        };
2207        self.metadata_manager
2208            .catalog_controller
2209            .alter_name(obj_type, id, new_name)
2210            .await
2211    }
2212
2213    async fn alter_swap_rename(
2214        &self,
2215        object: alter_swap_rename_request::Object,
2216    ) -> MetaResult<NotificationVersion> {
2217        let (obj_type, src_id, dst_id) = match object {
2218            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2219            alter_swap_rename_request::Object::Table(objs) => {
2220                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2221                (ObjectType::Table, src_id, dst_id)
2222            }
2223            alter_swap_rename_request::Object::View(objs) => {
2224                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2225                (ObjectType::View, src_id, dst_id)
2226            }
2227            alter_swap_rename_request::Object::Source(objs) => {
2228                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2229                (ObjectType::Source, src_id, dst_id)
2230            }
2231            alter_swap_rename_request::Object::Sink(objs) => {
2232                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2233                (ObjectType::Sink, src_id, dst_id)
2234            }
2235            alter_swap_rename_request::Object::Subscription(objs) => {
2236                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2237                (ObjectType::Subscription, src_id, dst_id)
2238            }
2239        };
2240
2241        self.metadata_manager
2242            .catalog_controller
2243            .alter_swap_rename(obj_type, src_id, dst_id)
2244            .await
2245    }
2246
2247    async fn alter_owner(
2248        &self,
2249        object: Object,
2250        owner_id: UserId,
2251    ) -> MetaResult<NotificationVersion> {
2252        let (obj_type, id): (ObjectType, ObjectId) = match object {
2253            Object::TableId(id) => (ObjectType::Table, id.into()),
2254            Object::ViewId(id) => (ObjectType::View, id.into()),
2255            Object::SourceId(id) => (ObjectType::Source, id.into()),
2256            Object::SinkId(id) => (ObjectType::Sink, id.into()),
2257            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2258            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2259            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2260            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2261            Object::FunctionId(id) => (ObjectType::Function, id.into()),
2262            Object::SecretId(id) => (ObjectType::Secret, id.into()),
2263        };
2264        self.metadata_manager
2265            .catalog_controller
2266            .alter_owner(obj_type, id, owner_id as _)
2267            .await
2268    }
2269
2270    async fn alter_set_schema(
2271        &self,
2272        object: alter_set_schema_request::Object,
2273        new_schema_id: SchemaId,
2274    ) -> MetaResult<NotificationVersion> {
2275        let (obj_type, id): (ObjectType, ObjectId) = match object {
2276            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2277            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2278            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2279            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2280            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2281            alter_set_schema_request::Object::ConnectionId(id) => {
2282                (ObjectType::Connection, id.into())
2283            }
2284            alter_set_schema_request::Object::SubscriptionId(id) => {
2285                (ObjectType::Subscription, id.into())
2286            }
2287        };
2288        self.metadata_manager
2289            .catalog_controller
2290            .alter_schema(obj_type, id, new_schema_id)
2291            .await
2292    }
2293
2294    pub async fn wait(&self) -> MetaResult<WaitVersion> {
2295        let timeout_ms = 30 * 60 * 1000;
2296        for _ in 0..timeout_ms {
2297            if self
2298                .metadata_manager
2299                .catalog_controller
2300                .list_background_creating_jobs(true, None)
2301                .await?
2302                .is_empty()
2303            {
2304                let catalog_version = self
2305                    .metadata_manager
2306                    .catalog_controller
2307                    .notify_frontend_trivial()
2308                    .await;
2309                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2310                return Ok(WaitVersion {
2311                    catalog_version,
2312                    hummock_version_id,
2313                });
2314            }
2315
2316            sleep(Duration::from_millis(1)).await;
2317        }
2318        Err(MetaError::cancelled(format!(
2319            "timeout after {timeout_ms}ms"
2320        )))
2321    }
2322
2323    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2324        self.metadata_manager
2325            .catalog_controller
2326            .comment_on(comment)
2327            .await
2328    }
2329
2330    async fn alter_streaming_job_config(
2331        &self,
2332        job_id: JobId,
2333        entries_to_add: HashMap<String, String>,
2334        keys_to_remove: Vec<String>,
2335    ) -> MetaResult<NotificationVersion> {
2336        self.metadata_manager
2337            .catalog_controller
2338            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2339            .await
2340    }
2341}
2342
2343fn report_create_object(
2344    job_id: JobId,
2345    event_name: &str,
2346    obj_type: PbTelemetryDatabaseObject,
2347    connector_name: Option<String>,
2348    attr_info: Option<jsonbb::Value>,
2349) {
2350    report_event(
2351        PbTelemetryEventStage::CreateStreamJob,
2352        event_name,
2353        job_id.as_raw_id() as _,
2354        connector_name,
2355        Some(obj_type),
2356        attr_info,
2357    );
2358}
2359
2360pub fn build_upstream_sink_info(
2361    sink_id: SinkId,
2362    original_target_columns: Vec<PbColumnCatalog>,
2363    sink_fragment_id: FragmentId,
2364    target_table: &PbTable,
2365    target_fragment_id: FragmentId,
2366) -> MetaResult<UpstreamSinkInfo> {
2367    let sink_columns = if !original_target_columns.is_empty() {
2368        original_target_columns.clone()
2369    } else {
        // An empty value means the field did not exist in earlier versions, which implies that
        // no schema changes such as `ADD/DROP COLUMN` have been made to the table since the sink
        // was created. Therefore, the table's current columns serve as the `original_target_columns`.
        // The sink's value will be filled in on the meta node.
2374        target_table.columns.clone()
2375    };
2376
2377    let sink_output_fields = sink_columns
2378        .iter()
2379        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2380        .collect_vec();
2381    let output_indices = (0..sink_output_fields.len())
2382        .map(|i| i as u32)
2383        .collect_vec();
2384
2385    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2386        let sink_idx_by_col_id = sink_columns
2387            .iter()
2388            .enumerate()
2389            .map(|(idx, col)| {
2390                let column_id = col.column_desc.as_ref().unwrap().column_id;
2391                (column_id, idx as u32)
2392            })
2393            .collect::<HashMap<_, _>>();
2394        target_table
2395            .distribution_key
2396            .iter()
2397            .map(|dist_idx| {
2398                let column_id = target_table.columns[*dist_idx as usize]
2399                    .column_desc
2400                    .as_ref()
2401                    .unwrap()
2402                    .column_id;
2403                let sink_idx = sink_idx_by_col_id
2404                    .get(&column_id)
2405                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2406                Ok(*sink_idx)
2407            })
2408            .collect::<anyhow::Result<Vec<_>>>()?
2409    };
2410    let dist_key_indices =
2411        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2412    let downstream_fragment_id = target_fragment_id as _;
2413    let new_downstream_relation = DownstreamFragmentRelation {
2414        downstream_fragment_id,
2415        dispatcher_type: DispatcherType::Hash,
2416        dist_key_indices,
2417        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2418    };
2419    let current_target_columns = target_table.get_columns();
2420    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2421    Ok(UpstreamSinkInfo {
2422        sink_id,
2423        sink_fragment_id: sink_fragment_id as _,
2424        sink_output_fields,
2425        sink_original_target_columns: original_target_columns,
2426        project_exprs,
2427        new_sink_downstream: new_downstream_relation,
2428    })
2429}
2430
2431pub fn refill_upstream_sink_union_in_table(
2432    union_fragment_root: &mut PbStreamNode,
    upstream_sink_infos: &[UpstreamSinkInfo],
2434) {
2435    visit_stream_node_cont_mut(union_fragment_root, |node| {
2436        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2437            let init_upstreams = upstream_sink_infos
2438                .iter()
2439                .map(|info| PbUpstreamSinkInfo {
2440                    upstream_fragment_id: info.sink_fragment_id,
2441                    sink_output_schema: info.sink_output_fields.clone(),
2442                    project_exprs: info.project_exprs.clone(),
2443                })
2444                .collect();
2445            upstream_sink_union.init_upstreams = init_upstreams;
2446            false
2447        } else {
2448            true
2449        }
2450    });
2451}
2452
2453#[cfg(test)]
2454mod tests {
2455    use std::num::NonZeroUsize;
2456
2457    use super::*;
2458
2459    #[test]
2460    fn test_specified_parallelism_exceeds_available() {
2461        let result = DdlController::resolve_stream_parallelism_inner(
2462            Some(NonZeroUsize::new(100).unwrap()),
2463            NonZeroUsize::new(256).unwrap(),
2464            Some(NonZeroUsize::new(4).unwrap()),
2465            &DefaultParallelism::Full,
2466            "default",
2467        )
2468        .unwrap();
2469        assert_eq!(result.get(), 100);
2470    }
2471
2472    #[test]
2473    fn test_allows_default_parallelism_over_available() {
2474        let result = DdlController::resolve_stream_parallelism_inner(
2475            None,
2476            NonZeroUsize::new(256).unwrap(),
2477            Some(NonZeroUsize::new(4).unwrap()),
2478            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2479            "default",
2480        )
2481        .unwrap();
2482        assert_eq!(result.get(), 50);
2483    }
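
    // Illustrative sketch (not from the original source): when no parallelism is specified
    // and the default is `Full`, the resolved parallelism is expected to follow the available
    // slots as long as they stay within the max parallelism.
    #[test]
    fn test_full_parallelism_follows_available_within_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(256).unwrap(),
            Some(NonZeroUsize::new(4).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }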
2484
2485    #[test]
2486    fn test_full_parallelism_capped_by_max() {
2487        let result = DdlController::resolve_stream_parallelism_inner(
2488            None,
2489            NonZeroUsize::new(6).unwrap(),
2490            Some(NonZeroUsize::new(10).unwrap()),
2491            &DefaultParallelism::Full,
2492            "default",
2493        )
2494        .unwrap();
2495        assert_eq!(result.get(), 6);
2496    }
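
    // Illustrative sketch (not from the original source): a specified parallelism equal to
    // the max parallelism should be accepted as-is, since only values strictly above the max
    // are rejected.
    #[test]
    fn test_specified_equal_to_max_is_accepted() {
        let result = DdlController::resolve_stream_parallelism_inner(
            Some(NonZeroUsize::new(4).unwrap()),
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Full,
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }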
2497
2498    #[test]
2499    fn test_no_available_slots_returns_error() {
2500        let result = DdlController::resolve_stream_parallelism_inner(
2501            None,
2502            NonZeroUsize::new(4).unwrap(),
2503            None,
2504            &DefaultParallelism::Full,
2505            "default",
2506        );
2507        assert!(matches!(
2508            result,
2509            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2510        ));
2511    }
2512
2513    #[test]
2514    fn test_specified_over_max_returns_error() {
2515        let result = DdlController::resolve_stream_parallelism_inner(
2516            Some(NonZeroUsize::new(8).unwrap()),
2517            NonZeroUsize::new(4).unwrap(),
2518            Some(NonZeroUsize::new(10).unwrap()),
2519            &DefaultParallelism::Full,
2520            "default",
2521        );
2522        assert!(matches!(
2523            result,
2524            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2525        ));
2526    }
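
    // Illustrative sketch (not from the original source): a configured default parallelism
    // that exceeds the max parallelism is not an error; it is expected to be capped to the
    // max, mirroring the `DefaultParallelism::Full` case.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }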
2527}