risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::stream_plan::stream_node::NodeBody;
61use risingwave_pb::stream_plan::{
62    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
63    StreamFragmentGraph as StreamFragmentGraphProto,
64};
65use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::{BarrierManagerRef, Command};
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::sink_coordination::SinkCoordinatorManager;
79use crate::manager::{
80    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
81    NotificationVersion, StreamingJob, StreamingJobType,
82};
83use crate::model::{
84    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
85    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
86    TableParallelism,
87};
88use crate::stream::cdc::{
89    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
96    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
97    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
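/// How dependent objects are handled when dropping an object: `Restrict` rejects the drop if
/// dependents exist, while `Cascade` drops them together, mirroring SQL `RESTRICT` / `CASCADE`.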
102#[derive(PartialEq)]
103pub enum DropMode {
104    Restrict,
105    Cascade,
106}
107
108impl DropMode {
109    pub fn from_request_setting(cascade: bool) -> DropMode {
110        if cascade {
111            DropMode::Cascade
112        } else {
113            DropMode::Restrict
114        }
115    }
116}
117
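/// Identifies the streaming job targeted by a drop, tagged with its kind. For tables, the
/// optional `SourceId` refers to the table's associated source, if any.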
118#[derive(strum::AsRefStr)]
119pub enum StreamingJobId {
120    MaterializedView(TableId),
121    Sink(SinkId),
122    Table(Option<SourceId>, TableId),
123    Index(IndexId),
124}
125
126impl std::fmt::Display for StreamingJobId {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "{}", self.as_ref())?;
129        write!(f, "({})", self.id())
130    }
131}
132
133impl StreamingJobId {
134    fn id(&self) -> JobId {
135        match self {
136            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137            StreamingJobId::Index(id) => id.as_job_id(),
138            StreamingJobId::Sink(id) => id.as_job_id(),
139        }
140    }
141}
142
/// Describes the streaming job that needs to be replaced.
/// Used when replacing a table and when creating a sink into a table.
145pub struct ReplaceStreamJobInfo {
146    pub streaming_job: StreamingJob,
147    pub fragment_graph: StreamFragmentGraphProto,
148}
149
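/// A single DDL command handled by [`DdlController::run_command`]. Each variant carries the
/// catalog objects or identifiers it operates on; see [`DdlCommand::allow_in_recovery`] for
/// which commands may run while the cluster is still recovering.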
150#[derive(Display)]
151pub enum DdlCommand {
152    CreateDatabase(Database),
153    DropDatabase(DatabaseId),
154    CreateSchema(Schema),
155    DropSchema(SchemaId, DropMode),
156    CreateNonSharedSource(Source),
157    DropSource(SourceId, DropMode),
158    ResetSource(SourceId),
159    CreateFunction(Function),
160    DropFunction(FunctionId, DropMode),
161    CreateView(View, HashSet<ObjectId>),
162    DropView(ViewId, DropMode),
163    CreateStreamingJob {
164        stream_job: StreamingJob,
165        fragment_graph: StreamFragmentGraphProto,
166        dependencies: HashSet<ObjectId>,
167        resource_type: streaming_job_resource_type::ResourceType,
168        if_not_exists: bool,
169    },
170    DropStreamingJob {
171        job_id: StreamingJobId,
172        drop_mode: DropMode,
173    },
174    AlterName(alter_name_request::Object, String),
175    AlterSwapRename(alter_swap_rename_request::Object),
176    ReplaceStreamJob(ReplaceStreamJobInfo),
177    AlterNonSharedSource(Source),
178    AlterObjectOwner(Object, UserId),
179    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
180    CreateConnection(Connection),
181    DropConnection(ConnectionId, DropMode),
182    CreateSecret(Secret),
183    AlterSecret(Secret),
184    DropSecret(SecretId),
185    CommentOn(Comment),
186    CreateSubscription(Subscription),
187    DropSubscription(SubscriptionId, DropMode),
188    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
189    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
190}
191
192impl DdlCommand {
193    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
194    fn object(&self) -> Either<String, ObjectId> {
195        use Either::*;
196        match self {
197            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
198            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
199            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
200            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
201            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
202            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
203            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
204            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
205            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
206            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
207            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
208            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
209            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
210            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
211            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
212            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
213            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
214            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
215            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
216            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
217            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
218            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
219            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
220            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
221            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
222            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
223            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
224            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
225            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
226        }
227    }
228
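    /// Whether this command may run while the barrier manager reports the cluster as recovering.
    /// Commands that need to build or modify running streaming state (creating jobs, sources and
    /// subscriptions, replacing jobs, altering or resetting sources) are rejected; catalog-only
    /// operations and drops are allowed.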
229    fn allow_in_recovery(&self) -> bool {
230        match self {
231            DdlCommand::DropDatabase(_)
232            | DdlCommand::DropSchema(_, _)
233            | DdlCommand::DropSource(_, _)
234            | DdlCommand::DropFunction(_, _)
235            | DdlCommand::DropView(_, _)
236            | DdlCommand::DropStreamingJob { .. }
237            | DdlCommand::DropConnection(_, _)
238            | DdlCommand::DropSecret(_)
239            | DdlCommand::DropSubscription(_, _)
240            | DdlCommand::AlterName(_, _)
241            | DdlCommand::AlterObjectOwner(_, _)
242            | DdlCommand::AlterSetSchema(_, _)
243            | DdlCommand::CreateDatabase(_)
244            | DdlCommand::CreateSchema(_)
245            | DdlCommand::CreateFunction(_)
246            | DdlCommand::CreateView(_, _)
247            | DdlCommand::CreateConnection(_)
248            | DdlCommand::CommentOn(_)
249            | DdlCommand::CreateSecret(_)
250            | DdlCommand::AlterSecret(_)
251            | DdlCommand::AlterSwapRename(_)
252            | DdlCommand::AlterDatabaseParam(_, _)
253            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
254            DdlCommand::CreateStreamingJob { .. }
255            | DdlCommand::CreateNonSharedSource(_)
256            | DdlCommand::ReplaceStreamJob(_)
257            | DdlCommand::AlterNonSharedSource(_)
258            | DdlCommand::ResetSource(_)
259            | DdlCommand::CreateSubscription(_) => false,
260        }
261    }
262}
263
264#[derive(Clone)]
265pub struct DdlController {
266    pub(crate) env: MetaSrvEnv,
267
268    pub(crate) metadata_manager: MetadataManager,
269    pub(crate) stream_manager: GlobalStreamManagerRef,
270    pub(crate) source_manager: SourceManagerRef,
271    barrier_manager: BarrierManagerRef,
272    sink_manager: SinkCoordinatorManager,
273
    // The semaphore is used to limit the number of concurrent streaming job creations.
275    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
276
277    /// Sequence number for DDL commands, used for observability and debugging.
278    seq: Arc<AtomicU64>,
279}
280
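/// Holds the semaphore that limits how many streaming jobs may be created concurrently. The
/// number of permits tracks the `max_concurrent_creating_streaming_jobs` system parameter.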
281#[derive(Clone)]
282pub struct CreatingStreamingJobPermit {
283    pub(crate) semaphore: Arc<Semaphore>,
284}
285
286impl CreatingStreamingJobPermit {
287    async fn new(env: &MetaSrvEnv) -> Self {
288        let mut permits = env
289            .system_params_reader()
290            .await
291            .max_concurrent_creating_streaming_jobs() as usize;
292        if permits == 0 {
            // A value of zero means unlimited, so use the semaphore's maximum number of permits.
294            permits = Semaphore::MAX_PERMITS;
295        }
296        let semaphore = Arc::new(Semaphore::new(permits));
297
298        let (local_notification_tx, mut local_notification_rx) =
299            tokio::sync::mpsc::unbounded_channel();
300        env.notification_manager()
301            .insert_local_sender(local_notification_tx);
302        let semaphore_clone = semaphore.clone();
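        // Listen for system parameter changes in the background and resize the semaphore so that
        // `max_concurrent_creating_streaming_jobs` can be adjusted at runtime without a restart.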
303        tokio::spawn(async move {
304            while let Some(notification) = local_notification_rx.recv().await {
305                let LocalNotification::SystemParamsChange(p) = &notification else {
306                    continue;
307                };
308                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
309                if new_permits == 0 {
310                    new_permits = Semaphore::MAX_PERMITS;
311                }
312                match permits.cmp(&new_permits) {
313                    Ordering::Less => {
314                        semaphore_clone.add_permits(new_permits - permits);
315                    }
316                    Ordering::Equal => continue,
317                    Ordering::Greater => {
318                        let to_release = permits - new_permits;
319                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with adjustable limits ourselves.
321                        if reduced != to_release {
322                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
324                                to_release,
325                                reduced
326                            );
327                        }
328                    }
329                }
330                tracing::info!(
331                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
332                    permits,
333                    new_permits
334                );
335                permits = new_permits;
336            }
337        });
338
339        Self { semaphore }
340    }
341}
342
343impl DdlController {
344    pub async fn new(
345        env: MetaSrvEnv,
346        metadata_manager: MetadataManager,
347        stream_manager: GlobalStreamManagerRef,
348        source_manager: SourceManagerRef,
349        barrier_manager: BarrierManagerRef,
350        sink_manager: SinkCoordinatorManager,
351    ) -> Self {
352        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
353        Self {
354            env,
355            metadata_manager,
356            stream_manager,
357            source_manager,
358            barrier_manager,
359            sink_manager,
360            creating_streaming_job_permits,
361            seq: Arc::new(AtomicU64::new(0)),
362        }
363    }
364
365    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
366    pub fn next_seq(&self) -> u64 {
367        // This is a simple atomic increment operation.
368        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
369    }
370
    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, the request is cancelled by tonic. Since there is a lot of
    /// logic for reverting, status management, notification and so on, ensuring consistency
    /// would be a huge hassle if we didn't spawn here.
    ///
    /// Although the return type is `Option`, the value is always `Some`, to simplify the handling logic.
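    ///
    /// A rough sketch of how a caller drives it (illustrative only, not the actual gRPC handler):
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("always `Some`");
    /// ```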
377    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
378        if !command.allow_in_recovery() {
379            self.barrier_manager.check_status_running()?;
380        }
381
382        let await_tree_key = format!("DDL Command {}", self.next_seq());
383        let await_tree_span = await_tree::span!("{command}({})", command.object());
384
385        let ctrl = self.clone();
386        let fut = async move {
387            match command {
388                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
389                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
390                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
391                DdlCommand::DropSchema(schema_id, drop_mode) => {
392                    ctrl.drop_schema(schema_id, drop_mode).await
393                }
394                DdlCommand::CreateNonSharedSource(source) => {
395                    ctrl.create_non_shared_source(source).await
396                }
397                DdlCommand::DropSource(source_id, drop_mode) => {
398                    ctrl.drop_source(source_id, drop_mode).await
399                }
400                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
401                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
402                DdlCommand::DropFunction(function_id, drop_mode) => {
403                    ctrl.drop_function(function_id, drop_mode).await
404                }
405                DdlCommand::CreateView(view, dependencies) => {
406                    ctrl.create_view(view, dependencies).await
407                }
408                DdlCommand::DropView(view_id, drop_mode) => {
409                    ctrl.drop_view(view_id, drop_mode).await
410                }
411                DdlCommand::CreateStreamingJob {
412                    stream_job,
413                    fragment_graph,
414                    dependencies,
415                    resource_type,
416                    if_not_exists,
417                } => {
418                    ctrl.create_streaming_job(
419                        stream_job,
420                        fragment_graph,
421                        dependencies,
422                        resource_type,
423                        if_not_exists,
424                    )
425                    .await
426                }
427                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
428                    ctrl.drop_streaming_job(job_id, drop_mode).await
429                }
430                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
431                    streaming_job,
432                    fragment_graph,
433                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
434                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
435                DdlCommand::AlterObjectOwner(object, owner_id) => {
436                    ctrl.alter_owner(object, owner_id).await
437                }
438                DdlCommand::AlterSetSchema(object, new_schema_id) => {
439                    ctrl.alter_set_schema(object, new_schema_id).await
440                }
441                DdlCommand::CreateConnection(connection) => {
442                    ctrl.create_connection(connection).await
443                }
444                DdlCommand::DropConnection(connection_id, drop_mode) => {
445                    ctrl.drop_connection(connection_id, drop_mode).await
446                }
447                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
448                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
449                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
450                DdlCommand::AlterNonSharedSource(source) => {
451                    ctrl.alter_non_shared_source(source).await
452                }
453                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
454                DdlCommand::CreateSubscription(subscription) => {
455                    ctrl.create_subscription(subscription).await
456                }
457                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
458                    ctrl.drop_subscription(subscription_id, drop_mode).await
459                }
460                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
461                DdlCommand::AlterDatabaseParam(database_id, param) => {
462                    ctrl.alter_database_param(database_id, param).await
463                }
464                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
465                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
466                        .await
467                }
468            }
469        }
470        .in_current_span();
471        let fut = (self.env.await_tree_reg())
472            .register(await_tree_key, await_tree_span)
473            .instrument(Box::pin(fut));
474        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
475        Ok(Some(WaitVersion {
476            catalog_version: notification_version,
477            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
478        }))
479    }
480
481    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
482        self.barrier_manager.get_ddl_progress().await
483    }
484
485    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
486        let (version, updated_db) = self
487            .metadata_manager
488            .catalog_controller
489            .create_database(database)
490            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
492        self.barrier_manager
493            .update_database_barrier(
494                updated_db.database_id,
495                updated_db.barrier_interval_ms.map(|v| v as u32),
496                updated_db.checkpoint_frequency.map(|v| v as u64),
497            )
498            .await?;
499        Ok(version)
500    }
501
502    #[tracing::instrument(skip(self), level = "debug")]
503    pub async fn reschedule_streaming_job(
504        &self,
505        job_id: JobId,
506        target: ReschedulePolicy,
507        mut deferred: bool,
508    ) -> MetaResult<()> {
509        tracing::info!("altering parallelism for job {}", job_id);
510        if self.barrier_manager.check_status_running().is_err() {
511            tracing::info!(
512                "alter parallelism is set to deferred mode because the system is in recovery state"
513            );
514            deferred = true;
515        }
516
517        self.stream_manager
518            .reschedule_streaming_job(job_id, target, deferred)
519            .await
520    }
521
522    pub async fn reschedule_cdc_table_backfill(
523        &self,
524        job_id: JobId,
525        target: ReschedulePolicy,
526    ) -> MetaResult<()> {
527        tracing::info!("alter CDC table backfill parallelism");
528        if self.barrier_manager.check_status_running().is_err() {
529            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
530        }
531        self.stream_manager
532            .reschedule_cdc_table_backfill(job_id, target)
533            .await
534    }
535
536    pub async fn reschedule_fragments(
537        &self,
538        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
539    ) -> MetaResult<()> {
540        tracing::info!(
541            "altering parallelism for fragments {:?}",
542            fragment_targets.keys()
543        );
544        let fragment_targets = fragment_targets
545            .into_iter()
546            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
547            .collect();
548
549        self.stream_manager
550            .reschedule_fragments(fragment_targets)
551            .await
552    }
553
554    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
555        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
556            .await
557    }
558
559    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
560        self.metadata_manager
561            .catalog_controller
562            .create_schema(schema)
563            .await
564    }
565
566    async fn drop_schema(
567        &self,
568        schema_id: SchemaId,
569        drop_mode: DropMode,
570    ) -> MetaResult<NotificationVersion> {
571        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
572            .await
573    }
574
    /// Shared sources are handled in [`Self::create_streaming_job`].
576    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
577        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
578            .await
579            .context("failed to create source worker")?;
580
581        let (source_id, version) = self
582            .metadata_manager
583            .catalog_controller
584            .create_source(source)
585            .await?;
586        self.source_manager
587            .register_source_with_handle(source_id, handle)
588            .await;
589        Ok(version)
590    }
591
592    async fn drop_source(
593        &self,
594        source_id: SourceId,
595        drop_mode: DropMode,
596    ) -> MetaResult<NotificationVersion> {
597        self.drop_object(ObjectType::Source, source_id, drop_mode)
598            .await
599    }
600
601    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
602        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
603
604        // Get database_id for the source
605        let database_id = self
606            .metadata_manager
607            .catalog_controller
608            .get_object_database_id(source_id)
609            .await?;
610
611        self.stream_manager
612            .barrier_scheduler
613            .run_command(database_id, Command::ResetSource { source_id })
614            .await?;
615
        // RESET SOURCE doesn't modify the catalog, so return the current catalog version.
617        let version = self
618            .metadata_manager
619            .catalog_controller
620            .current_notification_version()
621            .await;
622        Ok(version)
623    }
624
625    /// This replaces the source in the catalog.
626    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
627    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
628        self.metadata_manager
629            .catalog_controller
630            .alter_non_shared_source(source)
631            .await
632    }
633
634    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
635        self.metadata_manager
636            .catalog_controller
637            .create_function(function)
638            .await
639    }
640
641    async fn drop_function(
642        &self,
643        function_id: FunctionId,
644        drop_mode: DropMode,
645    ) -> MetaResult<NotificationVersion> {
646        self.drop_object(ObjectType::Function, function_id, drop_mode)
647            .await
648    }
649
650    async fn create_view(
651        &self,
652        view: View,
653        dependencies: HashSet<ObjectId>,
654    ) -> MetaResult<NotificationVersion> {
655        self.metadata_manager
656            .catalog_controller
657            .create_view(view, dependencies)
658            .await
659    }
660
661    async fn drop_view(
662        &self,
663        view_id: ViewId,
664        drop_mode: DropMode,
665    ) -> MetaResult<NotificationVersion> {
666        self.drop_object(ObjectType::View, view_id, drop_mode).await
667    }
668
669    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
670        validate_connection(&connection).await?;
671        self.metadata_manager
672            .catalog_controller
673            .create_connection(connection)
674            .await
675    }
676
677    async fn drop_connection(
678        &self,
679        connection_id: ConnectionId,
680        drop_mode: DropMode,
681    ) -> MetaResult<NotificationVersion> {
682        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
683            .await
684    }
685
686    async fn alter_database_param(
687        &self,
688        database_id: DatabaseId,
689        param: AlterDatabaseParam,
690    ) -> MetaResult<NotificationVersion> {
691        let (version, updated_db) = self
692            .metadata_manager
693            .catalog_controller
694            .alter_database_param(database_id, param)
695            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
697        self.barrier_manager
698            .update_database_barrier(
699                database_id,
700                updated_db.barrier_interval_ms.map(|v| v as u32),
701                updated_db.checkpoint_frequency.map(|v| v as u64),
702            )
703            .await?;
704        Ok(version)
705    }
706
707    // The 'secret' part of the request we receive from the frontend is in plaintext;
708    // here, we need to encrypt it before storing it in the catalog.
709    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
710        let secret_store_private_key = self
711            .env
712            .opts
713            .secret_store_private_key
714            .clone()
715            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
716
717        let encrypted_payload = SecretEncryption::encrypt(
718            secret_store_private_key.as_slice(),
719            secret.get_value().as_slice(),
720        )
721        .context(format!("failed to encrypt secret {}", secret.name))?;
722        Ok(encrypted_payload
723            .serialize()
724            .context(format!("failed to serialize secret {}", secret.name))?)
725    }
726
727    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
728        // The 'secret' part of the request we receive from the frontend is in plaintext;
729        // here, we need to encrypt it before storing it in the catalog.
730        let secret_plain_payload = secret.value.clone();
731        let encrypted_payload = self.get_encrypted_payload(&secret)?;
732        secret.value = encrypted_payload;
733
734        self.metadata_manager
735            .catalog_controller
736            .create_secret(secret, secret_plain_payload)
737            .await
738    }
739
740    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
741        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
742            .await
743    }
744
745    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
746        let secret_plain_payload = secret.value.clone();
747        let encrypted_payload = self.get_encrypted_payload(&secret)?;
748        secret.value = encrypted_payload;
749        self.metadata_manager
750            .catalog_controller
751            .alter_secret(secret, secret_plain_payload)
752            .await
753    }
754
755    async fn create_subscription(
756        &self,
757        mut subscription: Subscription,
758    ) -> MetaResult<NotificationVersion> {
759        tracing::debug!("create subscription");
760        let _permit = self
761            .creating_streaming_job_permits
762            .semaphore
763            .acquire()
764            .await
765            .unwrap();
766        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
767        self.metadata_manager
768            .catalog_controller
769            .create_subscription_catalog(&mut subscription)
770            .await?;
771        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
772            tracing::debug!(error = %err.as_report(), "failed to create subscription");
773            let _ = self
774                .metadata_manager
775                .catalog_controller
776                .try_abort_creating_subscription(subscription.id)
777                .await
778                .inspect_err(|e| {
779                    tracing::error!(
780                        error = %e.as_report(),
781                        "failed to abort create subscription after failure"
782                    );
783                });
784            return Err(err);
785        }
786
787        let version = self
788            .metadata_manager
789            .catalog_controller
790            .notify_create_subscription(subscription.id)
791            .await?;
792        tracing::debug!("finish create subscription");
793        Ok(version)
794    }
795
796    async fn drop_subscription(
797        &self,
798        subscription_id: SubscriptionId,
799        drop_mode: DropMode,
800    ) -> MetaResult<NotificationVersion> {
801        tracing::debug!("preparing drop subscription");
802        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
803        let subscription = self
804            .metadata_manager
805            .catalog_controller
806            .get_subscription_by_id(subscription_id)
807            .await?;
808        let table_id = subscription.dependent_table_id;
809        let database_id = subscription.database_id;
810        let (_, version) = self
811            .metadata_manager
812            .catalog_controller
813            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
814            .await?;
815        self.stream_manager
816            .drop_subscription(database_id, subscription_id, table_id)
817            .await;
818        tracing::debug!("finish drop subscription");
819        Ok(version)
820    }
821
    /// Validates the connection properties (`connect_properties`) in the `cdc_table_desc` stored in the `StreamCdcScan` node
823    #[await_tree::instrument]
824    pub(crate) async fn validate_cdc_table(
825        &self,
826        table: &Table,
827        table_fragments: &StreamJobFragments,
828    ) -> MetaResult<()> {
829        let stream_scan_fragment = table_fragments
830            .fragments
831            .values()
832            .filter(|f| {
833                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
834                    || f.fragment_type_mask
835                        .contains(FragmentTypeFlag::StreamCdcScan)
836            })
837            .exactly_one()
838            .ok()
839            .with_context(|| {
840                format!(
841                    "expect exactly one stream scan fragment, got: {:?}",
842                    table_fragments.fragments
843                )
844            })?;
845        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
846            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = &node.options
                    && CdcScanOptions::from_proto(o).is_parallelized_backfill()
849                {
850                    // Use parallel CDC backfill.
851                } else {
852                    assert_eq!(
853                        stream_scan_fragment.actors.len(),
854                        1,
855                        "Stream scan fragment should have only one actor"
856                    );
857                }
858            }
859        }
860        let mut found_cdc_scan = false;
861        match &stream_scan_fragment.nodes.node_body {
862            Some(NodeBody::StreamCdcScan(_)) => {
863                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
864                if self
865                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
866                    .await?
867                {
868                    found_cdc_scan = true;
869                }
870            }
            // When there are generated columns, the CDC scan node is wrapped in a Project node.
872            Some(NodeBody::Project(_)) => {
873                for input in &stream_scan_fragment.nodes.input {
874                    assert_parallelism(stream_scan_fragment, &input.node_body);
875                    if self
876                        .validate_cdc_table_inner(&input.node_body, table.id)
877                        .await?
878                    {
879                        found_cdc_scan = true;
880                    }
881                }
882            }
883            _ => {
884                bail!("Unexpected node body for stream cdc scan");
885            }
886        };
887        if !found_cdc_scan {
888            bail!("No stream cdc scan node found in stream scan fragment");
889        }
890        Ok(())
891    }
892
893    async fn validate_cdc_table_inner(
894        &self,
895        node_body: &Option<NodeBody>,
896        table_id: TableId,
897    ) -> MetaResult<bool> {
898        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
899            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
900        {
901            let options_with_secret = WithOptionsSecResolved::new(
902                cdc_table_desc.connect_properties.clone(),
903                cdc_table_desc.secret_refs.clone(),
904            );
905
906            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
907            props.init_from_pb_cdc_table_desc(cdc_table_desc);
908
909            // Try creating a split enumerator to validate
910            let _enumerator = props
911                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
912                .await?;
913
914            tracing::debug!(?table_id, "validate cdc table success");
915            Ok(true)
916        } else {
917            Ok(false)
918        }
919    }
920
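    /// Ensures the target table of `CREATE SINK ... INTO` has already been migrated (per
    /// `has_table_been_migrated`); creating a sink into an unmigrated table is rejected.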
921    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
922        let migrated = self
923            .metadata_manager
924            .catalog_controller
925            .has_table_been_migrated(table_id)
926            .await?;
927        if !migrated {
928            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
929        } else {
930            Ok(())
931        }
932    }
933
934    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
935    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
936    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
937    pub async fn create_streaming_job(
938        &self,
939        mut streaming_job: StreamingJob,
940        fragment_graph: StreamFragmentGraphProto,
941        dependencies: HashSet<ObjectId>,
942        resource_type: streaming_job_resource_type::ResourceType,
943        if_not_exists: bool,
944    ) -> MetaResult<NotificationVersion> {
945        if let StreamingJob::Sink(sink) = &streaming_job
946            && let Some(target_table) = sink.target_table
947        {
948            self.validate_table_for_sink(target_table).await?;
949        }
950        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
951        let specific_resource_group = resource_type.resource_group();
952        let check_ret = self
953            .metadata_manager
954            .catalog_controller
955            .create_job_catalog(
956                &mut streaming_job,
957                &ctx,
958                &fragment_graph.parallelism,
959                fragment_graph.max_parallelism as _,
960                dependencies,
961                specific_resource_group,
962                &fragment_graph.backfill_parallelism,
963            )
964            .await;
965        if let Err(meta_err) = check_ret {
966            if !if_not_exists {
967                return Err(meta_err);
968            }
969            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
970                if streaming_job.create_type() == CreateType::Foreground {
971                    let database_id = streaming_job.database_id();
972                    self.metadata_manager
973                        .wait_streaming_job_finished(database_id, *job_id)
974                        .await
975                } else {
976                    Ok(IGNORED_NOTIFICATION_VERSION)
977                }
978            } else {
979                Err(meta_err)
980            };
981        }
982        let job_id = streaming_job.id();
983        tracing::debug!(
984            id = %job_id,
985            definition = streaming_job.definition(),
986            create_type = streaming_job.create_type().as_str_name(),
987            job_type = ?streaming_job.job_type(),
988            "starting streaming job",
989        );
990        // TODO: acquire permits for recovered background DDLs.
991        let permit = self
992            .creating_streaming_job_permits
993            .semaphore
994            .clone()
995            .acquire_owned()
996            .instrument_await("acquire_creating_streaming_job_permit")
997            .await
998            .unwrap();
999        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1000
1001        let name = streaming_job.name();
1002        let definition = streaming_job.definition();
1003        let source_id = match &streaming_job {
1004            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1005            _ => None,
1006        };
1007
1008        // create streaming job.
1009        match self
1010            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1011            .await
1012        {
1013            Ok(version) => Ok(version),
1014            Err(err) => {
1015                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1016                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1017                    id: job_id,
1018                    name,
1019                    definition,
1020                    error: err.as_report().to_string(),
1021                };
1022                self.env.event_log_manager_ref().add_event_logs(vec![
1023                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1024                ]);
1025                let (aborted, _) = self
1026                    .metadata_manager
1027                    .catalog_controller
1028                    .try_abort_creating_streaming_job(job_id, false)
1029                    .await?;
1030                if aborted {
1031                    tracing::warn!(id = %job_id, "aborted streaming job");
1032                    // FIXME: might also need other cleanup here
1033                    if let Some(source_id) = source_id {
1034                        self.source_manager
1035                            .apply_source_change(SourceChange::DropSource {
1036                                dropped_source_ids: vec![source_id],
1037                            })
1038                            .await;
1039                    }
1040                }
1041                Err(err)
1042            }
1043        }
1044    }
1045
1046    #[await_tree::instrument(boxed)]
1047    async fn create_streaming_job_inner(
1048        &self,
1049        ctx: StreamContext,
1050        mut streaming_job: StreamingJob,
1051        fragment_graph: StreamFragmentGraphProto,
1052        resource_type: streaming_job_resource_type::ResourceType,
1053        permit: OwnedSemaphorePermit,
1054    ) -> MetaResult<NotificationVersion> {
1055        let mut fragment_graph =
1056            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1057        streaming_job.set_info_from_graph(&fragment_graph);
1058
1059        // create internal table catalogs and refill table id.
1060        let incomplete_internal_tables = fragment_graph
1061            .incomplete_internal_tables()
1062            .into_values()
1063            .collect_vec();
1064        let table_id_map = self
1065            .metadata_manager
1066            .catalog_controller
1067            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1068            .await?;
1069        fragment_graph.refill_internal_table_ids(table_id_map);
1070
1071        // create fragment and actor catalogs.
1072        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1073        let (ctx, stream_job_fragments) = self
1074            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1075            .await?;
1076
1077        let streaming_job = &ctx.streaming_job;
1078
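        // Job-type-specific validation (CDC table, source registration, sink) and telemetry
        // reporting, performed before the fragments are persisted to the catalog.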
1079        match streaming_job {
1080            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1081                self.validate_cdc_table(table, &stream_job_fragments)
1082                    .await?;
1083            }
1084            StreamingJob::Table(Some(source), ..) => {
1085                // Register the source on the connector node.
1086                self.source_manager.register_source(source).await?;
1087                let connector_name = source
1088                    .get_with_properties()
1089                    .get(UPSTREAM_SOURCE_KEY)
1090                    .cloned();
1091                let attr = source.info.as_ref().map(|source_info| {
1092                    jsonbb::json!({
1093                            "format": source_info.format().as_str_name(),
1094                            "encode": source_info.row_encode().as_str_name(),
1095                    })
1096                });
1097                report_create_object(
1098                    streaming_job.id(),
1099                    "source",
1100                    PbTelemetryDatabaseObject::Source,
1101                    connector_name,
1102                    attr,
1103                );
1104            }
1105            StreamingJob::Sink(sink) => {
1106                if sink.auto_refresh_schema_from_table.is_some() {
1107                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1108                }
1109                // Validate the sink on the connector node.
1110                validate_sink(sink).await?;
1111                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1112                let attr = sink.format_desc.as_ref().map(|sink_info| {
1113                    jsonbb::json!({
1114                        "format": sink_info.format().as_str_name(),
1115                        "encode": sink_info.encode().as_str_name(),
1116                    })
1117                });
1118                report_create_object(
1119                    streaming_job.id(),
1120                    "sink",
1121                    PbTelemetryDatabaseObject::Sink,
1122                    connector_name,
1123                    attr,
1124                );
1125            }
1126            StreamingJob::Source(source) => {
1127                // Register the source on the connector node.
1128                self.source_manager.register_source(source).await?;
1129                let connector_name = source
1130                    .get_with_properties()
1131                    .get(UPSTREAM_SOURCE_KEY)
1132                    .cloned();
1133                let attr = source.info.as_ref().map(|source_info| {
1134                    jsonbb::json!({
1135                            "format": source_info.format().as_str_name(),
1136                            "encode": source_info.row_encode().as_str_name(),
1137                    })
1138                });
1139                report_create_object(
1140                    streaming_job.id(),
1141                    "source",
1142                    PbTelemetryDatabaseObject::Source,
1143                    connector_name,
1144                    attr,
1145                );
1146            }
1147            _ => {}
1148        }
1149
1150        self.metadata_manager
1151            .catalog_controller
1152            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1153            .await?;
1154
1155        // create streaming jobs.
1156        let version = self
1157            .stream_manager
1158            .create_streaming_job(stream_job_fragments, ctx, permit)
1159            .await?;
1160
1161        Ok(version)
1162    }
1163
    /// Drops an object of the given type and cleans up the resources it holds (streaming jobs,
    /// state tables, sources, Iceberg table sinks and secrets).
1165    pub async fn drop_object(
1166        &self,
1167        object_type: ObjectType,
1168        object_id: impl Into<ObjectId>,
1169        drop_mode: DropMode,
1170    ) -> MetaResult<NotificationVersion> {
1171        let object_id = object_id.into();
1172        let (release_ctx, version) = self
1173            .metadata_manager
1174            .catalog_controller
1175            .drop_object(object_type, object_id, drop_mode)
1176            .await?;
1177
1178        if object_type == ObjectType::Source {
1179            self.env
1180                .notification_manager_ref()
1181                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1182        }
1183
1184        let ReleaseContext {
1185            database_id,
1186            removed_streaming_job_ids,
1187            removed_state_table_ids,
1188            removed_source_ids,
1189            removed_secret_ids: secret_ids,
1190            removed_source_fragments,
1191            removed_actors,
1192            removed_fragments,
1193            removed_sink_fragment_by_targets,
1194            removed_iceberg_table_sinks,
1195        } = release_ctx;
1196
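        // Pause the source manager tick while tearing things down, then stop the affected
        // streaming jobs before cleaning up source, sink and secret state.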
1197        let _guard = self.source_manager.pause_tick().await;
1198        self.stream_manager
1199            .drop_streaming_jobs(
1200                database_id,
1201                removed_actors.iter().map(|id| *id as _).collect(),
1202                removed_streaming_job_ids,
1203                removed_state_table_ids,
1204                removed_fragments.iter().map(|id| *id as _).collect(),
1205                removed_sink_fragment_by_targets
1206                    .into_iter()
1207                    .map(|(target, sinks)| {
1208                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1209                    })
1210                    .collect(),
1211            )
1212            .await;
1213
        // Clean up sources after dropping the streaming jobs. Otherwise, e.g., Kafka consumer
        // groups might be recreated right after being deleted.
1216        self.source_manager
1217            .apply_source_change(SourceChange::DropSource {
1218                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1219            })
1220            .await;
1221
        // Unregister fragments and actors from the source manager.
        // FIXME: also need to unregister source backfill fragments.
1224        let dropped_source_fragments = removed_source_fragments;
1225        self.source_manager
1226            .apply_source_change(SourceChange::DropMv {
1227                dropped_source_fragments,
1228            })
1229            .await;
1230
1231        // clean up iceberg table sinks
1232        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1233            .iter()
1234            .map(|sink| sink.id)
1235            .collect();
1236
1237        for sink in removed_iceberg_table_sinks {
1238            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1239                .expect("Iceberg sink should be valid");
1240            let iceberg_sink =
1241                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1242            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1243                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1244                tracing::info!(
1245                    "dropping iceberg table {} for dropped sink",
1246                    table_identifier
1247                );
1248
1249                let _ = iceberg_catalog
1250                    .drop_table(&table_identifier)
1251                    .await
1252                    .inspect_err(|err| {
1253                        tracing::error!(
1254                            "failed to drop iceberg table {} during cleanup: {}",
1255                            table_identifier,
1256                            err.as_report()
1257                        );
1258                    });
1259            }
1260        }
1261
1262        // stop sink coordinators for iceberg table sinks
1263        if !iceberg_sink_ids.is_empty() {
1264            self.sink_manager
1265                .stop_sink_coordinator(iceberg_sink_ids)
1266                .await;
1267        }
1268
1269        // remove secrets.
1270        for secret in secret_ids {
1271            LocalSecretManager::global().remove_secret(secret);
1272        }
1273        Ok(version)
1274    }
1275
1276    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1277    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1278    pub async fn replace_job(
1279        &self,
1280        mut streaming_job: StreamingJob,
1281        fragment_graph: StreamFragmentGraphProto,
1282    ) -> MetaResult<NotificationVersion> {
1283        match &streaming_job {
1284            StreamingJob::Table(..)
1285            | StreamingJob::Source(..)
1286            | StreamingJob::MaterializedView(..) => {}
1287            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1288                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1289            }
1290        }
1291
1292        let job_id = streaming_job.id();
1293
1294        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1295        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1296
        // Ensure the max parallelism is unchanged before replacing the table.
1298        let original_max_parallelism = self
1299            .metadata_manager
1300            .get_job_max_parallelism(streaming_job.id())
1301            .await?;
1302        let fragment_graph = PbStreamFragmentGraph {
1303            max_parallelism: original_max_parallelism as _,
1304            ..fragment_graph
1305        };
1306
1307        // 1. build fragment graph.
1308        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1309        streaming_job.set_info_from_graph(&fragment_graph);
1310
1311        // make it immutable
1312        let streaming_job = streaming_job;
1313
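        // For sinks whose schema auto-refreshes from this table (`auto_refresh_schema_from_table`),
        // rewrite their sink fragments as well so that they pick up the table's newly added columns.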
1314        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1315            let auto_refresh_schema_sinks = self
1316                .metadata_manager
1317                .catalog_controller
1318                .get_sink_auto_refresh_schema_from(table.id)
1319                .await?;
1320            if !auto_refresh_schema_sinks.is_empty() {
1321                let original_table_columns = self
1322                    .metadata_manager
1323                    .catalog_controller
1324                    .get_table_columns(table.id)
1325                    .await?;
1326                // compare column ids to find newly added columns
1327                let mut original_table_column_ids: HashSet<_> = original_table_columns
1328                    .iter()
1329                    .map(|col| col.column_id())
1330                    .collect();
1331                let newly_added_columns = table
1332                    .columns
1333                    .iter()
1334                    .filter(|col| {
1335                        !original_table_column_ids.remove(&ColumnId::new(
1336                            col.column_desc.as_ref().unwrap().column_id as _,
1337                        ))
1338                    })
1339                    .map(|col| ColumnCatalog::from(col.clone()))
1340                    .collect_vec();
1341                if !original_table_column_ids.is_empty() {
1342                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1343                }
1344                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1345                for sink in auto_refresh_schema_sinks {
1346                    let sink_job_fragments = self
1347                        .metadata_manager
1348                        .get_job_fragments_by_id(sink.id.as_job_id())
1349                        .await?;
1350                    if sink_job_fragments.fragments.len() != 1 {
1351                        return Err(anyhow!(
1352                            "auto schema refresh sink must have only one fragment, but got {}",
1353                            sink_job_fragments.fragments.len()
1354                        )
1355                        .into());
1356                    }
1357                    let original_sink_fragment =
1358                        sink_job_fragments.fragments.into_values().next().unwrap();
1359                    let (new_sink_fragment, new_schema, new_log_store_table) =
1360                        rewrite_refresh_schema_sink_fragment(
1361                            &original_sink_fragment,
1362                            &sink,
1363                            &newly_added_columns,
1364                            table,
1365                            fragment_graph.table_fragment_id(),
1366                            self.env.id_gen_manager(),
1367                            self.env.actor_id_generator(),
1368                        )?;
1369
1370                    assert_eq!(
1371                        original_sink_fragment.actors.len(),
1372                        new_sink_fragment.actors.len()
1373                    );
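                    // Keep each new sink actor on the same worker node as the original actor it replaces.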
1374                    let actor_status = (0..original_sink_fragment.actors.len())
1375                        .map(|i| {
1376                            let worker_node_id = sink_job_fragments.actor_status
1377                                [&original_sink_fragment.actors[i].actor_id]
1378                                .location
1379                                .as_ref()
1380                                .unwrap()
1381                                .worker_node_id;
1382                            (
1383                                new_sink_fragment.actors[i].actor_id,
1384                                PbActorStatus {
1385                                    location: Some(PbActorLocation { worker_node_id }),
1386                                },
1387                            )
1388                        })
1389                        .collect();
1390
1391                    let streaming_job = StreamingJob::Sink(sink);
1392
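                    // Register a temporary sink catalog for the rewritten sink; it replaces the original
                    // on success and is aborted together with the replacement job on failure.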
1393                    let tmp_sink_id = self
1394                        .metadata_manager
1395                        .catalog_controller
1396                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1397                        .await?
1398                        .as_sink_id();
1399                    let StreamingJob::Sink(sink) = streaming_job else {
1400                        unreachable!()
1401                    };
1402
1403                    sinks.push(AutoRefreshSchemaSinkContext {
1404                        tmp_sink_id,
1405                        original_sink: sink,
1406                        original_fragment: original_sink_fragment,
1407                        new_schema,
1408                        newly_add_fields: newly_added_columns
1409                            .iter()
1410                            .map(|col| Field::from(&col.column_desc))
1411                            .collect(),
1412                        new_fragment: new_sink_fragment,
1413                        new_log_store_table,
1414                        actor_status,
1415                    });
1416                }
1417                Some(sinks)
1418            } else {
1419                None
1420            }
1421        } else {
1422            None
1423        };
1424
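        // Create a temporary job catalog for the replacement; it is finalized into the real job on
        // success or aborted on failure.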
1425        let tmp_id = self
1426            .metadata_manager
1427            .catalog_controller
1428            .create_job_catalog_for_replace(
1429                &streaming_job,
1430                Some(&ctx),
1431                fragment_graph.specified_parallelism().as_ref(),
1432                Some(fragment_graph.max_parallelism()),
1433            )
1434            .await?;
1435
1436        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1437            sinks
1438                .iter()
1439                .map(|sink| sink.tmp_sink_id.as_object_id())
1440                .collect_vec()
1441        });
1442
1443        tracing::debug!(id = %job_id, "building replace streaming job");
1444        let mut updated_sink_catalogs = vec![];
1445
1446        let mut drop_table_connector_ctx = None;
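        // Build and apply the replacement inside a `try` block so that any failure below can be
        // rolled back by aborting the temporary catalogs in the error branch.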
1447        let result: MetaResult<_> = try {
1448            let (mut ctx, mut stream_job_fragments) = self
1449                .build_replace_job(
1450                    ctx,
1451                    &streaming_job,
1452                    fragment_graph,
1453                    tmp_id,
1454                    auto_refresh_schema_sinks,
1455                )
1456                .await?;
1457            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1458            let auto_refresh_schema_sink_finish_ctx =
1459                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1460                    sinks
1461                        .iter()
1462                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1463                            tmp_sink_id: sink.tmp_sink_id,
1464                            original_sink_id: sink.original_sink.id,
1465                            columns: sink.new_schema.clone(),
1466                            new_log_store_table: sink
1467                                .new_log_store_table
1468                                .as_ref()
1469                                .map(|table| (table.id, table.columns.clone())),
1470                        })
1471                        .collect()
1472                });
1473
1474            // Handle the case where the table has incoming sinks.
1475            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1476                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1477                let upstream_infos = self
1478                    .metadata_manager
1479                    .catalog_controller
1480                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1481                    .await?;
1482                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1483
1484                for upstream_info in &upstream_infos {
1485                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1486                    ctx.upstream_fragment_downstreams
1487                        .entry(upstream_fragment_id)
1488                        .or_default()
1489                        .push(upstream_info.new_sink_downstream.clone());
1490                    if upstream_info.sink_original_target_columns.is_empty() {
1491                        updated_sink_catalogs.push(upstream_info.sink_id);
1492                    }
1493                }
1494            }
1495
1496            let replace_upstream = ctx.replace_upstream.clone();
1497
1498            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1499                let empty_downstreams = FragmentDownstreamRelation::default();
1500                for sink in sinks {
1501                    self.metadata_manager
1502                        .catalog_controller
1503                        .prepare_streaming_job(
1504                            sink.tmp_sink_id.as_job_id(),
1505                            || [&sink.new_fragment].into_iter(),
1506                            &empty_downstreams,
1507                            true,
1508                            None,
1509                        )
1510                        .await?;
1511                }
1512            }
1513
1514            self.metadata_manager
1515                .catalog_controller
1516                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1517                .await?;
1518
1519            self.stream_manager
1520                .replace_stream_job(stream_job_fragments, ctx)
1521                .await?;
1522            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1523        };
1524
1525        match result {
1526            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1527                let version = self
1528                    .metadata_manager
1529                    .catalog_controller
1530                    .finish_replace_streaming_job(
1531                        tmp_id,
1532                        streaming_job,
1533                        replace_upstream,
1534                        SinkIntoTableContext {
1535                            updated_sink_catalogs,
1536                        },
1537                        drop_table_connector_ctx.as_ref(),
1538                        auto_refresh_schema_sink_finish_ctx,
1539                    )
1540                    .await?;
1541                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1542                    self.source_manager
1543                        .apply_source_change(SourceChange::DropSource {
1544                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1545                        })
1546                        .await;
1547                }
1548                Ok(version)
1549            }
1550            Err(err) => {
1551                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1552                let _ = self.metadata_manager
1553                    .catalog_controller
1554                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1555                    .await.inspect_err(|err| {
1556                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1557                });
1558                Err(err)
1559            }
1560        }
1561    }
1562
1563    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1564    )]
1565    async fn drop_streaming_job(
1566        &self,
1567        job_id: StreamingJobId,
1568        drop_mode: DropMode,
1569    ) -> MetaResult<NotificationVersion> {
1570        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1571
1572        let (object_id, object_type) = match job_id {
1573            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1574            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1575            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1576            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1577        };
1578
1579        let job_status = self
1580            .metadata_manager
1581            .catalog_controller
1582            .get_streaming_job_status(job_id.id())
1583            .await?;
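        // A job that is still being created is cancelled via the stream manager, while a created job
        // goes through the regular drop path.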
1584        let version = match job_status {
1585            JobStatus::Initial => {
1586                unreachable!(
1587                    "Job with Initial status should not notify frontend and therefore should not arrive here"
1588                );
1589            }
1590            JobStatus::Creating => {
1591                self.stream_manager
1592                    .cancel_streaming_jobs(vec![job_id.id()])
1593                    .await?;
1594                IGNORED_NOTIFICATION_VERSION
1595            }
1596            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1597        };
1598
1599        Ok(version)
1600    }
1601
1602    /// Resolve the parallelism of the stream job based on the given information.
1603    ///
1604    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1605    fn resolve_stream_parallelism(
1606        &self,
1607        specified: Option<NonZeroUsize>,
1608        max: NonZeroUsize,
1609        cluster_info: &StreamingClusterInfo,
1610        resource_group: String,
1611    ) -> MetaResult<NonZeroUsize> {
1612        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1613        DdlController::resolve_stream_parallelism_inner(
1614            specified,
1615            max,
1616            available,
1617            &self.env.opts.default_parallelism,
1618            &resource_group,
1619        )
1620    }
1621
1622    fn resolve_stream_parallelism_inner(
1623        specified: Option<NonZeroUsize>,
1624        max: NonZeroUsize,
1625        available: Option<NonZeroUsize>,
1626        default_parallelism: &DefaultParallelism,
1627        resource_group: &str,
1628    ) -> MetaResult<NonZeroUsize> {
1629        let Some(available) = available else {
1630            bail_unavailable!(
1631                "no available slots to schedule in resource group \"{}\", \
1632                 have you allocated any compute nodes within this resource group?",
1633                resource_group
1634            );
1635        };
1636
1637        if let Some(specified) = specified {
1638            if specified > max {
1639                bail_invalid_parameter!(
1640                    "specified parallelism {} should not exceed max parallelism {}",
1641                    specified,
1642                    max,
1643                );
1644            }
1645            if specified > available {
1646                tracing::warn!(
1647                    resource_group,
1648                    specified_parallelism = specified.get(),
1649                    available_parallelism = available.get(),
1650                    "specified parallelism exceeds available slots, scheduling with specified value",
1651                );
1652            }
1653            return Ok(specified);
1654        }
1655
1656        // Use default parallelism when no specific parallelism is provided by the user.
1657        let default_parallelism = match default_parallelism {
1658            DefaultParallelism::Full => available,
1659            DefaultParallelism::Default(num) => {
1660                if *num > available {
1661                    tracing::warn!(
1662                        resource_group,
1663                        configured_parallelism = num.get(),
1664                        available_parallelism = available.get(),
1665                        "default parallelism exceeds available slots, scheduling with configured value",
1666                    );
1667                }
1668                *num
1669            }
1670        };
1671
1672        if default_parallelism > max {
1673            tracing::warn!(
1674                max_parallelism = max.get(),
1675                resource_group,
1676                "default parallelism exceeds max parallelism, capping to max",
1677            );
1678        }
1679        Ok(default_parallelism.min(max))
1680    }
1681
1682    /// Builds the actor graph:
1683    /// - Add the upstream fragments to the fragment graph
1684    /// - Schedule the fragments based on their distribution
1685    /// - Expand each fragment into one or several actors
1686    /// - Construct the fragment level backfill order control.
1687    #[await_tree::instrument]
1688    pub(crate) async fn build_stream_job(
1689        &self,
1690        stream_ctx: StreamContext,
1691        mut stream_job: StreamingJob,
1692        fragment_graph: StreamFragmentGraph,
1693        resource_type: streaming_job_resource_type::ResourceType,
1694    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1695        let id = stream_job.id();
1696        let specified_parallelism = fragment_graph.specified_parallelism();
1697        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1698        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1699
1700        // 1. Fragment Level ordering graph
1701        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1702
1703        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1704        // contains all information needed for building the actor graph.
1705
1706        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1707            fragment_graph.collect_snapshot_backfill_info()?;
1708        assert!(
1709            snapshot_backfill_info
1710                .iter()
1711                .chain([&cross_db_snapshot_backfill_info])
1712                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1713                .all(|backfill_epoch| backfill_epoch.is_none()),
1714            "should not set backfill epoch when initially building the job: {:?} {:?}",
1715            snapshot_backfill_info,
1716            cross_db_snapshot_backfill_info
1717        );
1718
1719        let locality_fragment_state_table_mapping =
1720            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1721
1722        // check if log store exists for all cross-db upstreams
1723        self.metadata_manager
1724            .catalog_controller
1725            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1726            .await?;
1727
1728        let upstream_table_ids = fragment_graph
1729            .dependent_table_ids()
1730            .iter()
1731            .filter(|id| {
1732                !cross_db_snapshot_backfill_info
1733                    .upstream_mv_table_id_to_backfill_epoch
1734                    .contains_key(id)
1735            })
1736            .cloned()
1737            .collect();
1738
1739        let (upstream_root_fragments, existing_actor_location) = self
1740            .metadata_manager
1741            .get_upstream_root_fragments(&upstream_table_ids)
1742            .await?;
1743
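        // Snapshot backfill is only allowed when creating materialized views, sinks, and indexes.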
1744        if snapshot_backfill_info.is_some() {
1745            match stream_job {
1746                StreamingJob::MaterializedView(_)
1747                | StreamingJob::Sink(_)
1748                | StreamingJob::Index(_, _) => {}
1749                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1750                    return Err(
1751                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1752                    );
1753                }
1754            }
1755        }
1756
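        // Collect the actor ids of each upstream root fragment.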
1757        let upstream_actors = upstream_root_fragments
1758            .values()
1759            .map(|(fragment, _)| {
1760                (
1761                    fragment.fragment_id,
1762                    fragment.actors.keys().copied().collect(),
1763                )
1764            })
1765            .collect();
1766
1767        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1768            fragment_graph,
1769            FragmentGraphUpstreamContext {
1770                upstream_root_fragments,
1771                upstream_actor_location: existing_actor_location,
1772            },
1773            (&stream_job).into(),
1774        )?;
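        // Prefer the resource group specified in the request; otherwise fall back to the resource
        // group of the job's database.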
1775        let resource_group = if let Some(group) = resource_type.resource_group() {
1776            group
1777        } else {
1778            self.metadata_manager
1779                .get_database_resource_group(stream_job.database_id())
1780                .await?
1781        };
1782        let is_serverless_backfill = matches!(
1783            &resource_type,
1784            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1785        );
1786
1787        // 3. Build the actor graph.
1788        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1789
1790        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1791        let parallelism = self.resolve_stream_parallelism(
1792            initial_parallelism,
1793            max_parallelism,
1794            &cluster_info,
1795            resource_group.clone(),
1796        )?;
1797
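        // Apply the adaptive parallelism strategy from the system parameters on top of the resolved
        // parallelism.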
1798        let parallelism = self
1799            .env
1800            .system_params_reader()
1801            .await
1802            .adaptive_parallelism_strategy()
1803            .compute_target_parallelism(parallelism.get());
1804
1805        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1806        let actor_graph_builder = ActorGraphBuilder::new(
1807            id,
1808            resource_group,
1809            complete_graph,
1810            cluster_info,
1811            parallelism,
1812        )?;
1813
1814        let ActorGraphBuildResult {
1815            graph,
1816            downstream_fragment_relations,
1817            building_locations,
1818            upstream_fragment_downstreams,
1819            new_no_shuffle,
1820            replace_upstream,
1821            ..
1822        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1823        assert!(replace_upstream.is_empty());
1824
1825        // 4. Build the table fragments structure that will be persisted in the stream manager,
1826        // and the context that contains all information needed for building the
1827        // actors on the compute nodes.
1828
1829        // If the frontend does not specify the degree of parallelism and the default_parallelism is set to full, then set it to ADAPTIVE.
1830        // Otherwise, it falls back to FIXED with the parallelism resolved above.
1831        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1832            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1833            _ => TableParallelism::Fixed(parallelism.get()),
1834        };
1835
1836        let stream_job_fragments = StreamJobFragments::new(
1837            id,
1838            graph,
1839            &building_locations.actor_locations,
1840            stream_ctx.clone(),
1841            table_parallelism,
1842            max_parallelism.get(),
1843        );
1844
1845        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1846            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1847        }
1848
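        // For a sink that writes into a table, precompute the upstream sink info used to wire the
        // sink fragment into the target table's union fragment.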
1849        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1850            && let Ok(table_id) = sink.get_target_table()
1851        {
1852            let tables = self
1853                .metadata_manager
1854                .get_table_catalog_by_ids(&[*table_id])
1855                .await?;
1856            let target_table = tables
1857                .first()
1858                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1859            let sink_fragment = stream_job_fragments
1860                .sink_fragment()
1861                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1862            let mview_fragment_id = self
1863                .metadata_manager
1864                .catalog_controller
1865                .get_mview_fragment_by_id(table_id.as_job_id())
1866                .await?;
1867            let upstream_sink_info = build_upstream_sink_info(
1868                sink,
1869                sink_fragment.fragment_id as _,
1870                target_table,
1871                mview_fragment_id,
1872            )?;
1873            Some(upstream_sink_info)
1874        } else {
1875            None
1876        };
1877
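        // For tables backed by a shared CDC source with parallel backfill, pre-compute the immutable
        // snapshot split assignments.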
1878        let mut cdc_table_snapshot_splits = None;
1879        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1880            && let Some((_, stream_cdc_scan)) =
1881                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1882        {
1883            {
1884                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1885                let splits = try_init_parallel_cdc_table_snapshot_splits(
1886                    table.id,
1887                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1888                    self.env.meta_store_ref(),
1889                    stream_cdc_scan.options.as_ref().unwrap(),
1890                    self.env.opts.cdc_table_split_init_insert_batch_size,
1891                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1892                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1893                )
1894                .await?;
1895                cdc_table_snapshot_splits = Some(splits);
1896            }
1897        }
1898
1899        let ctx = CreateStreamingJobContext {
1900            upstream_fragment_downstreams,
1901            new_no_shuffle,
1902            upstream_actors,
1903            building_locations,
1904            definition: stream_job.definition(),
1905            create_type: stream_job.create_type(),
1906            job_type: (&stream_job).into(),
1907            streaming_job: stream_job,
1908            new_upstream_sink,
1909            option: CreateStreamingJobOption {},
1910            snapshot_backfill_info,
1911            cross_db_snapshot_backfill_info,
1912            fragment_backfill_ordering,
1913            locality_fragment_state_table_mapping,
1914            cdc_table_snapshot_splits,
1915            is_serverless_backfill,
1916        };
1917
1918        Ok((
1919            ctx,
1920            StreamJobFragmentsToCreate {
1921                inner: stream_job_fragments,
1922                downstreams: downstream_fragment_relations,
1923            },
1924        ))
1925    }
1926
1927    /// `build_replace_job` builds a job replacement and returns the context and new job
1928    /// fragments.
1929    ///
1930    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1931    /// replacement is finished.
1932    pub(crate) async fn build_replace_job(
1933        &self,
1934        stream_ctx: StreamContext,
1935        stream_job: &StreamingJob,
1936        mut fragment_graph: StreamFragmentGraph,
1937        tmp_job_id: JobId,
1938        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1939    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1940        match &stream_job {
1941            StreamingJob::Table(..)
1942            | StreamingJob::Source(..)
1943            | StreamingJob::MaterializedView(..) => {}
1944            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1945                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1946            }
1947        }
1948
1949        let id = stream_job.id();
1950
1951        // check whether we are dropping the table's connector
1952        let mut drop_table_associated_source_id = None;
1953        if let StreamingJob::Table(None, _, _) = &stream_job {
1954            drop_table_associated_source_id = self
1955                .metadata_manager
1956                .get_table_associated_source_id(id.as_mv_table_id())
1957                .await?;
1958        }
1959
1960        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1961        let old_internal_table_ids = old_fragments.internal_table_ids();
1962
1963        // handle drop table's associated source
1964        let mut drop_table_connector_ctx = None;
1965        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1966            // dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table)
1967            debug_assert!(old_internal_table_ids.len() == 1);
1968
1969            drop_table_connector_ctx = Some(DropTableConnectorContext {
1970                // we do not remove the original table catalog as it's still needed for the streaming job
1971                // just need to remove the ref to the state table
1972                to_change_streaming_job_id: id,
1973                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1974                to_remove_source_id,
1975            });
1976        } else if stream_job.is_materialized_view() {
1977            // If it's ALTER MV, use the `state_match` algorithm to match the internal tables, which is
1978            // more complicated but more robust.
1979            let old_fragments_upstreams = self
1980                .metadata_manager
1981                .catalog_controller
1982                .upstream_fragments(old_fragments.fragment_ids())
1983                .await?;
1984
1985            let old_state_graph =
1986                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1987            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1988            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
1989                .context("incompatible altering on the streaming job states")?;
1990
1991            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
1992            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
1993        } else {
1994            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1995            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1996            let old_internal_tables = self
1997                .metadata_manager
1998                .get_table_catalog_by_ids(&old_internal_table_ids)
1999                .await?;
2000            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2001        }
2002
2003        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2004        // graph that contains all information needed for building the actor graph.
2005        let original_root_fragment = old_fragments
2006            .root_fragment()
2007            .expect("root fragment not found");
2008
2009        let job_type = StreamingJobType::from(stream_job);
2010
2011        // Extract the downstream fragments from the fragment graph.
2012        let (mut downstream_fragments, mut downstream_actor_location) =
2013            self.metadata_manager.get_downstream_fragments(id).await?;
2014
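        // Substitute each auto-refresh-schema sink's downstream fragment with its rewritten
        // counterpart and relocate the actor locations accordingly.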
2015        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2016            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2017                .iter()
2018                .map(|sink| sink.original_fragment.fragment_id)
2019                .collect();
2020            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2021                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2022                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2023                }) {
2024                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2025                    for actor_id in downstream_fragment.actors.keys() {
2026                        downstream_actor_location.remove(actor_id);
2027                    }
2028                    for (actor_id, status) in &sink.actor_status {
2029                        downstream_actor_location
2030                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2031                    }
2032
2033                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2034                    *nodes = sink.new_fragment.nodes.clone();
2035                }
2036            }
2037            assert!(remaining_fragment.is_empty());
2038        }
2039
2040        // build complete graph based on the table job type
2041        let complete_graph = match &job_type {
2042            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2043                CompleteStreamFragmentGraph::with_downstreams(
2044                    fragment_graph,
2045                    FragmentGraphDownstreamContext {
2046                        original_root_fragment_id: original_root_fragment.fragment_id,
2047                        downstream_fragments,
2048                        downstream_actor_location,
2049                    },
2050                    job_type,
2051                )?
2052            }
2053            StreamingJobType::Table(TableJobType::SharedCdcSource)
2054            | StreamingJobType::MaterializedView => {
2055                // CDC tables or materialized views can have upstream jobs as well.
2056                let (upstream_root_fragments, upstream_actor_location) = self
2057                    .metadata_manager
2058                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2059                    .await?;
2060
2061                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2062                    fragment_graph,
2063                    FragmentGraphUpstreamContext {
2064                        upstream_root_fragments,
2065                        upstream_actor_location,
2066                    },
2067                    FragmentGraphDownstreamContext {
2068                        original_root_fragment_id: original_root_fragment.fragment_id,
2069                        downstream_fragments,
2070                        downstream_actor_location,
2071                    },
2072                    job_type,
2073                )?
2074            }
2075            _ => unreachable!(),
2076        };
2077
2078        let resource_group = self
2079            .metadata_manager
2080            .get_existing_job_resource_group(id)
2081            .await?;
2082
2083        // 2. Build the actor graph.
2084        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2085
2086        // XXX: what is this parallelism?
2087        // Is it "assigned parallelism"?
2088        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2089            .expect("The number of actors in the original table fragment should be greater than 0");
2090
2091        let actor_graph_builder = ActorGraphBuilder::new(
2092            id,
2093            resource_group,
2094            complete_graph,
2095            cluster_info,
2096            parallelism,
2097        )?;
2098
2099        let ActorGraphBuildResult {
2100            graph,
2101            downstream_fragment_relations,
2102            building_locations,
2103            upstream_fragment_downstreams,
2104            mut replace_upstream,
2105            new_no_shuffle,
2106            ..
2107        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2108
2109        // general tables & sources do not have upstream jobs, so the dispatchers should be empty
2110        if matches!(
2111            job_type,
2112            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2113        ) {
2114            assert!(upstream_fragment_downstreams.is_empty());
2115        }
2116
2117        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2118        // the context that contains all information needed for building the actors on the compute
2119        // nodes.
2120        let stream_job_fragments = StreamJobFragments::new(
2121            tmp_job_id,
2122            graph,
2123            &building_locations.actor_locations,
2124            stream_ctx,
2125            old_fragments.assigned_parallelism,
2126            old_fragments.max_parallelism,
2127        );
2128
2129        if let Some(sinks) = &auto_refresh_schema_sinks {
2130            for sink in sinks {
2131                replace_upstream
2132                    .remove(&sink.new_fragment.fragment_id)
2133                    .expect("should exist");
2134            }
2135        }
2136
2137        // Note: no need to set `vnode_count` as it's already set by the frontend.
2138        // See `get_replace_table_plan`.
2139
2140        let ctx = ReplaceStreamJobContext {
2141            old_fragments,
2142            replace_upstream,
2143            new_no_shuffle,
2144            upstream_fragment_downstreams,
2145            building_locations,
2146            streaming_job: stream_job.clone(),
2147            tmp_id: tmp_job_id,
2148            drop_table_connector_ctx,
2149            auto_refresh_schema_sinks,
2150        };
2151
2152        Ok((
2153            ctx,
2154            StreamJobFragmentsToCreate {
2155                inner: stream_job_fragments,
2156                downstreams: downstream_fragment_relations,
2157            },
2158        ))
2159    }
2160
2161    async fn alter_name(
2162        &self,
2163        relation: alter_name_request::Object,
2164        new_name: &str,
2165    ) -> MetaResult<NotificationVersion> {
2166        let (obj_type, id) = match relation {
2167            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2168            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2169            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2170            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2171            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2172            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2173            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2174            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2175        };
2176        self.metadata_manager
2177            .catalog_controller
2178            .alter_name(obj_type, id, new_name)
2179            .await
2180    }
2181
2182    async fn alter_swap_rename(
2183        &self,
2184        object: alter_swap_rename_request::Object,
2185    ) -> MetaResult<NotificationVersion> {
2186        let (obj_type, src_id, dst_id) = match object {
2187            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2188            alter_swap_rename_request::Object::Table(objs) => {
2189                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2190                (ObjectType::Table, src_id, dst_id)
2191            }
2192            alter_swap_rename_request::Object::View(objs) => {
2193                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2194                (ObjectType::View, src_id, dst_id)
2195            }
2196            alter_swap_rename_request::Object::Source(objs) => {
2197                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2198                (ObjectType::Source, src_id, dst_id)
2199            }
2200            alter_swap_rename_request::Object::Sink(objs) => {
2201                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2202                (ObjectType::Sink, src_id, dst_id)
2203            }
2204            alter_swap_rename_request::Object::Subscription(objs) => {
2205                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2206                (ObjectType::Subscription, src_id, dst_id)
2207            }
2208        };
2209
2210        self.metadata_manager
2211            .catalog_controller
2212            .alter_swap_rename(obj_type, src_id, dst_id)
2213            .await
2214    }
2215
2216    async fn alter_owner(
2217        &self,
2218        object: Object,
2219        owner_id: UserId,
2220    ) -> MetaResult<NotificationVersion> {
2221        let (obj_type, id) = match object {
2222            Object::TableId(id) => (ObjectType::Table, id),
2223            Object::ViewId(id) => (ObjectType::View, id),
2224            Object::SourceId(id) => (ObjectType::Source, id),
2225            Object::SinkId(id) => (ObjectType::Sink, id),
2226            Object::SchemaId(id) => (ObjectType::Schema, id),
2227            Object::DatabaseId(id) => (ObjectType::Database, id),
2228            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2229            Object::ConnectionId(id) => (ObjectType::Connection, id),
2230        };
2231        self.metadata_manager
2232            .catalog_controller
2233            .alter_owner(obj_type, id.into(), owner_id as _)
2234            .await
2235    }
2236
2237    async fn alter_set_schema(
2238        &self,
2239        object: alter_set_schema_request::Object,
2240        new_schema_id: SchemaId,
2241    ) -> MetaResult<NotificationVersion> {
2242        let (obj_type, id) = match object {
2243            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2244            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2245            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2246            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2247            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2248            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2249            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2250        };
2251        self.metadata_manager
2252            .catalog_controller
2253            .alter_schema(obj_type, id.into(), new_schema_id as _)
2254            .await
2255    }
2256
2257    pub async fn wait(&self) -> MetaResult<()> {
2258        let timeout_ms = 30 * 60 * 1000;
2259        for _ in 0..timeout_ms {
2260            if self
2261                .metadata_manager
2262                .catalog_controller
2263                .list_background_creating_jobs(true, None)
2264                .await?
2265                .is_empty()
2266            {
2267                return Ok(());
2268            }
2269
2270            sleep(Duration::from_millis(1)).await;
2271        }
2272        Err(MetaError::cancelled(format!(
2273            "timeout after {timeout_ms}ms"
2274        )))
2275    }
2276
2277    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2278        self.metadata_manager
2279            .catalog_controller
2280            .comment_on(comment)
2281            .await
2282    }
2283
2284    async fn alter_streaming_job_config(
2285        &self,
2286        job_id: JobId,
2287        entries_to_add: HashMap<String, String>,
2288        keys_to_remove: Vec<String>,
2289    ) -> MetaResult<NotificationVersion> {
2290        self.metadata_manager
2291            .catalog_controller
2292            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2293            .await
2294    }
2295}
2296
2297fn report_create_object(
2298    job_id: JobId,
2299    event_name: &str,
2300    obj_type: PbTelemetryDatabaseObject,
2301    connector_name: Option<String>,
2302    attr_info: Option<jsonbb::Value>,
2303) {
2304    report_event(
2305        PbTelemetryEventStage::CreateStreamJob,
2306        event_name,
2307        job_id.as_raw_id() as _,
2308        connector_name,
2309        Some(obj_type),
2310        attr_info,
2311    );
2312}
2313
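/// Builds the [`UpstreamSinkInfo`] that wires an existing sink into the union fragment of its target
/// table: it derives the sink's output schema, maps the table's distribution key to indices in the
/// sink output, and builds the projection from the sink columns to the table's current columns.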
2314pub fn build_upstream_sink_info(
2315    sink: &PbSink,
2316    sink_fragment_id: FragmentId,
2317    target_table: &PbTable,
2318    target_fragment_id: FragmentId,
2319) -> MetaResult<UpstreamSinkInfo> {
2320    let sink_columns = if !sink.original_target_columns.is_empty() {
2321        sink.original_target_columns.clone()
2322    } else {
2323        // `original_target_columns` may be empty because the field did not exist in earlier versions,
2324        // which implies no schema change such as `ADD/DROP COLUMN` has been applied to the table.
2325        // Therefore the table's columns at this point are exactly the original target columns,
2326        // and the sink's value will be filled in on the meta side.
2327        target_table.columns.clone()
2328    };
2329
2330    let sink_output_fields = sink_columns
2331        .iter()
2332        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2333        .collect_vec();
2334    let output_indices = (0..sink_output_fields.len())
2335        .map(|i| i as u32)
2336        .collect_vec();
2337
2338    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2339        let sink_idx_by_col_id = sink_columns
2340            .iter()
2341            .enumerate()
2342            .map(|(idx, col)| {
2343                let column_id = col.column_desc.as_ref().unwrap().column_id;
2344                (column_id, idx as u32)
2345            })
2346            .collect::<HashMap<_, _>>();
2347        target_table
2348            .distribution_key
2349            .iter()
2350            .map(|dist_idx| {
2351                let column_id = target_table.columns[*dist_idx as usize]
2352                    .column_desc
2353                    .as_ref()
2354                    .unwrap()
2355                    .column_id;
2356                let sink_idx = sink_idx_by_col_id
2357                    .get(&column_id)
2358                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2359                Ok(*sink_idx)
2360            })
2361            .collect::<anyhow::Result<Vec<_>>>()?
2362    };
2363    let dist_key_indices =
2364        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2365    let downstream_fragment_id = target_fragment_id as _;
2366    let new_downstream_relation = DownstreamFragmentRelation {
2367        downstream_fragment_id,
2368        dispatcher_type: DispatcherType::Hash,
2369        dist_key_indices,
2370        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2371    };
2372    let current_target_columns = target_table.get_columns();
2373    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2374    Ok(UpstreamSinkInfo {
2375        sink_id: sink.id,
2376        sink_fragment_id: sink_fragment_id as _,
2377        sink_output_fields,
2378        sink_original_target_columns: sink.get_original_target_columns().clone(),
2379        project_exprs,
2380        new_sink_downstream: new_downstream_relation,
2381    })
2382}
2383
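/// Fills the `UpstreamSinkUnion` node of the table's union fragment with the given upstream sink
/// infos. Traversal stops once the node has been found and updated.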
2384pub fn refill_upstream_sink_union_in_table(
2385    union_fragment_root: &mut PbStreamNode,
2386    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2387) {
2388    visit_stream_node_cont_mut(union_fragment_root, |node| {
2389        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2390            let init_upstreams = upstream_sink_infos
2391                .iter()
2392                .map(|info| PbUpstreamSinkInfo {
2393                    upstream_fragment_id: info.sink_fragment_id,
2394                    sink_output_schema: info.sink_output_fields.clone(),
2395                    project_exprs: info.project_exprs.clone(),
2396                })
2397                .collect();
2398            upstream_sink_union.init_upstreams = init_upstreams;
2399            false
2400        } else {
2401            true
2402        }
2403    });
2404}
2405
2406#[cfg(test)]
2407mod tests {
2408    use std::num::NonZeroUsize;
2409
2410    use super::*;
2411
2412    #[test]
2413    fn test_specified_parallelism_exceeds_available() {
2414        let result = DdlController::resolve_stream_parallelism_inner(
2415            Some(NonZeroUsize::new(100).unwrap()),
2416            NonZeroUsize::new(256).unwrap(),
2417            Some(NonZeroUsize::new(4).unwrap()),
2418            &DefaultParallelism::Full,
2419            "default",
2420        )
2421        .unwrap();
2422        assert_eq!(result.get(), 100);
2423    }
2424
2425    #[test]
2426    fn test_allows_default_parallelism_over_available() {
2427        let result = DdlController::resolve_stream_parallelism_inner(
2428            None,
2429            NonZeroUsize::new(256).unwrap(),
2430            Some(NonZeroUsize::new(4).unwrap()),
2431            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2432            "default",
2433        )
2434        .unwrap();
2435        assert_eq!(result.get(), 50);
2436    }
2437
2438    #[test]
2439    fn test_full_parallelism_capped_by_max() {
2440        let result = DdlController::resolve_stream_parallelism_inner(
2441            None,
2442            NonZeroUsize::new(6).unwrap(),
2443            Some(NonZeroUsize::new(10).unwrap()),
2444            &DefaultParallelism::Full,
2445            "default",
2446        )
2447        .unwrap();
2448        assert_eq!(result.get(), 6);
2449    }
2450
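    // Sketch of an additional case: a configured default parallelism above the job's max parallelism
    // is capped to the max.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }
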
2451    #[test]
2452    fn test_no_available_slots_returns_error() {
2453        let result = DdlController::resolve_stream_parallelism_inner(
2454            None,
2455            NonZeroUsize::new(4).unwrap(),
2456            None,
2457            &DefaultParallelism::Full,
2458            "default",
2459        );
2460        assert!(matches!(
2461            result,
2462            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2463        ));
2464    }
2465
2466    #[test]
2467    fn test_specified_over_max_returns_error() {
2468        let result = DdlController::resolve_stream_parallelism_inner(
2469            Some(NonZeroUsize::new(8).unwrap()),
2470            NonZeroUsize::new(4).unwrap(),
2471            Some(NonZeroUsize::new(10).unwrap()),
2472            &DefaultParallelism::Full,
2473            "default",
2474        );
2475        assert!(matches!(
2476            result,
2477            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2478        ));
2479    }
2480}