risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64    StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::{OwnedSemaphorePermit, Semaphore};
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::cluster::StreamingClusterInfo;
76use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
77use crate::controller::utils::build_select_node_list;
78use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
79use crate::manager::sink_coordination::SinkCoordinatorManager;
80use crate::manager::{
81    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82    NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
86    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87    TableParallelism,
88};
89use crate::stream::cdc::{
90    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
91};
92use crate::stream::{
93    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
94    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
95    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
96    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
97    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
98    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
99};
100use crate::telemetry::report_event;
101use crate::{MetaError, MetaResult};
102
103#[derive(PartialEq)]
104pub enum DropMode {
105    Restrict,
106    Cascade,
107}
108
109impl DropMode {
110    pub fn from_request_setting(cascade: bool) -> DropMode {
111        if cascade {
112            DropMode::Cascade
113        } else {
114            DropMode::Restrict
115        }
116    }
117}
118
119#[derive(strum::AsRefStr)]
120pub enum StreamingJobId {
121    MaterializedView(TableId),
122    Sink(SinkId),
123    Table(Option<SourceId>, TableId),
124    Index(IndexId),
125}
126
127impl std::fmt::Display for StreamingJobId {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "{}", self.as_ref())?;
130        write!(f, "({})", self.id())
131    }
132}
133
134impl StreamingJobId {
135    fn id(&self) -> JobId {
136        match self {
137            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
138            StreamingJobId::Index(id) => id.as_job_id(),
139            StreamingJobId::Sink(id) => id.as_job_id(),
140        }
141    }
142}
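// For example, a sink job with id 42 displays as `Sink(42)`: the variant name comes from
// `strum::AsRefStr` and the parenthesized part from `StreamingJobId::id()`.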
143
144/// Describes the streaming job to be replaced, together with its new fragment graph.
145/// Used when replacing a table and when creating a sink into a table.
146pub struct ReplaceStreamJobInfo {
147    pub streaming_job: StreamingJob,
148    pub fragment_graph: StreamFragmentGraphProto,
149}
150
151#[derive(Display)]
152pub enum DdlCommand {
153    CreateDatabase(Database),
154    DropDatabase(DatabaseId),
155    CreateSchema(Schema),
156    DropSchema(SchemaId, DropMode),
157    CreateNonSharedSource(Source),
158    DropSource(SourceId, DropMode),
159    ResetSource(SourceId),
160    CreateFunction(Function),
161    DropFunction(FunctionId, DropMode),
162    CreateView(View, HashSet<ObjectId>),
163    DropView(ViewId, DropMode),
164    CreateStreamingJob {
165        stream_job: StreamingJob,
166        fragment_graph: StreamFragmentGraphProto,
167        dependencies: HashSet<ObjectId>,
168        resource_type: streaming_job_resource_type::ResourceType,
169        if_not_exists: bool,
170    },
171    DropStreamingJob {
172        job_id: StreamingJobId,
173        drop_mode: DropMode,
174    },
175    AlterName(alter_name_request::Object, String),
176    AlterSwapRename(alter_swap_rename_request::Object),
177    ReplaceStreamJob(ReplaceStreamJobInfo),
178    AlterNonSharedSource(Source),
179    AlterObjectOwner(Object, UserId),
180    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
181    CreateConnection(Connection),
182    DropConnection(ConnectionId, DropMode),
183    CreateSecret(Secret),
184    AlterSecret(Secret),
185    DropSecret(SecretId),
186    CommentOn(Comment),
187    CreateSubscription(Subscription),
188    DropSubscription(SubscriptionId, DropMode),
189    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
190    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
191}
192
193impl DdlCommand {
194    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
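    /// For example, `CreateSchema` yields `Left(schema.name)`, while `DropSchema` yields
    /// `Right(schema_id)`.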
195    fn object(&self) -> Either<String, ObjectId> {
196        use Either::*;
197        match self {
198            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
199            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
200            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
201            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
202            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
203            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
204            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
205            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
206            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
207            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
208            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
209            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
210            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
211            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
212            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
213            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
214            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
215            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
216            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
217            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
218            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
219            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
220            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
221            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
222            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
223            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
224            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
225            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
226            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
227        }
228    }
229
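    /// Whether this command may run while the cluster is still recovering.
    ///
    /// Commands that start new streaming dataflows or otherwise require a fully running
    /// barrier manager (e.g. `CreateStreamingJob`, `ReplaceStreamJob`, `CreateSubscription`,
    /// `ResetSource`) are rejected during recovery by [`DdlController::run_command`];
    /// catalog-only and drop commands are allowed to proceed.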
230    fn allow_in_recovery(&self) -> bool {
231        match self {
232            DdlCommand::DropDatabase(_)
233            | DdlCommand::DropSchema(_, _)
234            | DdlCommand::DropSource(_, _)
235            | DdlCommand::DropFunction(_, _)
236            | DdlCommand::DropView(_, _)
237            | DdlCommand::DropStreamingJob { .. }
238            | DdlCommand::DropConnection(_, _)
239            | DdlCommand::DropSecret(_)
240            | DdlCommand::DropSubscription(_, _)
241            | DdlCommand::AlterName(_, _)
242            | DdlCommand::AlterObjectOwner(_, _)
243            | DdlCommand::AlterSetSchema(_, _)
244            | DdlCommand::CreateDatabase(_)
245            | DdlCommand::CreateSchema(_)
246            | DdlCommand::CreateFunction(_)
247            | DdlCommand::CreateView(_, _)
248            | DdlCommand::CreateConnection(_)
249            | DdlCommand::CommentOn(_)
250            | DdlCommand::CreateSecret(_)
251            | DdlCommand::AlterSecret(_)
252            | DdlCommand::AlterSwapRename(_)
253            | DdlCommand::AlterDatabaseParam(_, _)
254            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
255            DdlCommand::CreateStreamingJob { .. }
256            | DdlCommand::CreateNonSharedSource(_)
257            | DdlCommand::ReplaceStreamJob(_)
258            | DdlCommand::AlterNonSharedSource(_)
259            | DdlCommand::ResetSource(_)
260            | DdlCommand::CreateSubscription(_) => false,
261        }
262    }
263}
264
265#[derive(Clone)]
266pub struct DdlController {
267    pub(crate) env: MetaSrvEnv,
268
269    pub(crate) metadata_manager: MetadataManager,
270    pub(crate) stream_manager: GlobalStreamManagerRef,
271    pub(crate) source_manager: SourceManagerRef,
272    barrier_manager: BarrierManagerRef,
273    sink_manager: SinkCoordinatorManager,
274
275    /// The semaphore limits the number of streaming jobs that can be created concurrently.
276    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
277
278    /// Sequence number for DDL commands, used for observability and debugging.
279    seq: Arc<AtomicU64>,
280}
281
282#[derive(Clone)]
283pub struct CreatingStreamingJobPermit {
284    pub(crate) semaphore: Arc<Semaphore>,
285}
286
287impl CreatingStreamingJobPermit {
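    /// Builds the permit pool from the `max_concurrent_creating_streaming_jobs` system
    /// parameter and keeps it in sync when that parameter changes at runtime.
    ///
    /// A minimal, self-contained sketch of the resizing pattern used below (only
    /// `tokio::sync::Semaphore` is assumed; this snippet is not part of this module's API):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use tokio::sync::Semaphore;
    ///
    /// let sem = Arc::new(Semaphore::new(4));
    /// // Grow: add the difference in permits.
    /// sem.add_permits(2); // effective limit is now 6
    /// // Shrink: forget up to the difference; this may fall short if permits are currently
    /// // checked out, which is exactly the case the `tracing::warn!` below reports.
    /// let reduced = sem.forget_permits(3);
    /// assert!(reduced <= 3);
    /// ```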
288    async fn new(env: &MetaSrvEnv) -> Self {
289        let mut permits = env
290            .system_params_reader()
291            .await
292            .max_concurrent_creating_streaming_jobs() as usize;
293        if permits == 0 {
294            // if the system parameter is set to zero, use the max permitted value.
295            permits = Semaphore::MAX_PERMITS;
296        }
297        let semaphore = Arc::new(Semaphore::new(permits));
298
299        let (local_notification_tx, mut local_notification_rx) =
300            tokio::sync::mpsc::unbounded_channel();
301        env.notification_manager()
302            .insert_local_sender(local_notification_tx);
303        let semaphore_clone = semaphore.clone();
304        tokio::spawn(async move {
305            while let Some(notification) = local_notification_rx.recv().await {
306                let LocalNotification::SystemParamsChange(p) = &notification else {
307                    continue;
308                };
309                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
310                if new_permits == 0 {
311                    new_permits = Semaphore::MAX_PERMITS;
312                }
313                match permits.cmp(&new_permits) {
314                    Ordering::Less => {
315                        semaphore_clone.add_permits(new_permits - permits);
316                    }
317                    Ordering::Equal => continue,
318                    Ordering::Greater => {
319                        let to_release = permits - new_permits;
320                        let reduced = semaphore_clone.forget_permits(to_release);
321                        // TODO: implement a dynamic semaphore with adjustable limits ourselves.
322                        if reduced != to_release {
323                            tracing::warn!(
324                                "not enough permits to release: expected {}, actually reduced {}",
325                                to_release,
326                                reduced
327                            );
328                        }
329                    }
330                }
331                tracing::info!(
332                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
333                    permits,
334                    new_permits
335                );
336                permits = new_permits;
337            }
338        });
339
340        Self { semaphore }
341    }
342}
343
344impl DdlController {
345    pub async fn new(
346        env: MetaSrvEnv,
347        metadata_manager: MetadataManager,
348        stream_manager: GlobalStreamManagerRef,
349        source_manager: SourceManagerRef,
350        barrier_manager: BarrierManagerRef,
351        sink_manager: SinkCoordinatorManager,
352    ) -> Self {
353        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
354        Self {
355            env,
356            metadata_manager,
357            stream_manager,
358            source_manager,
359            barrier_manager,
360            sink_manager,
361            creating_streaming_job_permits,
362            seq: Arc::new(AtomicU64::new(0)),
363        }
364    }
365
366    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
367    pub fn next_seq(&self) -> u64 {
368        // Simple atomic increment; note that `fetch_add` returns the value *before* the increment.
369        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
370    }
371
372    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
373    /// interrupted while the command is executing, tonic cancels the request future; since the
374    /// revert, status-management, and notification logic must still run to completion, ensuring
375    /// consistency without spawning here would be a huge hassle.
376    ///
377    /// Although the return type is `Option`, the value is always `Some`; this merely simplifies the callers' handling logic.
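    ///
    /// A minimal sketch of why spawning matters (plain tokio; `run_ddl_to_completion` is a
    /// hypothetical stand-in for the command future built below):
    ///
    /// ```ignore
    /// // Dropping a plain future (e.g. when tonic cancels the RPC) stops the work at its
    /// // next await point...
    /// let fut = async { run_ddl_to_completion().await };
    /// drop(fut);
    /// // ...whereas a spawned task keeps running to completion even if nobody polls the
    /// // returned `JoinHandle`.
    /// let _handle = tokio::spawn(async { run_ddl_to_completion().await });
    /// ```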
378    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
379        if !command.allow_in_recovery() {
380            self.barrier_manager.check_status_running()?;
381        }
382
383        let await_tree_key = format!("DDL Command {}", self.next_seq());
384        let await_tree_span = await_tree::span!("{command}({})", command.object());
385
386        let ctrl = self.clone();
387        let fut = async move {
388            match command {
389                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
390                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
391                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
392                DdlCommand::DropSchema(schema_id, drop_mode) => {
393                    ctrl.drop_schema(schema_id, drop_mode).await
394                }
395                DdlCommand::CreateNonSharedSource(source) => {
396                    ctrl.create_non_shared_source(source).await
397                }
398                DdlCommand::DropSource(source_id, drop_mode) => {
399                    ctrl.drop_source(source_id, drop_mode).await
400                }
401                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
402                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
403                DdlCommand::DropFunction(function_id, drop_mode) => {
404                    ctrl.drop_function(function_id, drop_mode).await
405                }
406                DdlCommand::CreateView(view, dependencies) => {
407                    ctrl.create_view(view, dependencies).await
408                }
409                DdlCommand::DropView(view_id, drop_mode) => {
410                    ctrl.drop_view(view_id, drop_mode).await
411                }
412                DdlCommand::CreateStreamingJob {
413                    stream_job,
414                    fragment_graph,
415                    dependencies,
416                    resource_type,
417                    if_not_exists,
418                } => {
419                    ctrl.create_streaming_job(
420                        stream_job,
421                        fragment_graph,
422                        dependencies,
423                        resource_type,
424                        if_not_exists,
425                    )
426                    .await
427                }
428                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
429                    ctrl.drop_streaming_job(job_id, drop_mode).await
430                }
431                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
432                    streaming_job,
433                    fragment_graph,
434                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
435                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
436                DdlCommand::AlterObjectOwner(object, owner_id) => {
437                    ctrl.alter_owner(object, owner_id).await
438                }
439                DdlCommand::AlterSetSchema(object, new_schema_id) => {
440                    ctrl.alter_set_schema(object, new_schema_id).await
441                }
442                DdlCommand::CreateConnection(connection) => {
443                    ctrl.create_connection(connection).await
444                }
445                DdlCommand::DropConnection(connection_id, drop_mode) => {
446                    ctrl.drop_connection(connection_id, drop_mode).await
447                }
448                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
449                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
450                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
451                DdlCommand::AlterNonSharedSource(source) => {
452                    ctrl.alter_non_shared_source(source).await
453                }
454                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
455                DdlCommand::CreateSubscription(subscription) => {
456                    ctrl.create_subscription(subscription).await
457                }
458                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
459                    ctrl.drop_subscription(subscription_id, drop_mode).await
460                }
461                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
462                DdlCommand::AlterDatabaseParam(database_id, param) => {
463                    ctrl.alter_database_param(database_id, param).await
464                }
465                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
466                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
467                        .await
468                }
469            }
470        }
471        .in_current_span();
472        let fut = (self.env.await_tree_reg())
473            .register(await_tree_key, await_tree_span)
474            .instrument(Box::pin(fut));
475        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
476        Ok(Some(WaitVersion {
477            catalog_version: notification_version,
478            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
479        }))
480    }
481
482    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
483        self.barrier_manager.get_ddl_progress().await
484    }
485
486    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
487        let (version, updated_db) = self
488            .metadata_manager
489            .catalog_controller
490            .create_database(database)
491            .await?;
492        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
493        self.barrier_manager
494            .update_database_barrier(
495                updated_db.database_id,
496                updated_db.barrier_interval_ms.map(|v| v as u32),
497                updated_db.checkpoint_frequency.map(|v| v as u64),
498            )
499            .await?;
500        Ok(version)
501    }
502
503    #[tracing::instrument(skip(self), level = "debug")]
504    pub async fn reschedule_streaming_job(
505        &self,
506        job_id: JobId,
507        target: ReschedulePolicy,
508        mut deferred: bool,
509    ) -> MetaResult<()> {
510        tracing::info!("altering parallelism for job {}", job_id);
511        if self.barrier_manager.check_status_running().is_err() {
512            tracing::info!(
513                "alter parallelism is set to deferred mode because the system is in recovery state"
514            );
515            deferred = true;
516        }
517
518        self.stream_manager
519            .reschedule_streaming_job(job_id, target, deferred)
520            .await
521    }
522
523    pub async fn reschedule_cdc_table_backfill(
524        &self,
525        job_id: JobId,
526        target: ReschedulePolicy,
527    ) -> MetaResult<()> {
528        tracing::info!("altering CDC table backfill parallelism for job {}", job_id);
529        if self.barrier_manager.check_status_running().is_err() {
530            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
531        }
532        self.stream_manager
533            .reschedule_cdc_table_backfill(job_id, target)
534            .await
535    }
536
537    pub async fn reschedule_fragments(
538        &self,
539        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
540    ) -> MetaResult<()> {
541        tracing::info!(
542            "altering parallelism for fragments {:?}",
543            fragment_targets.keys()
544        );
545        let fragment_targets = fragment_targets
546            .into_iter()
547            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
548            .collect();
549
550        self.stream_manager
551            .reschedule_fragments(fragment_targets)
552            .await
553    }
554
555    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
556        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
557            .await
558    }
559
560    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
561        self.metadata_manager
562            .catalog_controller
563            .create_schema(schema)
564            .await
565    }
566
567    async fn drop_schema(
568        &self,
569        schema_id: SchemaId,
570        drop_mode: DropMode,
571    ) -> MetaResult<NotificationVersion> {
572        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
573            .await
574    }
575
576    /// Shared sources are handled in [`Self::create_streaming_job`].
577    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
578        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
579            .await
580            .context("failed to create source worker")?;
581
582        let (source_id, version) = self
583            .metadata_manager
584            .catalog_controller
585            .create_source(source)
586            .await?;
587        self.source_manager
588            .register_source_with_handle(source_id, handle)
589            .await;
590        Ok(version)
591    }
592
593    async fn drop_source(
594        &self,
595        source_id: SourceId,
596        drop_mode: DropMode,
597    ) -> MetaResult<NotificationVersion> {
598        self.drop_object(ObjectType::Source, source_id, drop_mode)
599            .await
600    }
601
602    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
603        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
604
605        // Get database_id for the source
606        let database_id = self
607            .metadata_manager
608            .catalog_controller
609            .get_object_database_id(source_id)
610            .await?;
611
612        self.stream_manager
613            .barrier_scheduler
614            .run_command(database_id, Command::ResetSource { source_id })
615            .await?;
616
617        // RESET SOURCE doesn't modify the catalog, so return the current notification version.
618        let version = self
619            .metadata_manager
620            .catalog_controller
621            .current_notification_version()
622            .await;
623        Ok(version)
624    }
625
626    /// Replaces the source definition in the catalog only.
627    /// Note: the `StreamSourceInfo` held by downstream MVs' `SourceExecutor`s is not updated.
628    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
629        self.metadata_manager
630            .catalog_controller
631            .alter_non_shared_source(source)
632            .await
633    }
634
635    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
636        self.metadata_manager
637            .catalog_controller
638            .create_function(function)
639            .await
640    }
641
642    async fn drop_function(
643        &self,
644        function_id: FunctionId,
645        drop_mode: DropMode,
646    ) -> MetaResult<NotificationVersion> {
647        self.drop_object(ObjectType::Function, function_id, drop_mode)
648            .await
649    }
650
651    async fn create_view(
652        &self,
653        view: View,
654        dependencies: HashSet<ObjectId>,
655    ) -> MetaResult<NotificationVersion> {
656        self.metadata_manager
657            .catalog_controller
658            .create_view(view, dependencies)
659            .await
660    }
661
662    async fn drop_view(
663        &self,
664        view_id: ViewId,
665        drop_mode: DropMode,
666    ) -> MetaResult<NotificationVersion> {
667        self.drop_object(ObjectType::View, view_id, drop_mode).await
668    }
669
670    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
671        validate_connection(&connection).await?;
672        self.metadata_manager
673            .catalog_controller
674            .create_connection(connection)
675            .await
676    }
677
678    async fn drop_connection(
679        &self,
680        connection_id: ConnectionId,
681        drop_mode: DropMode,
682    ) -> MetaResult<NotificationVersion> {
683        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
684            .await
685    }
686
687    async fn alter_database_param(
688        &self,
689        database_id: DatabaseId,
690        param: AlterDatabaseParam,
691    ) -> MetaResult<NotificationVersion> {
692        let (version, updated_db) = self
693            .metadata_manager
694            .catalog_controller
695            .alter_database_param(database_id, param)
696            .await?;
697        // If persisted successfully, notify `GlobalBarrierManager` to update the parameter asynchronously.
698        self.barrier_manager
699            .update_database_barrier(
700                database_id,
701                updated_db.barrier_interval_ms.map(|v| v as u32),
702                updated_db.checkpoint_frequency.map(|v| v as u64),
703            )
704            .await?;
705        Ok(version)
706    }
707
708    // The 'secret' part of the request we receive from the frontend is in plaintext;
709    // here, we need to encrypt it before storing it in the catalog.
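    /// Illustrative usage, mirroring `create_secret` / `alter_secret` below (construction of
    /// `secret` is elided):
    ///
    /// ```ignore
    /// // `secret.value` initially holds the plaintext payload from the frontend.
    /// let plaintext = secret.value.clone();
    /// secret.value = self.get_encrypted_payload(&secret)?; // only the ciphertext is persisted
    /// ```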
710    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
711        let secret_store_private_key = self
712            .env
713            .opts
714            .secret_store_private_key
715            .clone()
716            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
717
718        let encrypted_payload = SecretEncryption::encrypt(
719            secret_store_private_key.as_slice(),
720            secret.get_value().as_slice(),
721        )
722        .context(format!("failed to encrypt secret {}", secret.name))?;
723        Ok(encrypted_payload
724            .serialize()
725            .context(format!("failed to serialize secret {}", secret.name))?)
726    }
727
728    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
729        // The payload received from the frontend is in plaintext; keep the plaintext copy
730        // and encrypt the stored value before persisting the secret in the catalog.
731        let secret_plain_payload = secret.value.clone();
732        let encrypted_payload = self.get_encrypted_payload(&secret)?;
733        secret.value = encrypted_payload;
734
735        self.metadata_manager
736            .catalog_controller
737            .create_secret(secret, secret_plain_payload)
738            .await
739    }
740
741    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
742        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
743            .await
744    }
745
746    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
747        let secret_plain_payload = secret.value.clone();
748        let encrypted_payload = self.get_encrypted_payload(&secret)?;
749        secret.value = encrypted_payload;
750        self.metadata_manager
751            .catalog_controller
752            .alter_secret(secret, secret_plain_payload)
753            .await
754    }
755
756    async fn create_subscription(
757        &self,
758        mut subscription: Subscription,
759    ) -> MetaResult<NotificationVersion> {
760        tracing::debug!("create subscription");
761        let _permit = self
762            .creating_streaming_job_permits
763            .semaphore
764            .acquire()
765            .await
766            .unwrap();
767        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
768        self.metadata_manager
769            .catalog_controller
770            .create_subscription_catalog(&mut subscription)
771            .await?;
772        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
773            tracing::debug!(error = %err.as_report(), "failed to create subscription");
774            let _ = self
775                .metadata_manager
776                .catalog_controller
777                .try_abort_creating_subscription(subscription.id)
778                .await
779                .inspect_err(|e| {
780                    tracing::error!(
781                        error = %e.as_report(),
782                        "failed to abort create subscription after failure"
783                    );
784                });
785            return Err(err);
786        }
787
788        let version = self
789            .metadata_manager
790            .catalog_controller
791            .notify_create_subscription(subscription.id)
792            .await?;
793        tracing::debug!("finish create subscription");
794        Ok(version)
795    }
796
797    async fn drop_subscription(
798        &self,
799        subscription_id: SubscriptionId,
800        drop_mode: DropMode,
801    ) -> MetaResult<NotificationVersion> {
802        tracing::debug!("preparing drop subscription");
803        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
804        let subscription = self
805            .metadata_manager
806            .catalog_controller
807            .get_subscription_by_id(subscription_id)
808            .await?;
809        let table_id = subscription.dependent_table_id;
810        let database_id = subscription.database_id;
811        let (_, version) = self
812            .metadata_manager
813            .catalog_controller
814            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
815            .await?;
816        self.stream_manager
817            .drop_subscription(database_id, subscription_id, table_id)
818            .await;
819        tracing::debug!("finish drop subscription");
820        Ok(version)
821    }
822
823    /// Validates the connection properties in the `cdc_table_desc` stored in the `StreamCdcScan` node.
824    #[await_tree::instrument]
825    pub(crate) async fn validate_cdc_table(
826        &self,
827        table: &Table,
828        table_fragments: &StreamJobFragments,
829    ) -> MetaResult<()> {
830        let stream_scan_fragment = table_fragments
831            .fragments
832            .values()
833            .filter(|f| {
834                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
835                    || f.fragment_type_mask
836                        .contains(FragmentTypeFlag::StreamCdcScan)
837            })
838            .exactly_one()
839            .ok()
840            .with_context(|| {
841                format!(
842                    "expect exactly one stream scan fragment, got: {:?}",
843                    table_fragments.fragments
844                )
845            })?;
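        // The `exactly_one()` call above (from `itertools`) insists on a single-element
        // iterator; a self-contained sketch of its behavior:
        //
        //     use itertools::Itertools;
        //     assert_eq!(vec![1].into_iter().exactly_one().ok(), Some(1));
        //     assert!(vec![1, 2].into_iter().exactly_one().is_err());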
846        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
847            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
848                if let Some(o) = node.options
849                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
850                {
851                    // Use parallel CDC backfill.
852                } else {
853                    assert_eq!(
854                        stream_scan_fragment.actors.len(),
855                        1,
856                        "Stream scan fragment should have only one actor"
857                    );
858                }
859            }
860        }
861        let mut found_cdc_scan = false;
862        match &stream_scan_fragment.nodes.node_body {
863            Some(NodeBody::StreamCdcScan(_)) => {
864                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
865                if self
866                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
867                    .await?
868                {
869                    found_cdc_scan = true;
870                }
871            }
872            // When there are generated columns, the CDC scan node is wrapped in a Project node.
873            Some(NodeBody::Project(_)) => {
874                for input in &stream_scan_fragment.nodes.input {
875                    assert_parallelism(stream_scan_fragment, &input.node_body);
876                    if self
877                        .validate_cdc_table_inner(&input.node_body, table.id)
878                        .await?
879                    {
880                        found_cdc_scan = true;
881                    }
882                }
883            }
884            _ => {
885                bail!("Unexpected node body for stream cdc scan");
886            }
887        };
888        if !found_cdc_scan {
889            bail!("No stream cdc scan node found in stream scan fragment");
890        }
891        Ok(())
892    }
893
894    async fn validate_cdc_table_inner(
895        &self,
896        node_body: &Option<NodeBody>,
897        table_id: TableId,
898    ) -> MetaResult<bool> {
899        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
900            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
901        {
902            let options_with_secret = WithOptionsSecResolved::new(
903                cdc_table_desc.connect_properties.clone(),
904                cdc_table_desc.secret_refs.clone(),
905            );
906
907            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
908            props.init_from_pb_cdc_table_desc(cdc_table_desc);
909
910            // Try creating a split enumerator to validate
911            let _enumerator = props
912                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
913                .await?;
914
915            tracing::debug!(?table_id, "validate cdc table success");
916            Ok(true)
917        } else {
918            Ok(false)
919        }
920    }
921
922    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
923        let migrated = self
924            .metadata_manager
925            .catalog_controller
926            .has_table_been_migrated(table_id)
927            .await?;
928        if !migrated {
929            Err(anyhow::anyhow!("Creating a sink into table {} is not allowed because the table has not been migrated. Please migrate it first.", table_id).into())
930        } else {
931            Ok(())
932        }
933    }
934
935    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
936    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
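    ///
    /// For background jobs ([`CreateType::Background`]), the call typically returns before
    /// backfilling completes; progress can then be observed via [`Self::get_ddl_progress`].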
937    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
938    pub async fn create_streaming_job(
939        &self,
940        mut streaming_job: StreamingJob,
941        fragment_graph: StreamFragmentGraphProto,
942        dependencies: HashSet<ObjectId>,
943        resource_type: streaming_job_resource_type::ResourceType,
944        if_not_exists: bool,
945    ) -> MetaResult<NotificationVersion> {
946        if let StreamingJob::Sink(sink) = &streaming_job
947            && let Some(target_table) = sink.target_table
948        {
949            self.validate_table_for_sink(target_table).await?;
950        }
951        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
952        let check_ret = self
953            .metadata_manager
954            .catalog_controller
955            .create_job_catalog(
956                &mut streaming_job,
957                &ctx,
958                &fragment_graph.parallelism,
959                fragment_graph.max_parallelism as _,
960                dependencies,
961                resource_type.clone(),
962                &fragment_graph.backfill_parallelism,
963            )
964            .await;
965        if let Err(meta_err) = check_ret {
966            if !if_not_exists {
967                return Err(meta_err);
968            }
969            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
970                if streaming_job.create_type() == CreateType::Foreground {
971                    let database_id = streaming_job.database_id();
972                    self.metadata_manager
973                        .wait_streaming_job_finished(database_id, *job_id)
974                        .await
975                } else {
976                    Ok(IGNORED_NOTIFICATION_VERSION)
977                }
978            } else {
979                Err(meta_err)
980            };
981        }
982        let job_id = streaming_job.id();
983        tracing::debug!(
984            id = %job_id,
985            definition = streaming_job.definition(),
986            create_type = streaming_job.create_type().as_str_name(),
987            job_type = ?streaming_job.job_type(),
988            "starting streaming job",
989        );
990        // TODO: acquire permits for recovered background DDLs.
991        let permit = self
992            .creating_streaming_job_permits
993            .semaphore
994            .clone()
995            .acquire_owned()
996            .instrument_await("acquire_creating_streaming_job_permit")
997            .await
998            .unwrap();
999        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1000
1001        let name = streaming_job.name();
1002        let definition = streaming_job.definition();
1003        let source_id = match &streaming_job {
1004            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
1005            _ => None,
1006        };
1007
1008        // create streaming job.
1009        match self
1010            .create_streaming_job_inner(ctx, streaming_job, fragment_graph, resource_type, permit)
1011            .await
1012        {
1013            Ok(version) => Ok(version),
1014            Err(err) => {
1015                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
1016                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
1017                    id: job_id,
1018                    name,
1019                    definition,
1020                    error: err.as_report().to_string(),
1021                };
1022                self.env.event_log_manager_ref().add_event_logs(vec![
1023                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
1024                ]);
1025                let (aborted, _) = self
1026                    .metadata_manager
1027                    .catalog_controller
1028                    .try_abort_creating_streaming_job(job_id, false)
1029                    .await?;
1030                if aborted {
1031                    tracing::warn!(id = %job_id, "aborted streaming job");
1032                    // FIXME: might also need other cleanup here
1033                    if let Some(source_id) = source_id {
1034                        self.source_manager
1035                            .apply_source_change(SourceChange::DropSource {
1036                                dropped_source_ids: vec![source_id],
1037                            })
1038                            .await;
1039                    }
1040                }
1041                Err(err)
1042            }
1043        }
1044    }
1045
1046    #[await_tree::instrument(boxed)]
1047    async fn create_streaming_job_inner(
1048        &self,
1049        ctx: StreamContext,
1050        mut streaming_job: StreamingJob,
1051        fragment_graph: StreamFragmentGraphProto,
1052        resource_type: streaming_job_resource_type::ResourceType,
1053        permit: OwnedSemaphorePermit,
1054    ) -> MetaResult<NotificationVersion> {
1055        let mut fragment_graph =
1056            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1057        streaming_job.set_info_from_graph(&fragment_graph);
1058
1059        // create internal table catalogs and refill their table ids.
1060        let incomplete_internal_tables = fragment_graph
1061            .incomplete_internal_tables()
1062            .into_values()
1063            .collect_vec();
1064        let table_id_map = self
1065            .metadata_manager
1066            .catalog_controller
1067            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1068            .await?;
1069        fragment_graph.refill_internal_table_ids(table_id_map);
1070
1071        // create fragment and actor catalogs.
1072        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1073        let (ctx, stream_job_fragments) = self
1074            .build_stream_job(ctx, streaming_job, fragment_graph, resource_type)
1075            .await?;
1076
1077        let streaming_job = &ctx.streaming_job;
1078
1079        match streaming_job {
1080            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1081                self.validate_cdc_table(table, &stream_job_fragments)
1082                    .await?;
1083            }
1084            StreamingJob::Table(Some(source), ..) => {
1085                // Register the source on the connector node.
1086                self.source_manager.register_source(source).await?;
1087                let connector_name = source
1088                    .get_with_properties()
1089                    .get(UPSTREAM_SOURCE_KEY)
1090                    .cloned();
1091                let attr = source.info.as_ref().map(|source_info| {
1092                    jsonbb::json!({
1093                            "format": source_info.format().as_str_name(),
1094                        "format": source_info.format().as_str_name(),
1095                        "encode": source_info.row_encode().as_str_name(),
1096                });
1097                report_create_object(
1098                    streaming_job.id(),
1099                    "source",
1100                    PbTelemetryDatabaseObject::Source,
1101                    connector_name,
1102                    attr,
1103                );
1104            }
1105            StreamingJob::Sink(sink) => {
1106                if sink.auto_refresh_schema_from_table.is_some() {
1107                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1108                }
1109                // Validate the sink on the connector node.
1110                validate_sink(sink).await?;
1111                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1112                let attr = sink.format_desc.as_ref().map(|sink_info| {
1113                    jsonbb::json!({
1114                        "format": sink_info.format().as_str_name(),
1115                        "encode": sink_info.encode().as_str_name(),
1116                    })
1117                });
1118                report_create_object(
1119                    streaming_job.id(),
1120                    "sink",
1121                    PbTelemetryDatabaseObject::Sink,
1122                    connector_name,
1123                    attr,
1124                );
1125            }
1126            StreamingJob::Source(source) => {
1127                // Register the source on the connector node.
1128                self.source_manager.register_source(source).await?;
1129                let connector_name = source
1130                    .get_with_properties()
1131                    .get(UPSTREAM_SOURCE_KEY)
1132                    .cloned();
1133                let attr = source.info.as_ref().map(|source_info| {
1134                    jsonbb::json!({
1135                        "format": source_info.format().as_str_name(),
1136                        "encode": source_info.row_encode().as_str_name(),
1137                    })
1138                });
1139                report_create_object(
1140                    streaming_job.id(),
1141                    "source",
1142                    PbTelemetryDatabaseObject::Source,
1143                    connector_name,
1144                    attr,
1145                );
1146            }
1147            _ => {}
1148        }
1149
1150        let backfill_orders = ctx.fragment_backfill_ordering.clone().into();
1151        self.metadata_manager
1152            .catalog_controller
1153            .prepare_stream_job_fragments(
1154                &stream_job_fragments,
1155                streaming_job,
1156                false,
1157                Some(backfill_orders),
1158            )
1159            .await?;
1160
1161        // create the streaming job.
1162        let version = self
1163            .stream_manager
1164            .create_streaming_job(stream_job_fragments, ctx, permit)
1165            .await?;
1166
1167        Ok(version)
1168    }
1169
1170    /// Drops an object and releases its resources: dependent streaming jobs, source fragments, Iceberg table sinks, and secrets.
1171    pub async fn drop_object(
1172        &self,
1173        object_type: ObjectType,
1174        object_id: impl Into<ObjectId>,
1175        drop_mode: DropMode,
1176    ) -> MetaResult<NotificationVersion> {
1177        let object_id = object_id.into();
1178        let (release_ctx, version) = self
1179            .metadata_manager
1180            .catalog_controller
1181            .drop_object(object_type, object_id, drop_mode)
1182            .await?;
1183
1184        if object_type == ObjectType::Source {
1185            self.env
1186                .notification_manager_ref()
1187                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1188        }
1189
1190        let ReleaseContext {
1191            database_id,
1192            removed_streaming_job_ids,
1193            removed_state_table_ids,
1194            removed_source_ids,
1195            removed_secret_ids: secret_ids,
1196            removed_source_fragments,
1197            removed_actors,
1198            removed_fragments,
1199            removed_sink_fragment_by_targets,
1200            removed_iceberg_table_sinks,
1201        } = release_ctx;
1202
1203        let _guard = self.source_manager.pause_tick().await;
1204        self.stream_manager
1205            .drop_streaming_jobs(
1206                database_id,
1207                removed_actors.iter().map(|id| *id as _).collect(),
1208                removed_streaming_job_ids,
1209                removed_state_table_ids,
1210                removed_fragments.iter().map(|id| *id as _).collect(),
1211                removed_sink_fragment_by_targets
1212                    .into_iter()
1213                    .map(|(target, sinks)| {
1214                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1215                    })
1216                    .collect(),
1217            )
1218            .await;
1219
1220        // Clean up sources after dropping streaming jobs.
1221        // Otherwise, e.g., Kafka consumer groups might be recreated right after being deleted.
1222        self.source_manager
1223            .apply_source_change(SourceChange::DropSource {
1224                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1225            })
1226            .await;
1227
1228        // unregister fragments and actors from source manager.
1229        // FIXME: also need to unregister source backfill fragments.
1230        let dropped_source_fragments = removed_source_fragments;
1231        self.source_manager
1232            .apply_source_change(SourceChange::DropMv {
1233                dropped_source_fragments,
1234            })
1235            .await;
1236
1237        // clean up iceberg table sinks
1238        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
1239            .iter()
1240            .map(|sink| sink.id)
1241            .collect();
1242
1243        for sink in removed_iceberg_table_sinks {
1244            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1245                .expect("Iceberg sink should be valid");
1246            let iceberg_sink =
1247                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1248            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1249                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1250                tracing::info!(
1251                    "dropping iceberg table {} for dropped sink",
1252                    table_identifier
1253                );
1254
1255                let _ = iceberg_catalog
1256                    .drop_table(&table_identifier)
1257                    .await
1258                    .inspect_err(|err| {
1259                        tracing::error!(
1260                            "failed to drop iceberg table {} during cleanup: {}",
1261                            table_identifier,
1262                            err.as_report()
1263                        );
1264                    });
1265            }
1266        }
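        // Dropping the external Iceberg table is best-effort: failures are only logged via
        // `inspect_err` above and do not fail the overall drop operation.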
1267
1268        // stop sink coordinators for iceberg table sinks
1269        if !iceberg_sink_ids.is_empty() {
1270            self.sink_manager
1271                .stop_sink_coordinator(iceberg_sink_ids)
1272                .await;
1273        }
1274
1275        // remove secrets.
1276        for secret in secret_ids {
1277            LocalSecretManager::global().remove_secret(secret);
1278        }
1279        Ok(version)
1280    }
1281
1282    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1283    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1284    pub async fn replace_job(
1285        &self,
1286        mut streaming_job: StreamingJob,
1287        fragment_graph: StreamFragmentGraphProto,
1288    ) -> MetaResult<NotificationVersion> {
1289        match &streaming_job {
1290            StreamingJob::Table(..)
1291            | StreamingJob::Source(..)
1292            | StreamingJob::MaterializedView(..) => {}
1293            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1294                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1295            }
1296        }
1297
1298        let job_id = streaming_job.id();
1299
1300        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1301        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1302
1303        // Ensure the max parallelism stays unchanged before replacing the table.
1304        let original_max_parallelism = self
1305            .metadata_manager
1306            .get_job_max_parallelism(streaming_job.id())
1307            .await?;
1308        let fragment_graph = PbStreamFragmentGraph {
1309            max_parallelism: original_max_parallelism as _,
1310            ..fragment_graph
1311        };
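        // Struct-update syntax above keeps every other field of the request's fragment graph
        // as-is; only `max_parallelism` is pinned to the original value.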
1312
1313        // 1. build fragment graph.
1314        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1315        streaming_job.set_info_from_graph(&fragment_graph);
1316
1317        // make it immutable
1318        let streaming_job = streaming_job;
1319
1320        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1321            let auto_refresh_schema_sinks = self
1322                .metadata_manager
1323                .catalog_controller
1324                .get_sink_auto_refresh_schema_from(table.id)
1325                .await?;
1326            if !auto_refresh_schema_sinks.is_empty() {
1327                let original_table_columns = self
1328                    .metadata_manager
1329                    .catalog_controller
1330                    .get_table_columns(table.id)
1331                    .await?;
1332                // compare column ids to find the newly added columns
1333                let mut original_table_column_ids: HashSet<_> = original_table_columns
1334                    .iter()
1335                    .map(|col| col.column_id())
1336                    .collect();
1337                let newly_added_columns = table
1338                    .columns
1339                    .iter()
1340                    .filter(|col| {
1341                        !original_table_column_ids.remove(&ColumnId::new(
1342                            col.column_desc.as_ref().unwrap().column_id as _,
1343                        ))
1344                    })
1345                    .map(|col| ColumnCatalog::from(col.clone()))
1346                    .collect_vec();
1347                if !original_table_column_ids.is_empty() {
1348                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1349                }
1350                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1351                for sink in auto_refresh_schema_sinks {
1352                    let sink_job_fragments = self
1353                        .metadata_manager
1354                        .get_job_fragments_by_id(sink.id.as_job_id())
1355                        .await?;
1356                    if sink_job_fragments.fragments.len() != 1 {
1357                        return Err(anyhow!(
1358                            "auto schema refresh sink must have only one fragment, but got {}",
1359                            sink_job_fragments.fragments.len()
1360                        )
1361                        .into());
1362                    }
1363                    let original_sink_fragment =
1364                        sink_job_fragments.fragments.into_values().next().unwrap();
1365                    let (new_sink_fragment, new_schema, new_log_store_table) =
1366                        rewrite_refresh_schema_sink_fragment(
1367                            &original_sink_fragment,
1368                            &sink,
1369                            &newly_added_columns,
1370                            table,
1371                            fragment_graph.table_fragment_id(),
1372                            self.env.id_gen_manager(),
1373                            self.env.actor_id_generator(),
1374                        )?;
1375
1376                    assert_eq!(
1377                        original_sink_fragment.actors.len(),
1378                        new_sink_fragment.actors.len()
1379                    );
1380                    let actor_status = (0..original_sink_fragment.actors.len())
1381                        .map(|i| {
1382                            let worker_node_id = sink_job_fragments.actor_status
1383                                [&original_sink_fragment.actors[i].actor_id]
1384                                .location
1385                                .as_ref()
1386                                .unwrap()
1387                                .worker_node_id;
1388                            (
1389                                new_sink_fragment.actors[i].actor_id,
1390                                PbActorStatus {
1391                                    location: Some(PbActorLocation { worker_node_id }),
1392                                },
1393                            )
1394                        })
1395                        .collect();
1396
1397                    let streaming_job = StreamingJob::Sink(sink);
1398
1399                    let tmp_sink_id = self
1400                        .metadata_manager
1401                        .catalog_controller
1402                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1403                        .await?
1404                        .as_sink_id();
1405                    let StreamingJob::Sink(sink) = streaming_job else {
1406                        unreachable!()
1407                    };
1408
1409                    sinks.push(AutoRefreshSchemaSinkContext {
1410                        tmp_sink_id,
1411                        original_sink: sink,
1412                        original_fragment: original_sink_fragment,
1413                        new_schema,
1414                        newly_add_fields: newly_added_columns
1415                            .iter()
1416                            .map(|col| Field::from(&col.column_desc))
1417                            .collect(),
1418                        new_fragment: new_sink_fragment,
1419                        new_log_store_table,
1420                        actor_status,
1421                    });
1422                }
1423                Some(sinks)
1424            } else {
1425                None
1426            }
1427        } else {
1428            None
1429        };
1430
1431        let tmp_id = self
1432            .metadata_manager
1433            .catalog_controller
1434            .create_job_catalog_for_replace(
1435                &streaming_job,
1436                Some(&ctx),
1437                fragment_graph.specified_parallelism().as_ref(),
1438                Some(fragment_graph.max_parallelism()),
1439            )
1440            .await?;
1441
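        // Remember the temporary sink ids so that they can be aborted together with the temporary
        // job if the replacement fails below.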
1442        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1443            sinks
1444                .iter()
1445                .map(|sink| sink.tmp_sink_id.as_object_id())
1446                .collect_vec()
1447        });
1448
1449        tracing::debug!(id = %job_id, "building replace streaming job");
1450        let mut updated_sink_catalogs = vec![];
1451
1452        let mut drop_table_connector_ctx = None;
1453        let result: MetaResult<_> = try {
1454            let (mut ctx, mut stream_job_fragments) = self
1455                .build_replace_job(
1456                    ctx,
1457                    &streaming_job,
1458                    fragment_graph,
1459                    tmp_id,
1460                    auto_refresh_schema_sinks,
1461                )
1462                .await?;
1463            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1464            let auto_refresh_schema_sink_finish_ctx =
1465                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1466                    sinks
1467                        .iter()
1468                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1469                            tmp_sink_id: sink.tmp_sink_id,
1470                            original_sink_id: sink.original_sink.id,
1471                            columns: sink.new_schema.clone(),
1472                            new_log_store_table: sink
1473                                .new_log_store_table
1474                                .as_ref()
1475                                .map(|table| (table.id, table.columns.clone())),
1476                        })
1477                        .collect()
1478                });
1479
1480            // Handle table that has incoming sinks.
1481            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1482                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1483                let upstream_infos = self
1484                    .metadata_manager
1485                    .catalog_controller
1486                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1487                    .await?;
1488                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1489
1490                for upstream_info in &upstream_infos {
1491                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1492                    ctx.upstream_fragment_downstreams
1493                        .entry(upstream_fragment_id)
1494                        .or_default()
1495                        .push(upstream_info.new_sink_downstream.clone());
1496                    if upstream_info.sink_original_target_columns.is_empty() {
1497                        updated_sink_catalogs.push(upstream_info.sink_id);
1498                    }
1499                }
1500            }
1501
1502            let replace_upstream = ctx.replace_upstream.clone();
1503
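            // Persist the rewritten fragments of the temporary auto-refresh-schema sink jobs
            // before kicking off the replacement.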
1504            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1505                let empty_downstreams = FragmentDownstreamRelation::default();
1506                for sink in sinks {
1507                    self.metadata_manager
1508                        .catalog_controller
1509                        .prepare_streaming_job(
1510                            sink.tmp_sink_id.as_job_id(),
1511                            || [&sink.new_fragment].into_iter(),
1512                            &empty_downstreams,
1513                            true,
1514                            None,
1515                            None,
1516                        )
1517                        .await?;
1518                }
1519            }
1520
1521            self.metadata_manager
1522                .catalog_controller
1523                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1524                .await?;
1525
1526            self.stream_manager
1527                .replace_stream_job(stream_job_fragments, ctx)
1528                .await?;
1529            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1530        };
1531
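        // On success, finish the replacement and commit the catalog changes; on failure, abort the
        // temporary job (and any temporary sinks) created for the replacement.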
1532        match result {
1533            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1534                let version = self
1535                    .metadata_manager
1536                    .catalog_controller
1537                    .finish_replace_streaming_job(
1538                        tmp_id,
1539                        streaming_job,
1540                        replace_upstream,
1541                        SinkIntoTableContext {
1542                            updated_sink_catalogs,
1543                        },
1544                        drop_table_connector_ctx.as_ref(),
1545                        auto_refresh_schema_sink_finish_ctx,
1546                    )
1547                    .await?;
1548                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1549                    self.source_manager
1550                        .apply_source_change(SourceChange::DropSource {
1551                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1552                        })
1553                        .await;
1554                }
1555                Ok(version)
1556            }
1557            Err(err) => {
1558                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1559                let _ = self.metadata_manager
1560                    .catalog_controller
1561                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1562                    .await.inspect_err(|err| {
1563                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1564                });
1565                Err(err)
1566            }
1567        }
1568    }
1569
1570    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1571    )]
1572    async fn drop_streaming_job(
1573        &self,
1574        job_id: StreamingJobId,
1575        drop_mode: DropMode,
1576    ) -> MetaResult<NotificationVersion> {
1577        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1578
1579        let (object_id, object_type) = match job_id {
1580            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1581            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1582            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1583            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1584        };
1585
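        // A job that is still being created is cancelled instead of dropped, while a fully created
        // job goes through the regular drop path.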
1586        let job_status = self
1587            .metadata_manager
1588            .catalog_controller
1589            .get_streaming_job_status(job_id.id())
1590            .await?;
1591        let version = match job_status {
1592            JobStatus::Initial => {
1593                unreachable!(
1594                    "Job in Initial status is never notified to the frontend and therefore should not arrive here"
1595                );
1596            }
1597            JobStatus::Creating => {
1598                self.stream_manager
1599                    .cancel_streaming_jobs(vec![job_id.id()])
1600                    .await?;
1601                IGNORED_NOTIFICATION_VERSION
1602            }
1603            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1604        };
1605
1606        Ok(version)
1607    }
1608
1609    /// Resolve the parallelism of the stream job based on the given information.
1610    ///
1611    /// Returns error if user specifies a parallelism that cannot be satisfied.
1612    fn resolve_stream_parallelism(
1613        &self,
1614        specified: Option<NonZeroUsize>,
1615        max: NonZeroUsize,
1616        cluster_info: &StreamingClusterInfo,
1617        resource_group: String,
1618    ) -> MetaResult<NonZeroUsize> {
1619        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1620        DdlController::resolve_stream_parallelism_inner(
1621            specified,
1622            max,
1623            available,
1624            &self.env.opts.default_parallelism,
1625            &resource_group,
1626        )
1627    }
1628
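    /// Core logic of [`Self::resolve_stream_parallelism`], kept free of `self` so it can be unit
    /// tested.
    ///
    /// Resolution order: an explicitly specified parallelism wins (but must not exceed `max`);
    /// otherwise the configured default parallelism is used (`Full` means all available slots),
    /// capped by `max`.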
1629    fn resolve_stream_parallelism_inner(
1630        specified: Option<NonZeroUsize>,
1631        max: NonZeroUsize,
1632        available: Option<NonZeroUsize>,
1633        default_parallelism: &DefaultParallelism,
1634        resource_group: &str,
1635    ) -> MetaResult<NonZeroUsize> {
1636        let Some(available) = available else {
1637            bail_unavailable!(
1638                "no available slots to schedule in resource group \"{}\", \
1639                 have you allocated any compute nodes within this resource group?",
1640                resource_group
1641            );
1642        };
1643
1644        if let Some(specified) = specified {
1645            if specified > max {
1646                bail_invalid_parameter!(
1647                    "specified parallelism {} should not exceed max parallelism {}",
1648                    specified,
1649                    max,
1650                );
1651            }
1652            if specified > available {
1653                tracing::warn!(
1654                    resource_group,
1655                    specified_parallelism = specified.get(),
1656                    available_parallelism = available.get(),
1657                    "specified parallelism exceeds available slots, scheduling with specified value",
1658                );
1659            }
1660            return Ok(specified);
1661        }
1662
1663        // Use default parallelism when no specific parallelism is provided by the user.
1664        let default_parallelism = match default_parallelism {
1665            DefaultParallelism::Full => available,
1666            DefaultParallelism::Default(num) => {
1667                if *num > available {
1668                    tracing::warn!(
1669                        resource_group,
1670                        configured_parallelism = num.get(),
1671                        available_parallelism = available.get(),
1672                        "default parallelism exceeds available slots, scheduling with configured value",
1673                    );
1674                }
1675                *num
1676            }
1677        };
1678
1679        if default_parallelism > max {
1680            tracing::warn!(
1681                max_parallelism = max.get(),
1682                resource_group,
1683                "default parallelism exceeds max parallelism, capping to max",
1684            );
1685        }
1686        Ok(default_parallelism.min(max))
1687    }
1688
1689    /// Builds the actor graph:
1690    /// - Add the upstream fragments to the fragment graph
1691    /// - Schedule the fragments based on their distribution
1692    /// - Expand each fragment into one or several actors
1693    /// - Construct the fragment level backfill order control.
1694    #[await_tree::instrument]
1695    pub(crate) async fn build_stream_job(
1696        &self,
1697        stream_ctx: StreamContext,
1698        mut stream_job: StreamingJob,
1699        fragment_graph: StreamFragmentGraph,
1700        resource_type: streaming_job_resource_type::ResourceType,
1701    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1702        let id = stream_job.id();
1703        let specified_parallelism = fragment_graph.specified_parallelism();
1704        let specified_backfill_parallelism = fragment_graph.specified_backfill_parallelism();
1705        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1706
1707        // 1. Build the fragment-level backfill ordering graph.
1708        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1709
1710        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1711        // contains all information needed for building the actor graph.
1712
1713        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1714            fragment_graph.collect_snapshot_backfill_info()?;
1715        assert!(
1716            snapshot_backfill_info
1717                .iter()
1718                .chain([&cross_db_snapshot_backfill_info])
1719                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1720                .all(|backfill_epoch| backfill_epoch.is_none()),
1721            "should not set backfill epoch when initially building the job: {:?} {:?}",
1722            snapshot_backfill_info,
1723            cross_db_snapshot_backfill_info
1724        );
1725
1726        let locality_fragment_state_table_mapping =
1727            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1728
1729        // check if log store exists for all cross-db upstreams
1730        self.metadata_manager
1731            .catalog_controller
1732            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1733            .await?;
1734
1735        let upstream_table_ids = fragment_graph
1736            .dependent_table_ids()
1737            .iter()
1738            .filter(|id| {
1739                !cross_db_snapshot_backfill_info
1740                    .upstream_mv_table_id_to_backfill_epoch
1741                    .contains_key(id)
1742            })
1743            .cloned()
1744            .collect();
1745
1746        let (upstream_root_fragments, existing_actor_location) = self
1747            .metadata_manager
1748            .get_upstream_root_fragments(&upstream_table_ids)
1749            .await?;
1750
1751        if snapshot_backfill_info.is_some() {
1752            match stream_job {
1753                StreamingJob::MaterializedView(_)
1754                | StreamingJob::Sink(_)
1755                | StreamingJob::Index(_, _) => {}
1756                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1757                    return Err(
1758                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1759                    );
1760                }
1761            }
1762        }
1763
1764        let upstream_actors = upstream_root_fragments
1765            .values()
1766            .map(|(fragment, _)| {
1767                (
1768                    fragment.fragment_id,
1769                    fragment.actors.keys().copied().collect(),
1770                )
1771            })
1772            .collect();
1773
1774        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1775            fragment_graph,
1776            FragmentGraphUpstreamContext {
1777                upstream_root_fragments,
1778                upstream_actor_location: existing_actor_location,
1779            },
1780            (&stream_job).into(),
1781        )?;
1782        let resource_group = if let Some(group) = resource_type.resource_group() {
1783            group
1784        } else {
1785            self.metadata_manager
1786                .get_database_resource_group(stream_job.database_id())
1787                .await?
1788        };
1789        let is_serverless_backfill = matches!(
1790            &resource_type,
1791            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1792        );
1793
1794        // 3. Build the actor graph.
1795        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1796
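        // The specified backfill parallelism (if any) takes precedence when initially scheduling
        // the job; the resolved value is then adjusted by the adaptive parallelism strategy from
        // the system parameters.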
1797        let initial_parallelism = specified_backfill_parallelism.or(specified_parallelism);
1798        let parallelism = self.resolve_stream_parallelism(
1799            initial_parallelism,
1800            max_parallelism,
1801            &cluster_info,
1802            resource_group.clone(),
1803        )?;
1804
1805        let parallelism = self
1806            .env
1807            .system_params_reader()
1808            .await
1809            .adaptive_parallelism_strategy()
1810            .compute_target_parallelism(parallelism.get());
1811
1812        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1813        let actor_graph_builder = ActorGraphBuilder::new(
1814            id,
1815            resource_group,
1816            complete_graph,
1817            cluster_info,
1818            parallelism,
1819        )?;
1820
1821        let ActorGraphBuildResult {
1822            graph,
1823            downstream_fragment_relations,
1824            building_locations,
1825            upstream_fragment_downstreams,
1826            new_no_shuffle,
1827            replace_upstream,
1828            ..
1829        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1830        assert!(replace_upstream.is_empty());
1831
1832        // 4. Build the table fragments structure that will be persisted in the stream manager,
1833        // and the context that contains all information needed for building the
1834        // actors on the compute nodes.
1835
1836        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, set it to ADAPTIVE.
1837        // Otherwise, it defaults to FIXED with the deduced parallelism.
1838        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1839            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1840            _ => TableParallelism::Fixed(parallelism.get()),
1841        };
1842
1843        let stream_job_fragments = StreamJobFragments::new(
1844            id,
1845            graph,
1846            &building_locations.actor_locations,
1847            stream_ctx.clone(),
1848            table_parallelism,
1849            max_parallelism.get(),
1850        );
1851
1852        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1853            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1854        }
1855
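        // For a sink that writes into a table (sink-into-table), build the upstream sink info
        // describing how the sink fragment's output is dispatched into the target table's union
        // fragment.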
1856        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1857            && let Ok(table_id) = sink.get_target_table()
1858        {
1859            let tables = self
1860                .metadata_manager
1861                .get_table_catalog_by_ids(&[*table_id])
1862                .await?;
1863            let target_table = tables
1864                .first()
1865                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1866            let sink_fragment = stream_job_fragments
1867                .sink_fragment()
1868                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1869            let mview_fragment_id = self
1870                .metadata_manager
1871                .catalog_controller
1872                .get_mview_fragment_by_id(table_id.as_job_id())
1873                .await?;
1874            let upstream_sink_info = build_upstream_sink_info(
1875                sink.id,
1876                sink.original_target_columns.clone(),
1877                sink_fragment.fragment_id as _,
1878                target_table,
1879                mview_fragment_id,
1880            )?;
1881            Some(upstream_sink_info)
1882        } else {
1883            None
1884        };
1885
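        // For a table created from a shared CDC source with parallel backfill, pre-compute and
        // persist the snapshot splits before creating the job.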
1886        let mut cdc_table_snapshot_splits = None;
1887        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1888            && let Some((_, stream_cdc_scan)) =
1889                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1890        {
1891            {
1892                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1893                let splits = try_init_parallel_cdc_table_snapshot_splits(
1894                    table.id,
1895                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1896                    self.env.meta_store_ref(),
1897                    stream_cdc_scan.options.as_ref().unwrap(),
1898                    self.env.opts.cdc_table_split_init_insert_batch_size,
1899                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1900                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1901                )
1902                .await?;
1903                cdc_table_snapshot_splits = Some(splits);
1904            }
1905        }
1906
1907        let ctx = CreateStreamingJobContext {
1908            upstream_fragment_downstreams,
1909            new_no_shuffle,
1910            upstream_actors,
1911            building_locations,
1912            definition: stream_job.definition(),
1913            create_type: stream_job.create_type(),
1914            job_type: (&stream_job).into(),
1915            streaming_job: stream_job,
1916            new_upstream_sink,
1917            option: CreateStreamingJobOption {},
1918            snapshot_backfill_info,
1919            cross_db_snapshot_backfill_info,
1920            fragment_backfill_ordering,
1921            locality_fragment_state_table_mapping,
1922            cdc_table_snapshot_splits,
1923            is_serverless_backfill,
1924        };
1925
1926        Ok((
1927            ctx,
1928            StreamJobFragmentsToCreate {
1929                inner: stream_job_fragments,
1930                downstreams: downstream_fragment_relations,
1931            },
1932        ))
1933    }
1934
1935    /// `build_replace_job` builds a job replacement and returns the context and new job
1936    /// fragments.
1937    ///
1938    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1939    /// replacement is finished.
1940    pub(crate) async fn build_replace_job(
1941        &self,
1942        stream_ctx: StreamContext,
1943        stream_job: &StreamingJob,
1944        mut fragment_graph: StreamFragmentGraph,
1945        tmp_job_id: JobId,
1946        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1947    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1948        match &stream_job {
1949            StreamingJob::Table(..)
1950            | StreamingJob::Source(..)
1951            | StreamingJob::MaterializedView(..) => {}
1952            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1953                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1954            }
1955        }
1956
1957        let id = stream_job.id();
1958
1959        // check if performing drop table connector
1960        let mut drop_table_associated_source_id = None;
1961        if let StreamingJob::Table(None, _, _) = &stream_job {
1962            drop_table_associated_source_id = self
1963                .metadata_manager
1964                .get_table_associated_source_id(id.as_mv_table_id())
1965                .await?;
1966        }
1967
1968        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1969        let old_internal_table_ids = old_fragments.internal_table_ids();
1970
1971        // handle drop table's associated source
1972        let mut drop_table_connector_ctx = None;
1973        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1974            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1975            debug_assert!(old_internal_table_ids.len() == 1);
1976
1977            drop_table_connector_ctx = Some(DropTableConnectorContext {
1978                // we do not remove the original table catalog as it's still needed for the streaming job
1979                // just need to remove the ref to the state table
1980                to_change_streaming_job_id: id,
1981                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1982                to_remove_source_id,
1983            });
1984        } else if stream_job.is_materialized_view() {
1985            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
1986            // but more robust.
1987            let old_fragments_upstreams = self
1988                .metadata_manager
1989                .catalog_controller
1990                .upstream_fragments(old_fragments.fragment_ids())
1991                .await?;
1992
1993            let old_state_graph =
1994                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1995            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1996            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
1997                .context("incompatible altering on the streaming job states")?;
1998
1999            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
2000            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
2001        } else {
2002            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2003            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2004            let old_internal_tables = self
2005                .metadata_manager
2006                .get_table_catalog_by_ids(&old_internal_table_ids)
2007                .await?;
2008            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2009        }
2010
2011        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2012        // graph that contains all information needed for building the actor graph.
2013        let original_root_fragment = old_fragments
2014            .root_fragment()
2015            .expect("root fragment not found");
2016
2017        let job_type = StreamingJobType::from(stream_job);
2018
2019        // Extract the downstream fragments from the fragment graph.
2020        let (mut downstream_fragments, mut downstream_actor_location) =
2021            self.metadata_manager.get_downstream_fragments(id).await?;
2022
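        // Substitute the original auto-refresh-schema sink fragments with their rewritten
        // counterparts, so that the complete graph below is built against the new sink schema
        // and actor locations.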
2023        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2024            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2025                .iter()
2026                .map(|sink| sink.original_fragment.fragment_id)
2027                .collect();
2028            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
2029                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2030                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2031                }) {
2032                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2033                    for actor_id in downstream_fragment.actors.keys() {
2034                        downstream_actor_location.remove(actor_id);
2035                    }
2036                    for (actor_id, status) in &sink.actor_status {
2037                        downstream_actor_location
2038                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
2039                    }
2040
2041                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
2042                    *nodes = sink.new_fragment.nodes.clone();
2043                }
2044            }
2045            assert!(remaining_fragment.is_empty());
2046        }
2047
2048        // build the complete graph based on the streaming job type
2049        let complete_graph = match &job_type {
2050            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2051                CompleteStreamFragmentGraph::with_downstreams(
2052                    fragment_graph,
2053                    FragmentGraphDownstreamContext {
2054                        original_root_fragment_id: original_root_fragment.fragment_id,
2055                        downstream_fragments,
2056                        downstream_actor_location,
2057                    },
2058                    job_type,
2059                )?
2060            }
2061            StreamingJobType::Table(TableJobType::SharedCdcSource)
2062            | StreamingJobType::MaterializedView => {
2063                // CDC tables or materialized views can have upstream jobs as well.
2064                let (upstream_root_fragments, upstream_actor_location) = self
2065                    .metadata_manager
2066                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2067                    .await?;
2068
2069                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2070                    fragment_graph,
2071                    FragmentGraphUpstreamContext {
2072                        upstream_root_fragments,
2073                        upstream_actor_location,
2074                    },
2075                    FragmentGraphDownstreamContext {
2076                        original_root_fragment_id: original_root_fragment.fragment_id,
2077                        downstream_fragments,
2078                        downstream_actor_location,
2079                    },
2080                    job_type,
2081                )?
2082            }
2083            _ => unreachable!(),
2084        };
2085
2086        let resource_group = self
2087            .metadata_manager
2088            .get_existing_job_resource_group(id)
2089            .await?;
2090
2091        // 2. Build the actor graph.
2092        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2093
2094        // XXX: what is this parallelism?
2095        // Is it "assigned parallelism"?
2096        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2097            .expect("The number of actors in the original table fragment should be greater than 0");
2098
2099        let actor_graph_builder = ActorGraphBuilder::new(
2100            id,
2101            resource_group,
2102            complete_graph,
2103            cluster_info,
2104            parallelism,
2105        )?;
2106
2107        let ActorGraphBuildResult {
2108            graph,
2109            downstream_fragment_relations,
2110            building_locations,
2111            upstream_fragment_downstreams,
2112            mut replace_upstream,
2113            new_no_shuffle,
2114            ..
2115        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2116
2117        // A general table or source does not have an upstream job, so the dispatchers should be empty.
2118        if matches!(
2119            job_type,
2120            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2121        ) {
2122            assert!(upstream_fragment_downstreams.is_empty());
2123        }
2124
2125        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2126        // the context that contains all information needed for building the actors on the compute
2127        // nodes.
2128        let stream_job_fragments = StreamJobFragments::new(
2129            tmp_job_id,
2130            graph,
2131            &building_locations.actor_locations,
2132            stream_ctx,
2133            old_fragments.assigned_parallelism,
2134            old_fragments.max_parallelism,
2135        );
2136
2137        if let Some(sinks) = &auto_refresh_schema_sinks {
2138            for sink in sinks {
2139                replace_upstream
2140                    .remove(&sink.new_fragment.fragment_id)
2141                    .expect("should exist");
2142            }
2143        }
2144
2145        // Note: no need to set `vnode_count` as it's already set by the frontend.
2146        // See `get_replace_table_plan`.
2147
2148        let ctx = ReplaceStreamJobContext {
2149            old_fragments,
2150            replace_upstream,
2151            new_no_shuffle,
2152            upstream_fragment_downstreams,
2153            building_locations,
2154            streaming_job: stream_job.clone(),
2155            tmp_id: tmp_job_id,
2156            drop_table_connector_ctx,
2157            auto_refresh_schema_sinks,
2158        };
2159
2160        Ok((
2161            ctx,
2162            StreamJobFragmentsToCreate {
2163                inner: stream_job_fragments,
2164                downstreams: downstream_fragment_relations,
2165            },
2166        ))
2167    }
2168
2169    async fn alter_name(
2170        &self,
2171        relation: alter_name_request::Object,
2172        new_name: &str,
2173    ) -> MetaResult<NotificationVersion> {
2174        let (obj_type, id) = match relation {
2175            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2176            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2177            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2178            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2179            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2180            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2181            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2182            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2183        };
2184        self.metadata_manager
2185            .catalog_controller
2186            .alter_name(obj_type, id, new_name)
2187            .await
2188    }
2189
2190    async fn alter_swap_rename(
2191        &self,
2192        object: alter_swap_rename_request::Object,
2193    ) -> MetaResult<NotificationVersion> {
2194        let (obj_type, src_id, dst_id) = match object {
2195            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2196            alter_swap_rename_request::Object::Table(objs) => {
2197                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2198                (ObjectType::Table, src_id, dst_id)
2199            }
2200            alter_swap_rename_request::Object::View(objs) => {
2201                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2202                (ObjectType::View, src_id, dst_id)
2203            }
2204            alter_swap_rename_request::Object::Source(objs) => {
2205                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2206                (ObjectType::Source, src_id, dst_id)
2207            }
2208            alter_swap_rename_request::Object::Sink(objs) => {
2209                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2210                (ObjectType::Sink, src_id, dst_id)
2211            }
2212            alter_swap_rename_request::Object::Subscription(objs) => {
2213                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2214                (ObjectType::Subscription, src_id, dst_id)
2215            }
2216        };
2217
2218        self.metadata_manager
2219            .catalog_controller
2220            .alter_swap_rename(obj_type, src_id, dst_id)
2221            .await
2222    }
2223
2224    async fn alter_owner(
2225        &self,
2226        object: Object,
2227        owner_id: UserId,
2228    ) -> MetaResult<NotificationVersion> {
2229        let (obj_type, id) = match object {
2230            Object::TableId(id) => (ObjectType::Table, id),
2231            Object::ViewId(id) => (ObjectType::View, id),
2232            Object::SourceId(id) => (ObjectType::Source, id),
2233            Object::SinkId(id) => (ObjectType::Sink, id),
2234            Object::SchemaId(id) => (ObjectType::Schema, id),
2235            Object::DatabaseId(id) => (ObjectType::Database, id),
2236            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2237            Object::ConnectionId(id) => (ObjectType::Connection, id),
2238        };
2239        self.metadata_manager
2240            .catalog_controller
2241            .alter_owner(obj_type, id.into(), owner_id as _)
2242            .await
2243    }
2244
2245    async fn alter_set_schema(
2246        &self,
2247        object: alter_set_schema_request::Object,
2248        new_schema_id: SchemaId,
2249    ) -> MetaResult<NotificationVersion> {
2250        let (obj_type, id) = match object {
2251            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2252            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2253            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2254            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2255            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2256            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2257            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2258        };
2259        self.metadata_manager
2260            .catalog_controller
2261            .alter_schema(obj_type, id.into(), new_schema_id as _)
2262            .await
2263    }
2264
2265    pub async fn wait(&self) -> MetaResult<()> {
2266        let timeout_ms = 30 * 60 * 1000;
2267        for _ in 0..timeout_ms {
2268            if self
2269                .metadata_manager
2270                .catalog_controller
2271                .list_background_creating_jobs(true, None)
2272                .await?
2273                .is_empty()
2274            {
2275                return Ok(());
2276            }
2277
2278            sleep(Duration::from_millis(1)).await;
2279        }
2280        Err(MetaError::cancelled(format!(
2281            "timeout after {timeout_ms}ms"
2282        )))
2283    }
2284
2285    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2286        self.metadata_manager
2287            .catalog_controller
2288            .comment_on(comment)
2289            .await
2290    }
2291
2292    async fn alter_streaming_job_config(
2293        &self,
2294        job_id: JobId,
2295        entries_to_add: HashMap<String, String>,
2296        keys_to_remove: Vec<String>,
2297    ) -> MetaResult<NotificationVersion> {
2298        self.metadata_manager
2299            .catalog_controller
2300            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2301            .await
2302    }
2303}
2304
2305fn report_create_object(
2306    job_id: JobId,
2307    event_name: &str,
2308    obj_type: PbTelemetryDatabaseObject,
2309    connector_name: Option<String>,
2310    attr_info: Option<jsonbb::Value>,
2311) {
2312    report_event(
2313        PbTelemetryEventStage::CreateStreamJob,
2314        event_name,
2315        job_id.as_raw_id() as _,
2316        connector_name,
2317        Some(obj_type),
2318        attr_info,
2319    );
2320}
2321
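/// Builds the [`UpstreamSinkInfo`] for a sink that writes into `target_table`: the sink's output
/// schema, a hash dispatcher towards the table's union fragment (keyed by the table's distribution
/// key mapped onto the sink's columns), and the projection from the sink's columns to the table's
/// current columns.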
2322pub fn build_upstream_sink_info(
2323    sink_id: SinkId,
2324    original_target_columns: Vec<PbColumnCatalog>,
2325    sink_fragment_id: FragmentId,
2326    target_table: &PbTable,
2327    target_fragment_id: FragmentId,
2328) -> MetaResult<UpstreamSinkInfo> {
2329    let sink_columns = if !original_target_columns.is_empty() {
2330        original_target_columns.clone()
2331    } else {
2332        // An empty value means the sink was created by an earlier version that did not record this field,
2333        // which implies no schema changes such as `ADD/DROP COLUMN` have been made to the table since then.
2334        // Therefore the table's current columns are exactly the `original_target_columns`.
2335        // The sink's value will be backfilled on the meta node.
2336        target_table.columns.clone()
2337    };
2338
2339    let sink_output_fields = sink_columns
2340        .iter()
2341        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2342        .collect_vec();
2343    let output_indices = (0..sink_output_fields.len())
2344        .map(|i| i as u32)
2345        .collect_vec();
2346
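    // Map the target table's distribution key (given as indices into the table's columns) to the
    // corresponding indices in the sink's output columns, matching by column id.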
2347    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2348        let sink_idx_by_col_id = sink_columns
2349            .iter()
2350            .enumerate()
2351            .map(|(idx, col)| {
2352                let column_id = col.column_desc.as_ref().unwrap().column_id;
2353                (column_id, idx as u32)
2354            })
2355            .collect::<HashMap<_, _>>();
2356        target_table
2357            .distribution_key
2358            .iter()
2359            .map(|dist_idx| {
2360                let column_id = target_table.columns[*dist_idx as usize]
2361                    .column_desc
2362                    .as_ref()
2363                    .unwrap()
2364                    .column_id;
2365                let sink_idx = sink_idx_by_col_id
2366                    .get(&column_id)
2367                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2368                Ok(*sink_idx)
2369            })
2370            .collect::<anyhow::Result<Vec<_>>>()?
2371    };
2372    let dist_key_indices =
2373        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2374    let downstream_fragment_id = target_fragment_id as _;
2375    let new_downstream_relation = DownstreamFragmentRelation {
2376        downstream_fragment_id,
2377        dispatcher_type: DispatcherType::Hash,
2378        dist_key_indices,
2379        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2380    };
2381    let current_target_columns = target_table.get_columns();
2382    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2383    Ok(UpstreamSinkInfo {
2384        sink_id,
2385        sink_fragment_id: sink_fragment_id as _,
2386        sink_output_fields,
2387        sink_original_target_columns: original_target_columns,
2388        project_exprs,
2389        new_sink_downstream: new_downstream_relation,
2390    })
2391}
2392
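/// Rewrites the `UpstreamSinkUnion` node within the table's union fragment so that its initial
/// upstreams reflect the given upstream sinks (fragment id, output schema and projection exprs).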
2393pub fn refill_upstream_sink_union_in_table(
2394    union_fragment_root: &mut PbStreamNode,
2395    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2396) {
2397    visit_stream_node_cont_mut(union_fragment_root, |node| {
2398        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2399            let init_upstreams = upstream_sink_infos
2400                .iter()
2401                .map(|info| PbUpstreamSinkInfo {
2402                    upstream_fragment_id: info.sink_fragment_id,
2403                    sink_output_schema: info.sink_output_fields.clone(),
2404                    project_exprs: info.project_exprs.clone(),
2405                })
2406                .collect();
2407            upstream_sink_union.init_upstreams = init_upstreams;
2408            false
2409        } else {
2410            true
2411        }
2412    });
2413}
2414
2415#[cfg(test)]
2416mod tests {
2417    use std::num::NonZeroUsize;
2418
2419    use super::*;
2420
2421    #[test]
2422    fn test_specified_parallelism_exceeds_available() {
2423        let result = DdlController::resolve_stream_parallelism_inner(
2424            Some(NonZeroUsize::new(100).unwrap()),
2425            NonZeroUsize::new(256).unwrap(),
2426            Some(NonZeroUsize::new(4).unwrap()),
2427            &DefaultParallelism::Full,
2428            "default",
2429        )
2430        .unwrap();
2431        assert_eq!(result.get(), 100);
2432    }
2433
2434    #[test]
2435    fn test_allows_default_parallelism_over_available() {
2436        let result = DdlController::resolve_stream_parallelism_inner(
2437            None,
2438            NonZeroUsize::new(256).unwrap(),
2439            Some(NonZeroUsize::new(4).unwrap()),
2440            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2441            "default",
2442        )
2443        .unwrap();
2444        assert_eq!(result.get(), 50);
2445    }
2446
2447    #[test]
2448    fn test_full_parallelism_capped_by_max() {
2449        let result = DdlController::resolve_stream_parallelism_inner(
2450            None,
2451            NonZeroUsize::new(6).unwrap(),
2452            Some(NonZeroUsize::new(10).unwrap()),
2453            &DefaultParallelism::Full,
2454            "default",
2455        )
2456        .unwrap();
2457        assert_eq!(result.get(), 6);
2458    }
2459
2460    #[test]
2461    fn test_no_available_slots_returns_error() {
2462        let result = DdlController::resolve_stream_parallelism_inner(
2463            None,
2464            NonZeroUsize::new(4).unwrap(),
2465            None,
2466            &DefaultParallelism::Full,
2467            "default",
2468        );
2469        assert!(matches!(
2470            result,
2471            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2472        ));
2473    }
2474
2475    #[test]
2476    fn test_specified_over_max_returns_error() {
2477        let result = DdlController::resolve_stream_parallelism_inner(
2478            Some(NonZeroUsize::new(8).unwrap()),
2479            NonZeroUsize::new(4).unwrap(),
2480            Some(NonZeroUsize::new(10).unwrap()),
2481            &DefaultParallelism::Full,
2482            "default",
2483        );
2484        assert!(matches!(
2485            result,
2486            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2487        ));
2488    }
2489}