risingwave_meta/rpc/ddl_controller.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::{BarrierManagerRef, Command};
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
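/// Drop behavior requested by a DDL statement: `Restrict` refuses to drop an object that still
/// has dependents, while `Cascade` drops the dependents as well.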
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
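    /// Converts the `cascade` flag carried by a DDL request into a [`DropMode`].
    ///
    /// A minimal usage sketch (illustrative only):
    /// ```ignore
    /// assert!(matches!(DropMode::from_request_setting(true), DropMode::Cascade));
    /// assert!(matches!(DropMode::from_request_setting(false), DropMode::Restrict));
    /// ```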
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
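/// Identifies the streaming job targeted by a `DROP` command, tagged with the kind of object
/// (materialized view, sink, table, or index) it refers to.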
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}

/// Describes the streaming job that needs to be replaced, together with its new fragment graph.
/// It is used when replacing a table and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
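/// A single DDL command dispatched through [`DdlController::run_command`].
///
/// A hypothetical construction, for illustration only (the identifiers are made up):
/// ```ignore
/// let command = DdlCommand::DropStreamingJob {
///     job_id: StreamingJobId::MaterializedView(table_id),
///     drop_mode: DropMode::Cascade,
/// };
/// let _version = ddl_controller.run_command(command).await?;
/// ```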
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>, // specific resource group
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _)
            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::ResetSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,

    // The semaphore is used to limit the number of concurrent streaming job creations.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
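/// A semaphore-backed permit pool that bounds how many streaming jobs may be created
/// concurrently. The pool is sized from `max_concurrent_creating_streaming_jobs` and resized in
/// the background when that system parameter changes.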
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
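    /// Builds the permit pool from the current system parameters and spawns a background task
    /// that resizes it whenever `max_concurrent_creating_streaming_jobs` changes (a value of
    /// zero is treated as "unlimited", i.e. `Semaphore::MAX_PERMITS`).
    ///
    /// Usage sketch (hypothetical, mirroring how `create_streaming_job` holds a permit):
    /// ```ignore
    /// let permits = CreatingStreamingJobPermit::new(&env).await;
    /// let _permit = permits.semaphore.clone().acquire_owned().await.unwrap();
    /// // ... build and submit the streaming job while holding the permit ...
    /// ```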
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with limits by ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
    pub fn next_seq(&self) -> u64 {
        // This is a simple atomic increment operation.
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted while the request is executing, the request will be cancelled by tonic. Since
    /// we have a lot of logic for reverting, status management, notification and so on, ensuring
    /// consistency would be a huge hassle if we didn't spawn the task here.
    ///
    /// Though the return type is `Option`, it is always `Some`; this merely simplifies the
    /// handling logic at call sites.
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

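    /// Persists the database in the catalog, then registers its barrier settings (barrier
    /// interval and checkpoint frequency) with the barrier manager.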
    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

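    /// Reschedules the parallelism of a CDC table's backfill fragments. Rejected while the
    /// cluster is in recovery.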
    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

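    /// Alters parallelism at fragment granularity. The per-fragment targets (possibly `None`)
    /// are forwarded to the stream manager for rescheduling.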
    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");

        // Get database_id for the source
        let database_id = self
            .metadata_manager
            .catalog_controller
            .get_object_database_id(source_id)
            .await?;

        self.stream_manager
            .barrier_scheduler
            .run_command(database_id, Command::ResetSource { source_id })
            .await?;

        // RESET SOURCE doesn't modify the catalog, so return the current catalog version.
        let version = self
            .metadata_manager
            .catalog_controller
            .current_notification_version()
            .await;
        Ok(version)
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

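    /// Persists the updated database parameter in the catalog, then notifies the barrier manager
    /// so the new barrier settings take effect.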
    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

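    /// Ensures that the target table of `CREATE SINK ... INTO` has already been migrated
    /// (see `has_table_been_migrated`); sinking into an unmigrated table is rejected.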
    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
        } else {
            Ok(())
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
                &fragment_graph.backfill_parallelism,
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

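    /// Builds the fragment graph and internal catalogs for the job, validates external
    /// dependencies (e.g. the CDC source or the sink connector), and then hands the job over to
    /// the stream manager for actual creation.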
    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        // create fragment and actor catalogs.
        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        // create streaming jobs.
        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

    /// `target_replace_info`: when dropping a sink into a table, we need to replace the table.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // Clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // unregister fragments and actors from source manager.
        // FIXME: also need to unregister source backfill fragments.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // clean up iceberg table sinks
        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        // stop sink coordinators for iceberg table sinks
        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids)
                .await;
        }

        // remove secrets.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // Ensure the max parallelism is unchanged before replacing the table.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        // 1. build fragment graph.
        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // make it immutable
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
1331                // Compare column ids to find the newly added columns.
1332                let mut original_table_column_ids: HashSet<_> = original_table_columns
1333                    .iter()
1334                    .map(|col| col.column_id())
1335                    .collect();
1336                let newly_added_columns = table
1337                    .columns
1338                    .iter()
1339                    .filter(|col| {
1340                        !original_table_column_ids.remove(&ColumnId::new(
1341                            col.column_desc.as_ref().unwrap().column_id as _,
1342                        ))
1343                    })
1344                    .map(|col| ColumnCatalog::from(col.clone()))
1345                    .collect_vec();
1346                if !original_table_column_ids.is_empty() {
1347                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1348                }
1349                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1350                for sink in auto_refresh_schema_sinks {
1351                    let sink_job_fragments = self
1352                        .metadata_manager
1353                        .get_job_fragments_by_id(sink.id.as_job_id())
1354                        .await?;
1355                    if sink_job_fragments.fragments.len() != 1 {
1356                        return Err(anyhow!(
1357                            "auto schema refresh sink must have only one fragment, but got {}",
1358                            sink_job_fragments.fragments.len()
1359                        )
1360                        .into());
1361                    }
1362                    let original_sink_fragment =
1363                        sink_job_fragments.fragments.into_values().next().unwrap();
1364                    let (new_sink_fragment, new_schema, new_log_store_table) =
1365                        rewrite_refresh_schema_sink_fragment(
1366                            &original_sink_fragment,
1367                            &sink,
1368                            &newly_added_columns,
1369                            table,
1370                            fragment_graph.table_fragment_id(),
1371                            self.env.id_gen_manager(),
1372                            self.env.actor_id_generator(),
1373                        )?;
1374
1375                    assert_eq!(
1376                        original_sink_fragment.actors.len(),
1377                        new_sink_fragment.actors.len()
1378                    );
1379                    let actor_status = (0..original_sink_fragment.actors.len())
1380                        .map(|i| {
1381                            let worker_node_id = sink_job_fragments.actor_status
1382                                [&original_sink_fragment.actors[i].actor_id]
1383                                .location
1384                                .as_ref()
1385                                .unwrap()
1386                                .worker_node_id;
1387                            (
1388                                new_sink_fragment.actors[i].actor_id,
1389                                PbActorStatus {
1390                                    location: Some(PbActorLocation { worker_node_id }),
1391                                },
1392                            )
1393                        })
1394                        .collect();
1395
1396                    let streaming_job = StreamingJob::Sink(sink);
1397
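                    // Register a temporary sink catalog for the rewritten sink; it is finalized
                    // or aborted together with the table replacement below.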
1398                    let tmp_sink_id = self
1399                        .metadata_manager
1400                        .catalog_controller
1401                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1402                        .await?
1403                        .as_sink_id();
1404                    let StreamingJob::Sink(sink) = streaming_job else {
1405                        unreachable!()
1406                    };
1407
1408                    sinks.push(AutoRefreshSchemaSinkContext {
1409                        tmp_sink_id,
1410                        original_sink: sink,
1411                        original_fragment: original_sink_fragment,
1412                        new_schema,
1413                        newly_add_fields: newly_added_columns
1414                            .iter()
1415                            .map(|col| Field::from(&col.column_desc))
1416                            .collect(),
1417                        new_fragment: new_sink_fragment,
1418                        new_log_store_table,
1419                        actor_status,
1420                    });
1421                }
1422                Some(sinks)
1423            } else {
1424                None
1425            }
1426        } else {
1427            None
1428        };
1429
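        // Register a temporary job catalog for the replacement; it is promoted in
        // `finish_replace_streaming_job` on success, or aborted on failure.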
1430        let tmp_id = self
1431            .metadata_manager
1432            .catalog_controller
1433            .create_job_catalog_for_replace(
1434                &streaming_job,
1435                Some(&ctx),
1436                fragment_graph.specified_parallelism().as_ref(),
1437                Some(fragment_graph.max_parallelism()),
1438            )
1439            .await?;
1440
1441        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1442            sinks
1443                .iter()
1444                .map(|sink| sink.tmp_sink_id.as_object_id())
1445                .collect_vec()
1446        });
1447
1448        tracing::debug!(id = %job_id, "building replace streaming job");
1449        let mut updated_sink_catalogs = vec![];
1450
1451        let mut drop_table_connector_ctx = None;
1452        let result: MetaResult<_> = try {
1453            let (mut ctx, mut stream_job_fragments) = self
1454                .build_replace_job(
1455                    ctx,
1456                    &streaming_job,
1457                    fragment_graph,
1458                    tmp_id,
1459                    auto_refresh_schema_sinks,
1460                )
1461                .await?;
1462            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1463            let auto_refresh_schema_sink_finish_ctx =
1464                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1465                    sinks
1466                        .iter()
1467                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1468                            tmp_sink_id: sink.tmp_sink_id,
1469                            original_sink_id: sink.original_sink.id,
1470                            columns: sink.new_schema.clone(),
1471                            new_log_store_table: sink
1472                                .new_log_store_table
1473                                .as_ref()
1474                                .map(|table| (table.id, table.columns.clone())),
1475                        })
1476                        .collect()
1477                });
1478
1479            // Handle a table that has incoming sinks.
1480            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1481                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1482                let upstream_infos = self
1483                    .metadata_manager
1484                    .catalog_controller
1485                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1486                    .await?;
1487                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1488
1489                for upstream_info in &upstream_infos {
1490                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1491                    ctx.upstream_fragment_downstreams
1492                        .entry(upstream_fragment_id)
1493                        .or_default()
1494                        .push(upstream_info.new_sink_downstream.clone());
1495                    if upstream_info.sink_original_target_columns.is_empty() {
1496                        updated_sink_catalogs.push(upstream_info.sink_id);
1497                    }
1498                }
1499            }
1500
1501            let replace_upstream = ctx.replace_upstream.clone();
1502
1503            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1504                let empty_downstreams = FragmentDownstreamRelation::default();
1505                for sink in sinks {
1506                    self.metadata_manager
1507                        .catalog_controller
1508                        .prepare_streaming_job(
1509                            sink.tmp_sink_id.as_job_id(),
1510                            || [&sink.new_fragment].into_iter(),
1511                            &empty_downstreams,
1512                            true,
1513                            None,
1514                        )
1515                        .await?;
1516                }
1517            }
1518
1519            self.metadata_manager
1520                .catalog_controller
1521                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1522                .await?;
1523
1524            self.stream_manager
1525                .replace_stream_job(stream_job_fragments, ctx)
1526                .await?;
1527            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1528        };
1529
1530        match result {
1531            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1532                let version = self
1533                    .metadata_manager
1534                    .catalog_controller
1535                    .finish_replace_streaming_job(
1536                        tmp_id,
1537                        streaming_job,
1538                        replace_upstream,
1539                        SinkIntoTableContext {
1540                            updated_sink_catalogs,
1541                        },
1542                        drop_table_connector_ctx.as_ref(),
1543                        auto_refresh_schema_sink_finish_ctx,
1544                    )
1545                    .await?;
1546                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1547                    self.source_manager
1548                        .apply_source_change(SourceChange::DropSource {
1549                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1550                        })
1551                        .await;
1552                }
1553                Ok(version)
1554            }
1555            Err(err) => {
1556                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1557                let _ = self.metadata_manager
1558                    .catalog_controller
1559                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1560                    .await.inspect_err(|err| {
1561                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1562                });
1563                Err(err)
1564            }
1565        }
1566    }
1567
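    /// Drops or cancels a streaming job: a job still in `Creating` status is cancelled, while a
    /// `Created` job is dropped as a regular catalog object.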
1568    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1569    )]
1570    async fn drop_streaming_job(
1571        &self,
1572        job_id: StreamingJobId,
1573        drop_mode: DropMode,
1574    ) -> MetaResult<NotificationVersion> {
1575        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1576
1577        let (object_id, object_type) = match job_id {
1578            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1579            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1580            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1581            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1582        };
1583
1584        let job_status = self
1585            .metadata_manager
1586            .catalog_controller
1587            .get_streaming_job_status(job_id.id())
1588            .await?;
1589        let version = match job_status {
1590            JobStatus::Initial => {
1591                unreachable!(
1592                    "A job in Initial status has not been notified to the frontend and therefore should not arrive here"
1593                );
1594            }
1595            JobStatus::Creating => {
1596                self.stream_manager
1597                    .cancel_streaming_jobs(vec![job_id.id()])
1598                    .await?;
1599                IGNORED_NOTIFICATION_VERSION
1600            }
1601            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1602        };
1603
1604        Ok(version)
1605    }
1606
1607    /// Resolve the parallelism of the stream job based on the given information.
1608    ///
1609    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1610    fn resolve_stream_parallelism(
1611        &self,
1612        specified: Option<NonZeroUsize>,
1613        max: NonZeroUsize,
1614        cluster_info: &StreamingClusterInfo,
1615        resource_group: String,
1616    ) -> MetaResult<NonZeroUsize> {
1617        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1618        DdlController::resolve_stream_parallelism_inner(
1619            specified,
1620            max,
1621            available,
1622            &self.env.opts.default_parallelism,
1623            &resource_group,
1624        )
1625    }
1626
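    /// Resolution rules (exercised by the unit tests at the bottom of this file):
    /// - a specified parallelism greater than `max` is rejected;
    /// - a specified parallelism greater than `available` is accepted with a warning;
    /// - without a specified value, `DefaultParallelism::Full` uses all available slots and a
    ///   fixed default is used as configured, in both cases capped by `max`.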
1627    fn resolve_stream_parallelism_inner(
1628        specified: Option<NonZeroUsize>,
1629        max: NonZeroUsize,
1630        available: Option<NonZeroUsize>,
1631        default_parallelism: &DefaultParallelism,
1632        resource_group: &str,
1633    ) -> MetaResult<NonZeroUsize> {
1634        let Some(available) = available else {
1635            bail_unavailable!(
1636                "no available slots to schedule in resource group \"{}\", \
1637                 have you allocated any compute nodes within this resource group?",
1638                resource_group
1639            );
1640        };
1641
1642        if let Some(specified) = specified {
1643            if specified > max {
1644                bail_invalid_parameter!(
1645                    "specified parallelism {} should not exceed max parallelism {}",
1646                    specified,
1647                    max,
1648                );
1649            }
1650            if specified > available {
1651                tracing::warn!(
1652                    resource_group,
1653                    specified_parallelism = specified.get(),
1654                    available_parallelism = available.get(),
1655                    "specified parallelism exceeds available slots, scheduling with specified value",
1656                );
1657            }
1658            return Ok(specified);
1659        }
1660
1661        // Use default parallelism when no specific parallelism is provided by the user.
1662        let default_parallelism = match default_parallelism {
1663            DefaultParallelism::Full => available,
1664            DefaultParallelism::Default(num) => {
1665                if *num > available {
1666                    tracing::warn!(
1667                        resource_group,
1668                        configured_parallelism = num.get(),
1669                        available_parallelism = available.get(),
1670                        "default parallelism exceeds available slots, scheduling with configured value",
1671                    );
1672                }
1673                *num
1674            }
1675        };
1676
1677        if default_parallelism > max {
1678            tracing::warn!(
1679                max_parallelism = max.get(),
1680                resource_group,
1681                "default parallelism exceeds max parallelism, capping to max",
1682            );
1683        }
1684        Ok(default_parallelism.min(max))
1685    }
1686
1687    /// Builds the actor graph:
1688    /// - Add the upstream fragments to the fragment graph
1689    /// - Schedule the fragments based on their distribution
1690    /// - Expand each fragment into one or several actors
1691    /// - Construct the fragment level backfill order control.
1692    #[await_tree::instrument]
1693    pub(crate) async fn build_stream_job(
1694        &self,
1695        stream_ctx: StreamContext,
1696        mut stream_job: StreamingJob,
1697        fragment_graph: StreamFragmentGraph,
1698        specific_resource_group: Option<String>,
1699    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1700        let id = stream_job.id();
1701        let specified_parallelism = fragment_graph.specified_parallelism();
1702        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1703        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1704
1705        // 1. Fragment Level ordering graph
1706        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1707
1708        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1709        // contains all information needed for building the actor graph.
1710
1711        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1712            fragment_graph.collect_snapshot_backfill_info()?;
1713        assert!(
1714            snapshot_backfill_info
1715                .iter()
1716                .chain([&cross_db_snapshot_backfill_info])
1717                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1718                .all(|backfill_epoch| backfill_epoch.is_none()),
1719            "should not set backfill epoch when initially building the job: {:?} {:?}",
1720            snapshot_backfill_info,
1721            cross_db_snapshot_backfill_info
1722        );
1723
1724        let locality_fragment_state_table_mapping =
1725            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1726
1727        // check if log store exists for all cross-db upstreams
1728        self.metadata_manager
1729            .catalog_controller
1730            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1731            .await?;
1732
1733        let upstream_table_ids = fragment_graph
1734            .dependent_table_ids()
1735            .iter()
1736            .filter(|id| {
1737                !cross_db_snapshot_backfill_info
1738                    .upstream_mv_table_id_to_backfill_epoch
1739                    .contains_key(id)
1740            })
1741            .cloned()
1742            .collect();
1743
1744        let (upstream_root_fragments, existing_actor_location) = self
1745            .metadata_manager
1746            .get_upstream_root_fragments(&upstream_table_ids)
1747            .await?;
1748
1749        if snapshot_backfill_info.is_some() {
1750            match stream_job {
1751                StreamingJob::MaterializedView(_)
1752                | StreamingJob::Sink(_)
1753                | StreamingJob::Index(_, _) => {}
1754                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1755                    return Err(
1756                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1757                    );
1758                }
1759            }
1760        }
1761
1762        let upstream_actors = upstream_root_fragments
1763            .values()
1764            .map(|(fragment, _)| {
1765                (
1766                    fragment.fragment_id,
1767                    fragment.actors.keys().copied().collect(),
1768                )
1769            })
1770            .collect();
1771
1772        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1773            fragment_graph,
1774            FragmentGraphUpstreamContext {
1775                upstream_root_fragments,
1776                upstream_actor_location: existing_actor_location,
1777            },
1778            (&stream_job).into(),
1779        )?;
1780        let resource_group = match specific_resource_group {
1781            None => {
1782                self.metadata_manager
1783                    .get_database_resource_group(stream_job.database_id())
1784                    .await?
1785            }
1786            Some(resource_group) => resource_group,
1787        };
1788
1789        // 3. Build the actor graph.
1790        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1791
1792        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1793        let parallelism = self.resolve_stream_parallelism(
1794            initial_parallelism,
1795            max_parallelism,
1796            &cluster_info,
1797            resource_group.clone(),
1798        )?;
1799
1800        let parallelism = self
1801            .env
1802            .system_params_reader()
1803            .await
1804            .adaptive_parallelism_strategy()
1805            .compute_target_parallelism(parallelism.get());
1806
1807        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1808        let actor_graph_builder = ActorGraphBuilder::new(
1809            id,
1810            resource_group,
1811            complete_graph,
1812            cluster_info,
1813            parallelism,
1814        )?;
1815
1816        let ActorGraphBuildResult {
1817            graph,
1818            downstream_fragment_relations,
1819            building_locations,
1820            upstream_fragment_downstreams,
1821            new_no_shuffle,
1822            replace_upstream,
1823            ..
1824        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1825        assert!(replace_upstream.is_empty());
1826
1827        // 4. Build the table fragments structure that will be persisted in the stream manager,
1828        // and the context that contains all information needed for building the
1829        // actors on the compute nodes.
1830
1831        // If the frontend does not specify the parallelism and `default_parallelism` is set to full, use ADAPTIVE.
1832        // Otherwise, default to FIXED with the resolved parallelism.
1833        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1834            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1835            _ => TableParallelism::Fixed(parallelism.get()),
1836        };
1837
1838        let stream_job_fragments = StreamJobFragments::new(
1839            id,
1840            graph,
1841            &building_locations.actor_locations,
1842            stream_ctx.clone(),
1843            table_parallelism,
1844            max_parallelism.get(),
1845        );
1846
1847        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1848            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1849        }
1850
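        // For a sink-into-table job, resolve the target table and its materialized-view fragment,
        // then build the upstream sink info used to wire the sink into the table's union fragment.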
1851        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1852            && let Ok(table_id) = sink.get_target_table()
1853        {
1854            let tables = self
1855                .metadata_manager
1856                .get_table_catalog_by_ids(&[*table_id])
1857                .await?;
1858            let target_table = tables
1859                .first()
1860                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1861            let sink_fragment = stream_job_fragments
1862                .sink_fragment()
1863                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1864            let mview_fragment_id = self
1865                .metadata_manager
1866                .catalog_controller
1867                .get_mview_fragment_by_id(table_id.as_job_id())
1868                .await?;
1869            let upstream_sink_info = build_upstream_sink_info(
1870                sink,
1871                sink_fragment.fragment_id as _,
1872                target_table,
1873                mview_fragment_id,
1874            )?;
1875            Some(upstream_sink_info)
1876        } else {
1877            None
1878        };
1879
1880        let mut cdc_table_snapshot_splits = None;
1881        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1882            && let Some((_, stream_cdc_scan)) =
1883                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1884        {
1885            {
1886                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1887                let splits = try_init_parallel_cdc_table_snapshot_splits(
1888                    table.id,
1889                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1890                    self.env.meta_store_ref(),
1891                    stream_cdc_scan.options.as_ref().unwrap(),
1892                    self.env.opts.cdc_table_split_init_insert_batch_size,
1893                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1894                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1895                )
1896                .await?;
1897                cdc_table_snapshot_splits = Some(splits);
1898            }
1899        }
1900
1901        let ctx = CreateStreamingJobContext {
1902            upstream_fragment_downstreams,
1903            new_no_shuffle,
1904            upstream_actors,
1905            building_locations,
1906            definition: stream_job.definition(),
1907            create_type: stream_job.create_type(),
1908            job_type: (&stream_job).into(),
1909            streaming_job: stream_job,
1910            new_upstream_sink,
1911            option: CreateStreamingJobOption {},
1912            snapshot_backfill_info,
1913            cross_db_snapshot_backfill_info,
1914            fragment_backfill_ordering,
1915            locality_fragment_state_table_mapping,
1916            cdc_table_snapshot_splits,
1917        };
1918
1919        Ok((
1920            ctx,
1921            StreamJobFragmentsToCreate {
1922                inner: stream_job_fragments,
1923                downstreams: downstream_fragment_relations,
1924            },
1925        ))
1926    }
1927
1928    /// `build_replace_job` builds a job replacement and returns the context and new job
1929    /// fragments.
1930    ///
1931    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1932    /// replacement is finished.
1933    pub(crate) async fn build_replace_job(
1934        &self,
1935        stream_ctx: StreamContext,
1936        stream_job: &StreamingJob,
1937        mut fragment_graph: StreamFragmentGraph,
1938        tmp_job_id: JobId,
1939        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1940    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1941        match &stream_job {
1942            StreamingJob::Table(..)
1943            | StreamingJob::Source(..)
1944            | StreamingJob::MaterializedView(..) => {}
1945            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1946                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1947            }
1948        }
1949
1950        let id = stream_job.id();
1951
1952        // Check whether this replacement drops the table's connector (i.e. its associated source).
1953        let mut drop_table_associated_source_id = None;
1954        if let StreamingJob::Table(None, _, _) = &stream_job {
1955            drop_table_associated_source_id = self
1956                .metadata_manager
1957                .get_table_associated_source_id(id.as_mv_table_id())
1958                .await?;
1959        }
1960
1961        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1962        let old_internal_table_ids = old_fragments.internal_table_ids();
1963
1964        // handle drop table's associated source
1965        let mut drop_table_connector_ctx = None;
1966        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1967            // Dropping the table's associated source means the old job has exactly one internal table (the associated source's state table).
1968            debug_assert!(old_internal_table_ids.len() == 1);
1969
1970            drop_table_connector_ctx = Some(DropTableConnectorContext {
1971                // we do not remove the original table catalog as it's still needed for the streaming job
1972                // just need to remove the ref to the state table
1973                to_change_streaming_job_id: id,
1974                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1975                to_remove_source_id,
1976            });
1977        } else if stream_job.is_materialized_view() {
1978            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
1979            // but more robust.
1980            let old_fragments_upstreams = self
1981                .metadata_manager
1982                .catalog_controller
1983                .upstream_fragments(old_fragments.fragment_ids())
1984                .await?;
1985
1986            let old_state_graph =
1987                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1988            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1989            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
1990                .context("incompatible altering on the streaming job states")?;
1991
1992            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
1993            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
1994        } else {
1995            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1996            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1997            let old_internal_tables = self
1998                .metadata_manager
1999                .get_table_catalog_by_ids(&old_internal_table_ids)
2000                .await?;
2001            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2002        }
2003
2004        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2005        // graph that contains all information needed for building the actor graph.
2006        let original_root_fragment = old_fragments
2007            .root_fragment()
2008            .expect("root fragment not found");
2009
2010        let job_type = StreamingJobType::from(stream_job);
2011
2012        // Extract the downstream fragments from the fragment graph.
2013        let (mut downstream_fragments, mut downstream_actor_location) =
2014            self.metadata_manager.get_downstream_fragments(id).await?;
2015
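        // Swap the downstream fragments that belong to auto-schema-refresh sinks with their
        // rewritten fragments, and move the recorded actor locations to the new actors.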
2016        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2017            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2018                .iter()
2019                .map(|sink| sink.original_fragment.fragment_id)
2020                .collect();
2021            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2022                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2023                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2024                }) {
2025                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2026                    for actor_id in downstream_fragment.actors.keys() {
2027                        downstream_actor_location.remove(actor_id);
2028                    }
2029                    for (actor_id, status) in &sink.actor_status {
2030                        downstream_actor_location
2031                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2032                    }
2033
2034                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2035                    *nodes = sink.new_fragment.nodes.clone();
2036                }
2037            }
2038            assert!(remaining_fragment.is_empty());
2039        }
2040
2041        // build complete graph based on the table job type
2042        let complete_graph = match &job_type {
2043            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2044                CompleteStreamFragmentGraph::with_downstreams(
2045                    fragment_graph,
2046                    FragmentGraphDownstreamContext {
2047                        original_root_fragment_id: original_root_fragment.fragment_id,
2048                        downstream_fragments,
2049                        downstream_actor_location,
2050                    },
2051                    job_type,
2052                )?
2053            }
2054            StreamingJobType::Table(TableJobType::SharedCdcSource)
2055            | StreamingJobType::MaterializedView => {
2056                // CDC tables or materialized views can have upstream jobs as well.
2057                let (upstream_root_fragments, upstream_actor_location) = self
2058                    .metadata_manager
2059                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2060                    .await?;
2061
2062                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2063                    fragment_graph,
2064                    FragmentGraphUpstreamContext {
2065                        upstream_root_fragments,
2066                        upstream_actor_location,
2067                    },
2068                    FragmentGraphDownstreamContext {
2069                        original_root_fragment_id: original_root_fragment.fragment_id,
2070                        downstream_fragments,
2071                        downstream_actor_location,
2072                    },
2073                    job_type,
2074                )?
2075            }
2076            _ => unreachable!(),
2077        };
2078
2079        let resource_group = self
2080            .metadata_manager
2081            .get_existing_job_resource_group(id)
2082            .await?;
2083
2084        // 2. Build the actor graph.
2085        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2086
2087        // XXX: what is this parallelism?
2088        // Is it "assigned parallelism"?
2089        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2090            .expect("The number of actors in the original table fragment should be greater than 0");
2091
2092        let actor_graph_builder = ActorGraphBuilder::new(
2093            id,
2094            resource_group,
2095            complete_graph,
2096            cluster_info,
2097            parallelism,
2098        )?;
2099
2100        let ActorGraphBuildResult {
2101            graph,
2102            downstream_fragment_relations,
2103            building_locations,
2104            upstream_fragment_downstreams,
2105            mut replace_upstream,
2106            new_no_shuffle,
2107            ..
2108        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2109
2110        // A general table or source does not have an upstream job, so the dispatchers should be empty.
2111        if matches!(
2112            job_type,
2113            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2114        ) {
2115            assert!(upstream_fragment_downstreams.is_empty());
2116        }
2117
2118        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2119        // the context that contains all information needed for building the actors on the compute
2120        // nodes.
2121        let stream_job_fragments = StreamJobFragments::new(
2122            tmp_job_id,
2123            graph,
2124            &building_locations.actor_locations,
2125            stream_ctx,
2126            old_fragments.assigned_parallelism,
2127            old_fragments.max_parallelism,
2128        );
2129
2130        if let Some(sinks) = &auto_refresh_schema_sinks {
2131            for sink in sinks {
2132                replace_upstream
2133                    .remove(&sink.new_fragment.fragment_id)
2134                    .expect("should exist");
2135            }
2136        }
2137
2138        // Note: no need to set `vnode_count` as it's already set by the frontend.
2139        // See `get_replace_table_plan`.
2140
2141        let ctx = ReplaceStreamJobContext {
2142            old_fragments,
2143            replace_upstream,
2144            new_no_shuffle,
2145            upstream_fragment_downstreams,
2146            building_locations,
2147            streaming_job: stream_job.clone(),
2148            tmp_id: tmp_job_id,
2149            drop_table_connector_ctx,
2150            auto_refresh_schema_sinks,
2151        };
2152
2153        Ok((
2154            ctx,
2155            StreamJobFragmentsToCreate {
2156                inner: stream_job_fragments,
2157                downstreams: downstream_fragment_relations,
2158            },
2159        ))
2160    }
2161
2162    async fn alter_name(
2163        &self,
2164        relation: alter_name_request::Object,
2165        new_name: &str,
2166    ) -> MetaResult<NotificationVersion> {
2167        let (obj_type, id) = match relation {
2168            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2169            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2170            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2171            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2172            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2173            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2174            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2175            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2176        };
2177        self.metadata_manager
2178            .catalog_controller
2179            .alter_name(obj_type, id, new_name)
2180            .await
2181    }
2182
2183    async fn alter_swap_rename(
2184        &self,
2185        object: alter_swap_rename_request::Object,
2186    ) -> MetaResult<NotificationVersion> {
2187        let (obj_type, src_id, dst_id) = match object {
2188            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2189            alter_swap_rename_request::Object::Table(objs) => {
2190                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2191                (ObjectType::Table, src_id, dst_id)
2192            }
2193            alter_swap_rename_request::Object::View(objs) => {
2194                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2195                (ObjectType::View, src_id, dst_id)
2196            }
2197            alter_swap_rename_request::Object::Source(objs) => {
2198                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2199                (ObjectType::Source, src_id, dst_id)
2200            }
2201            alter_swap_rename_request::Object::Sink(objs) => {
2202                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2203                (ObjectType::Sink, src_id, dst_id)
2204            }
2205            alter_swap_rename_request::Object::Subscription(objs) => {
2206                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2207                (ObjectType::Subscription, src_id, dst_id)
2208            }
2209        };
2210
2211        self.metadata_manager
2212            .catalog_controller
2213            .alter_swap_rename(obj_type, src_id, dst_id)
2214            .await
2215    }
2216
2217    async fn alter_owner(
2218        &self,
2219        object: Object,
2220        owner_id: UserId,
2221    ) -> MetaResult<NotificationVersion> {
2222        let (obj_type, id) = match object {
2223            Object::TableId(id) => (ObjectType::Table, id),
2224            Object::ViewId(id) => (ObjectType::View, id),
2225            Object::SourceId(id) => (ObjectType::Source, id),
2226            Object::SinkId(id) => (ObjectType::Sink, id),
2227            Object::SchemaId(id) => (ObjectType::Schema, id),
2228            Object::DatabaseId(id) => (ObjectType::Database, id),
2229            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2230            Object::ConnectionId(id) => (ObjectType::Connection, id),
2231        };
2232        self.metadata_manager
2233            .catalog_controller
2234            .alter_owner(obj_type, id.into(), owner_id as _)
2235            .await
2236    }
2237
2238    async fn alter_set_schema(
2239        &self,
2240        object: alter_set_schema_request::Object,
2241        new_schema_id: SchemaId,
2242    ) -> MetaResult<NotificationVersion> {
2243        let (obj_type, id) = match object {
2244            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2245            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2246            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2247            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2248            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2249            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2250            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2251        };
2252        self.metadata_manager
2253            .catalog_controller
2254            .alter_schema(obj_type, id.into(), new_schema_id as _)
2255            .await
2256    }
2257
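    /// Waits until no background creating streaming jobs remain, polling roughly once per
    /// millisecond for up to 30 minutes before timing out.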
2258    pub async fn wait(&self) -> MetaResult<()> {
2259        let timeout_ms = 30 * 60 * 1000;
2260        for _ in 0..timeout_ms {
2261            if self
2262                .metadata_manager
2263                .catalog_controller
2264                .list_background_creating_jobs(true, None)
2265                .await?
2266                .is_empty()
2267            {
2268                return Ok(());
2269            }
2270
2271            sleep(Duration::from_millis(1)).await;
2272        }
2273        Err(MetaError::cancelled(format!(
2274            "timeout after {timeout_ms}ms"
2275        )))
2276    }
2277
2278    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2279        self.metadata_manager
2280            .catalog_controller
2281            .comment_on(comment)
2282            .await
2283    }
2284
2285    async fn alter_streaming_job_config(
2286        &self,
2287        job_id: JobId,
2288        entries_to_add: HashMap<String, String>,
2289        keys_to_remove: Vec<String>,
2290    ) -> MetaResult<NotificationVersion> {
2291        self.metadata_manager
2292            .catalog_controller
2293            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2294            .await
2295    }
2296}
2297
2298fn report_create_object(
2299    job_id: JobId,
2300    event_name: &str,
2301    obj_type: PbTelemetryDatabaseObject,
2302    connector_name: Option<String>,
2303    attr_info: Option<jsonbb::Value>,
2304) {
2305    report_event(
2306        PbTelemetryEventStage::CreateStreamJob,
2307        event_name,
2308        job_id.as_raw_id() as _,
2309        connector_name,
2310        Some(obj_type),
2311        attr_info,
2312    );
2313}
2314
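/// Builds the [`UpstreamSinkInfo`] for a sink writing into `target_table`: it derives the sink's
/// output schema, maps the table's distribution key onto the sink columns by column id, and builds
/// the projection from the sink columns to the table's current columns.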
2315pub fn build_upstream_sink_info(
2316    sink: &PbSink,
2317    sink_fragment_id: FragmentId,
2318    target_table: &PbTable,
2319    target_fragment_id: FragmentId,
2320) -> MetaResult<UpstreamSinkInfo> {
2321    let sink_columns = if !sink.original_target_columns.is_empty() {
2322        sink.original_target_columns.clone()
2323    } else {
2324        // The value may be empty because it did not exist in earlier versions,
2325        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
2326        // Therefore, the columns of the table at this point are the `original_target_columns`.
2327        // This value will be filled in for the sink on the meta node.
2328        target_table.columns.clone()
2329    };
2330
2331    let sink_output_fields = sink_columns
2332        .iter()
2333        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2334        .collect_vec();
2335    let output_indices = (0..sink_output_fields.len())
2336        .map(|i| i as u32)
2337        .collect_vec();
2338
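    // Translate the table's distribution key (indices into the table's columns) into indices into
    // the sink's output columns, matching columns by column id.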
2339    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2340        let sink_idx_by_col_id = sink_columns
2341            .iter()
2342            .enumerate()
2343            .map(|(idx, col)| {
2344                let column_id = col.column_desc.as_ref().unwrap().column_id;
2345                (column_id, idx as u32)
2346            })
2347            .collect::<HashMap<_, _>>();
2348        target_table
2349            .distribution_key
2350            .iter()
2351            .map(|dist_idx| {
2352                let column_id = target_table.columns[*dist_idx as usize]
2353                    .column_desc
2354                    .as_ref()
2355                    .unwrap()
2356                    .column_id;
2357                let sink_idx = sink_idx_by_col_id
2358                    .get(&column_id)
2359                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2360                Ok(*sink_idx)
2361            })
2362            .collect::<anyhow::Result<Vec<_>>>()?
2363    };
2364    let dist_key_indices =
2365        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2366    let downstream_fragment_id = target_fragment_id as _;
2367    let new_downstream_relation = DownstreamFragmentRelation {
2368        downstream_fragment_id,
2369        dispatcher_type: DispatcherType::Hash,
2370        dist_key_indices,
2371        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2372    };
2373    let current_target_columns = target_table.get_columns();
2374    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2375    Ok(UpstreamSinkInfo {
2376        sink_id: sink.id,
2377        sink_fragment_id: sink_fragment_id as _,
2378        sink_output_fields,
2379        sink_original_target_columns: sink.get_original_target_columns().clone(),
2380        project_exprs,
2381        new_sink_downstream: new_downstream_relation,
2382    })
2383}
2384
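/// Locates the `UpstreamSinkUnion` node under the table's union fragment root and resets its
/// initial upstreams according to the given upstream sink infos.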
2385pub fn refill_upstream_sink_union_in_table(
2386    union_fragment_root: &mut PbStreamNode,
2387    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2388) {
2389    visit_stream_node_cont_mut(union_fragment_root, |node| {
2390        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2391            let init_upstreams = upstream_sink_infos
2392                .iter()
2393                .map(|info| PbUpstreamSinkInfo {
2394                    upstream_fragment_id: info.sink_fragment_id,
2395                    sink_output_schema: info.sink_output_fields.clone(),
2396                    project_exprs: info.project_exprs.clone(),
2397                })
2398                .collect();
2399            upstream_sink_union.init_upstreams = init_upstreams;
2400            false
2401        } else {
2402            true
2403        }
2404    });
2405}
2406
2407#[cfg(test)]
2408mod tests {
2409    use std::num::NonZeroUsize;
2410
2411    use super::*;
2412
2413    #[test]
2414    fn test_specified_parallelism_exceeds_available() {
2415        let result = DdlController::resolve_stream_parallelism_inner(
2416            Some(NonZeroUsize::new(100).unwrap()),
2417            NonZeroUsize::new(256).unwrap(),
2418            Some(NonZeroUsize::new(4).unwrap()),
2419            &DefaultParallelism::Full,
2420            "default",
2421        )
2422        .unwrap();
2423        assert_eq!(result.get(), 100);
2424    }
2425
2426    #[test]
2427    fn test_allows_default_parallelism_over_available() {
2428        let result = DdlController::resolve_stream_parallelism_inner(
2429            None,
2430            NonZeroUsize::new(256).unwrap(),
2431            Some(NonZeroUsize::new(4).unwrap()),
2432            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2433            "default",
2434        )
2435        .unwrap();
2436        assert_eq!(result.get(), 50);
2437    }
2438
2439    #[test]
2440    fn test_full_parallelism_capped_by_max() {
2441        let result = DdlController::resolve_stream_parallelism_inner(
2442            None,
2443            NonZeroUsize::new(6).unwrap(),
2444            Some(NonZeroUsize::new(10).unwrap()),
2445            &DefaultParallelism::Full,
2446            "default",
2447        )
2448        .unwrap();
2449        assert_eq!(result.get(), 6);
2450    }
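
    // Additional sketch (not part of the original suite): a configured default parallelism that
    // exceeds the max parallelism should be capped to the max.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }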
2451
2452    #[test]
2453    fn test_no_available_slots_returns_error() {
2454        let result = DdlController::resolve_stream_parallelism_inner(
2455            None,
2456            NonZeroUsize::new(4).unwrap(),
2457            None,
2458            &DefaultParallelism::Full,
2459            "default",
2460        );
2461        assert!(matches!(
2462            result,
2463            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2464        ));
2465    }
2466
2467    #[test]
2468    fn test_specified_over_max_returns_error() {
2469        let result = DdlController::resolve_stream_parallelism_inner(
2470            Some(NonZeroUsize::new(8).unwrap()),
2471            NonZeroUsize::new(4).unwrap(),
2472            Some(NonZeroUsize::new(10).unwrap()),
2473            &DefaultParallelism::Full,
2474            "default",
2475        );
2476        assert!(matches!(
2477            result,
2478            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2479        ));
2480    }
2481}