risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::id::{JobId, TableId};
32use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48};
49use risingwave_pb::catalog::{
50    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
51    Subscription, Table, View,
52};
53use risingwave_pb::common::PbActorLocation;
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request,
58};
59use risingwave_pb::meta::table_fragments::PbActorStatus;
60use risingwave_pb::stream_plan::stream_node::NodeBody;
61use risingwave_pb::stream_plan::{
62    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
63    StreamFragmentGraph as StreamFragmentGraphProto,
64};
65use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::BarrierManagerRef;
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::{
79    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
80    NotificationVersion, StreamingJob, StreamingJobType,
81};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
84    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
85    TableParallelism,
86};
87use crate::stream::cdc::{
88    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
89};
90use crate::stream::{
91    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
92    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
93    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
94    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
95    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
96    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
97};
98use crate::telemetry::report_event;
99use crate::{MetaError, MetaResult};
100
101#[derive(PartialEq)]
102pub enum DropMode {
103    Restrict,
104    Cascade,
105}
106
107impl DropMode {
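    /// Maps the `cascade` flag of a DDL request to a [`DropMode`].
    ///
    /// A minimal illustration (not compiled as a doctest):
    /// ```ignore
    /// assert!(matches!(DropMode::from_request_setting(true), DropMode::Cascade));
    /// assert!(matches!(DropMode::from_request_setting(false), DropMode::Restrict));
    /// ```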
108    pub fn from_request_setting(cascade: bool) -> DropMode {
109        if cascade {
110            DropMode::Cascade
111        } else {
112            DropMode::Restrict
113        }
114    }
115}
116
117#[derive(strum::AsRefStr)]
118pub enum StreamingJobId {
119    MaterializedView(TableId),
120    Sink(SinkId),
121    Table(Option<SourceId>, TableId),
122    Index(IndexId),
123}
124
125impl std::fmt::Display for StreamingJobId {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(f, "{}", self.as_ref())?;
128        write!(f, "({})", self.id())
129    }
130}
131
132impl StreamingJobId {
133    fn id(&self) -> JobId {
134        match self {
135            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
136            StreamingJobId::Index(id) => id.as_job_id(),
137            StreamingJobId::Sink(id) => id.as_job_id(),
138        }
139    }
140}
141
/// Describes the job that needs to be replaced. Used when replacing a table
/// and when creating a sink into a table.
144pub struct ReplaceStreamJobInfo {
145    pub streaming_job: StreamingJob,
146    pub fragment_graph: StreamFragmentGraphProto,
147}
148
149#[derive(Display)]
150pub enum DdlCommand {
151    CreateDatabase(Database),
152    DropDatabase(DatabaseId),
153    CreateSchema(Schema),
154    DropSchema(SchemaId, DropMode),
155    CreateNonSharedSource(Source),
156    DropSource(SourceId, DropMode),
157    CreateFunction(Function),
158    DropFunction(FunctionId, DropMode),
159    CreateView(View, HashSet<ObjectId>),
160    DropView(ViewId, DropMode),
161    CreateStreamingJob {
162        stream_job: StreamingJob,
163        fragment_graph: StreamFragmentGraphProto,
164        dependencies: HashSet<ObjectId>,
165        specific_resource_group: Option<String>, // specific resource group
166        if_not_exists: bool,
167    },
168    DropStreamingJob {
169        job_id: StreamingJobId,
170        drop_mode: DropMode,
171    },
172    AlterName(alter_name_request::Object, String),
173    AlterSwapRename(alter_swap_rename_request::Object),
174    ReplaceStreamJob(ReplaceStreamJobInfo),
175    AlterNonSharedSource(Source),
176    AlterObjectOwner(Object, UserId),
177    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
178    CreateConnection(Connection),
179    DropConnection(ConnectionId, DropMode),
180    CreateSecret(Secret),
181    AlterSecret(Secret),
182    DropSecret(SecretId),
183    CommentOn(Comment),
184    CreateSubscription(Subscription),
185    DropSubscription(SubscriptionId, DropMode),
186    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
187    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
188}
189
190impl DdlCommand {
191    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
192    fn object(&self) -> Either<String, ObjectId> {
193        use Either::*;
194        match self {
195            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
196            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
197            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
198            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
199            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
200            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
201            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
202            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
203            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
204            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
205            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
206            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
207            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
208            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
209            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
210            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
211            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
212            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
213            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
214            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
215            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
216            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
217            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
218            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
219            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
220            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
221            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
222            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
223        }
224    }
225
226    fn allow_in_recovery(&self) -> bool {
227        match self {
228            DdlCommand::DropDatabase(_)
229            | DdlCommand::DropSchema(_, _)
230            | DdlCommand::DropSource(_, _)
231            | DdlCommand::DropFunction(_, _)
232            | DdlCommand::DropView(_, _)
233            | DdlCommand::DropStreamingJob { .. }
234            | DdlCommand::DropConnection(_, _)
235            | DdlCommand::DropSecret(_)
236            | DdlCommand::DropSubscription(_, _)
237            | DdlCommand::AlterName(_, _)
238            | DdlCommand::AlterObjectOwner(_, _)
239            | DdlCommand::AlterSetSchema(_, _)
240            | DdlCommand::CreateDatabase(_)
241            | DdlCommand::CreateSchema(_)
242            | DdlCommand::CreateFunction(_)
243            | DdlCommand::CreateView(_, _)
244            | DdlCommand::CreateConnection(_)
245            | DdlCommand::CommentOn(_)
246            | DdlCommand::CreateSecret(_)
247            | DdlCommand::AlterSecret(_)
248            | DdlCommand::AlterSwapRename(_)
249            | DdlCommand::AlterDatabaseParam(_, _)
250            | DdlCommand::AlterStreamingJobConfig(_, _, _) => true,
251            DdlCommand::CreateStreamingJob { .. }
252            | DdlCommand::CreateNonSharedSource(_)
253            | DdlCommand::ReplaceStreamJob(_)
254            | DdlCommand::AlterNonSharedSource(_)
255            | DdlCommand::CreateSubscription(_) => false,
256        }
257    }
258}
259
260#[derive(Clone)]
261pub struct DdlController {
262    pub(crate) env: MetaSrvEnv,
263
264    pub(crate) metadata_manager: MetadataManager,
265    pub(crate) stream_manager: GlobalStreamManagerRef,
266    pub(crate) source_manager: SourceManagerRef,
267    barrier_manager: BarrierManagerRef,
268
    /// The semaphore is used to limit the number of concurrent streaming job creations.
270    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
271
272    /// Sequence number for DDL commands, used for observability and debugging.
273    seq: Arc<AtomicU64>,
274}
275
276#[derive(Clone)]
277pub struct CreatingStreamingJobPermit {
278    pub(crate) semaphore: Arc<Semaphore>,
279}
280
281impl CreatingStreamingJobPermit {
282    async fn new(env: &MetaSrvEnv) -> Self {
283        let mut permits = env
284            .system_params_reader()
285            .await
286            .max_concurrent_creating_streaming_jobs() as usize;
287        if permits == 0 {
288            // if the system parameter is set to zero, use the max permitted value.
289            permits = Semaphore::MAX_PERMITS;
290        }
291        let semaphore = Arc::new(Semaphore::new(permits));
292
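        // Listen for system parameter changes so the permit count can be adjusted at runtime.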
293        let (local_notification_tx, mut local_notification_rx) =
294            tokio::sync::mpsc::unbounded_channel();
295        env.notification_manager()
296            .insert_local_sender(local_notification_tx);
297        let semaphore_clone = semaphore.clone();
298        tokio::spawn(async move {
299            while let Some(notification) = local_notification_rx.recv().await {
300                let LocalNotification::SystemParamsChange(p) = &notification else {
301                    continue;
302                };
303                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
304                if new_permits == 0 {
305                    new_permits = Semaphore::MAX_PERMITS;
306                }
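                // Grow or shrink the semaphore so the available permits match the new limit.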
307                match permits.cmp(&new_permits) {
308                    Ordering::Less => {
309                        semaphore_clone.add_permits(new_permits - permits);
310                    }
311                    Ordering::Equal => continue,
312                    Ordering::Greater => {
313                        let to_release = permits - new_permits;
314                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with limits ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
319                                to_release,
320                                reduced
321                            );
322                        }
323                    }
324                }
325                tracing::info!(
326                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
327                    permits,
328                    new_permits
329                );
330                permits = new_permits;
331            }
332        });
333
334        Self { semaphore }
335    }
336}
337
338impl DdlController {
339    pub async fn new(
340        env: MetaSrvEnv,
341        metadata_manager: MetadataManager,
342        stream_manager: GlobalStreamManagerRef,
343        source_manager: SourceManagerRef,
344        barrier_manager: BarrierManagerRef,
345    ) -> Self {
346        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
347        Self {
348            env,
349            metadata_manager,
350            stream_manager,
351            source_manager,
352            barrier_manager,
353            creating_streaming_job_permits,
354            seq: Arc::new(AtomicU64::new(0)),
355        }
356    }
357
358    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
359    pub fn next_seq(&self) -> u64 {
360        // This is a simple atomic increment operation.
361        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
362    }
363
    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, the request will be cancelled by tonic. Since we have a lot
    /// of logic for revert, status management, notification and so on, ensuring consistency
    /// would be a huge hassle if we did not spawn here.
    ///
    /// Though the return type is `Option`, it is always `Some`; this simplifies the handling logic.
370    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
371        if !command.allow_in_recovery() {
372            self.barrier_manager.check_status_running()?;
373        }
374
375        let await_tree_key = format!("DDL Command {}", self.next_seq());
376        let await_tree_span = await_tree::span!("{command}({})", command.object());
377
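        // Clone the controller and run the command in a spawned task so that client-side
        // cancellation does not interrupt the DDL halfway (see the doc comment above).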
378        let ctrl = self.clone();
379        let fut = async move {
380            match command {
381                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
382                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
383                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
384                DdlCommand::DropSchema(schema_id, drop_mode) => {
385                    ctrl.drop_schema(schema_id, drop_mode).await
386                }
387                DdlCommand::CreateNonSharedSource(source) => {
388                    ctrl.create_non_shared_source(source).await
389                }
390                DdlCommand::DropSource(source_id, drop_mode) => {
391                    ctrl.drop_source(source_id, drop_mode).await
392                }
393                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
394                DdlCommand::DropFunction(function_id, drop_mode) => {
395                    ctrl.drop_function(function_id, drop_mode).await
396                }
397                DdlCommand::CreateView(view, dependencies) => {
398                    ctrl.create_view(view, dependencies).await
399                }
400                DdlCommand::DropView(view_id, drop_mode) => {
401                    ctrl.drop_view(view_id, drop_mode).await
402                }
403                DdlCommand::CreateStreamingJob {
404                    stream_job,
405                    fragment_graph,
406                    dependencies,
407                    specific_resource_group,
408                    if_not_exists,
409                } => {
410                    ctrl.create_streaming_job(
411                        stream_job,
412                        fragment_graph,
413                        dependencies,
414                        specific_resource_group,
415                        if_not_exists,
416                    )
417                    .await
418                }
419                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
420                    ctrl.drop_streaming_job(job_id, drop_mode).await
421                }
422                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
423                    streaming_job,
424                    fragment_graph,
425                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
426                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
427                DdlCommand::AlterObjectOwner(object, owner_id) => {
428                    ctrl.alter_owner(object, owner_id).await
429                }
430                DdlCommand::AlterSetSchema(object, new_schema_id) => {
431                    ctrl.alter_set_schema(object, new_schema_id).await
432                }
433                DdlCommand::CreateConnection(connection) => {
434                    ctrl.create_connection(connection).await
435                }
436                DdlCommand::DropConnection(connection_id, drop_mode) => {
437                    ctrl.drop_connection(connection_id, drop_mode).await
438                }
439                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
440                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
441                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
442                DdlCommand::AlterNonSharedSource(source) => {
443                    ctrl.alter_non_shared_source(source).await
444                }
445                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
446                DdlCommand::CreateSubscription(subscription) => {
447                    ctrl.create_subscription(subscription).await
448                }
449                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
450                    ctrl.drop_subscription(subscription_id, drop_mode).await
451                }
452                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
453                DdlCommand::AlterDatabaseParam(database_id, param) => {
454                    ctrl.alter_database_param(database_id, param).await
455                }
456                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
457                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
458                        .await
459                }
460            }
461        }
462        .in_current_span();
463        let fut = (self.env.await_tree_reg())
464            .register(await_tree_key, await_tree_span)
465            .instrument(Box::pin(fut));
466        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
467        Ok(Some(WaitVersion {
468            catalog_version: notification_version,
469            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
470        }))
471    }
472
473    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
474        self.barrier_manager.get_ddl_progress().await
475    }
476
477    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
478        let (version, updated_db) = self
479            .metadata_manager
480            .catalog_controller
481            .create_database(database)
482            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
484        self.barrier_manager
485            .update_database_barrier(
486                updated_db.database_id,
487                updated_db.barrier_interval_ms.map(|v| v as u32),
488                updated_db.checkpoint_frequency.map(|v| v as u64),
489            )
490            .await?;
491        Ok(version)
492    }
493
494    #[tracing::instrument(skip(self), level = "debug")]
495    pub async fn reschedule_streaming_job(
496        &self,
497        job_id: JobId,
498        target: ReschedulePolicy,
499        mut deferred: bool,
500    ) -> MetaResult<()> {
501        tracing::info!("altering parallelism for job {}", job_id);
502        if self.barrier_manager.check_status_running().is_err() {
503            tracing::info!(
504                "alter parallelism is set to deferred mode because the system is in recovery state"
505            );
506            deferred = true;
507        }
508
509        self.stream_manager
510            .reschedule_streaming_job(job_id, target, deferred)
511            .await
512    }
513
514    pub async fn reschedule_cdc_table_backfill(
515        &self,
516        job_id: JobId,
517        target: ReschedulePolicy,
518    ) -> MetaResult<()> {
519        tracing::info!("alter CDC table backfill parallelism");
520        if self.barrier_manager.check_status_running().is_err() {
521            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
522        }
523        self.stream_manager
524            .reschedule_cdc_table_backfill(job_id, target)
525            .await
526    }
527
528    pub async fn reschedule_fragments(
529        &self,
530        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
531    ) -> MetaResult<()> {
532        tracing::info!(
533            "altering parallelism for fragments {:?}",
534            fragment_targets.keys()
535        );
536        let fragment_targets = fragment_targets
537            .into_iter()
538            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
539            .collect();
540
541        self.stream_manager
542            .reschedule_fragments(fragment_targets)
543            .await
544    }
545
546    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
547        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
548            .await
549    }
550
551    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
552        self.metadata_manager
553            .catalog_controller
554            .create_schema(schema)
555            .await
556    }
557
558    async fn drop_schema(
559        &self,
560        schema_id: SchemaId,
561        drop_mode: DropMode,
562    ) -> MetaResult<NotificationVersion> {
563        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
564            .await
565    }
566
    /// Shared sources are handled in [`Self::create_streaming_job`].
568    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
569        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
570            .await
571            .context("failed to create source worker")?;
572
573        let (source_id, version) = self
574            .metadata_manager
575            .catalog_controller
576            .create_source(source)
577            .await?;
578        self.source_manager
579            .register_source_with_handle(source_id, handle)
580            .await;
581        Ok(version)
582    }
583
584    async fn drop_source(
585        &self,
586        source_id: SourceId,
587        drop_mode: DropMode,
588    ) -> MetaResult<NotificationVersion> {
589        self.drop_object(ObjectType::Source, source_id, drop_mode)
590            .await
591    }
592
    /// This replaces the source in the catalog.
    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
595    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
596        self.metadata_manager
597            .catalog_controller
598            .alter_non_shared_source(source)
599            .await
600    }
601
602    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
603        self.metadata_manager
604            .catalog_controller
605            .create_function(function)
606            .await
607    }
608
609    async fn drop_function(
610        &self,
611        function_id: FunctionId,
612        drop_mode: DropMode,
613    ) -> MetaResult<NotificationVersion> {
614        self.drop_object(ObjectType::Function, function_id, drop_mode)
615            .await
616    }
617
618    async fn create_view(
619        &self,
620        view: View,
621        dependencies: HashSet<ObjectId>,
622    ) -> MetaResult<NotificationVersion> {
623        self.metadata_manager
624            .catalog_controller
625            .create_view(view, dependencies)
626            .await
627    }
628
629    async fn drop_view(
630        &self,
631        view_id: ViewId,
632        drop_mode: DropMode,
633    ) -> MetaResult<NotificationVersion> {
634        self.drop_object(ObjectType::View, view_id, drop_mode).await
635    }
636
637    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
638        validate_connection(&connection).await?;
639        self.metadata_manager
640            .catalog_controller
641            .create_connection(connection)
642            .await
643    }
644
645    async fn drop_connection(
646        &self,
647        connection_id: ConnectionId,
648        drop_mode: DropMode,
649    ) -> MetaResult<NotificationVersion> {
650        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
651            .await
652    }
653
654    async fn alter_database_param(
655        &self,
656        database_id: DatabaseId,
657        param: AlterDatabaseParam,
658    ) -> MetaResult<NotificationVersion> {
659        let (version, updated_db) = self
660            .metadata_manager
661            .catalog_controller
662            .alter_database_param(database_id, param)
663            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
665        self.barrier_manager
666            .update_database_barrier(
667                database_id,
668                updated_db.barrier_interval_ms.map(|v| v as u32),
669                updated_db.checkpoint_frequency.map(|v| v as u64),
670            )
671            .await?;
672        Ok(version)
673    }
674
675    // The 'secret' part of the request we receive from the frontend is in plaintext;
676    // here, we need to encrypt it before storing it in the catalog.
677    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
678        let secret_store_private_key = self
679            .env
680            .opts
681            .secret_store_private_key
682            .clone()
683            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
684
685        let encrypted_payload = SecretEncryption::encrypt(
686            secret_store_private_key.as_slice(),
687            secret.get_value().as_slice(),
688        )
689        .context(format!("failed to encrypt secret {}", secret.name))?;
690        Ok(encrypted_payload
691            .serialize()
692            .context(format!("failed to serialize secret {}", secret.name))?)
693    }
694
695    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
696        // The 'secret' part of the request we receive from the frontend is in plaintext;
697        // here, we need to encrypt it before storing it in the catalog.
698        let secret_plain_payload = secret.value.clone();
699        let encrypted_payload = self.get_encrypted_payload(&secret)?;
700        secret.value = encrypted_payload;
701
702        self.metadata_manager
703            .catalog_controller
704            .create_secret(secret, secret_plain_payload)
705            .await
706    }
707
708    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
709        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
710            .await
711    }
712
713    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
714        let secret_plain_payload = secret.value.clone();
715        let encrypted_payload = self.get_encrypted_payload(&secret)?;
716        secret.value = encrypted_payload;
717        self.metadata_manager
718            .catalog_controller
719            .alter_secret(secret, secret_plain_payload)
720            .await
721    }
722
723    async fn create_subscription(
724        &self,
725        mut subscription: Subscription,
726    ) -> MetaResult<NotificationVersion> {
727        tracing::debug!("create subscription");
728        let _permit = self
729            .creating_streaming_job_permits
730            .semaphore
731            .acquire()
732            .await
733            .unwrap();
734        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
735        self.metadata_manager
736            .catalog_controller
737            .create_subscription_catalog(&mut subscription)
738            .await?;
739        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
740            tracing::debug!(error = %err.as_report(), "failed to create subscription");
741            let _ = self
742                .metadata_manager
743                .catalog_controller
744                .try_abort_creating_subscription(subscription.id)
745                .await
746                .inspect_err(|e| {
747                    tracing::error!(
748                        error = %e.as_report(),
749                        "failed to abort create subscription after failure"
750                    );
751                });
752            return Err(err);
753        }
754
755        let version = self
756            .metadata_manager
757            .catalog_controller
758            .notify_create_subscription(subscription.id)
759            .await?;
760        tracing::debug!("finish create subscription");
761        Ok(version)
762    }
763
764    async fn drop_subscription(
765        &self,
766        subscription_id: SubscriptionId,
767        drop_mode: DropMode,
768    ) -> MetaResult<NotificationVersion> {
769        tracing::debug!("preparing drop subscription");
770        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
771        let subscription = self
772            .metadata_manager
773            .catalog_controller
774            .get_subscription_by_id(subscription_id)
775            .await?;
776        let table_id = subscription.dependent_table_id;
777        let database_id = subscription.database_id;
778        let (_, version) = self
779            .metadata_manager
780            .catalog_controller
781            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
782            .await?;
783        self.stream_manager
784            .drop_subscription(database_id, subscription_id, table_id)
785            .await;
786        tracing::debug!("finish drop subscription");
787        Ok(version)
788    }
789
790    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
791    #[await_tree::instrument]
792    pub(crate) async fn validate_cdc_table(
793        &self,
794        table: &Table,
795        table_fragments: &StreamJobFragments,
796    ) -> MetaResult<()> {
797        let stream_scan_fragment = table_fragments
798            .fragments
799            .values()
800            .filter(|f| {
801                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
802                    || f.fragment_type_mask
803                        .contains(FragmentTypeFlag::StreamCdcScan)
804            })
805            .exactly_one()
806            .ok()
807            .with_context(|| {
808                format!(
809                    "expect exactly one stream scan fragment, got: {:?}",
810                    table_fragments.fragments
811                )
812            })?;
813        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
814            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
815                if let Some(o) = node.options
816                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
817                {
818                    // Use parallel CDC backfill.
819                } else {
820                    assert_eq!(
821                        stream_scan_fragment.actors.len(),
822                        1,
823                        "Stream scan fragment should have only one actor"
824                    );
825                }
826            }
827        }
828        let mut found_cdc_scan = false;
829        match &stream_scan_fragment.nodes.node_body {
830            Some(NodeBody::StreamCdcScan(_)) => {
831                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
832                if self
833                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
834                    .await?
835                {
836                    found_cdc_scan = true;
837                }
838            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
840            Some(NodeBody::Project(_)) => {
841                for input in &stream_scan_fragment.nodes.input {
842                    assert_parallelism(stream_scan_fragment, &input.node_body);
843                    if self
844                        .validate_cdc_table_inner(&input.node_body, table.id)
845                        .await?
846                    {
847                        found_cdc_scan = true;
848                    }
849                }
850            }
851            _ => {
852                bail!("Unexpected node body for stream cdc scan");
853            }
854        };
855        if !found_cdc_scan {
856            bail!("No stream cdc scan node found in stream scan fragment");
857        }
858        Ok(())
859    }
860
861    async fn validate_cdc_table_inner(
862        &self,
863        node_body: &Option<NodeBody>,
864        table_id: TableId,
865    ) -> MetaResult<bool> {
866        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
867            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
868        {
869            let options_with_secret = WithOptionsSecResolved::new(
870                cdc_table_desc.connect_properties.clone(),
871                cdc_table_desc.secret_refs.clone(),
872            );
873
874            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
875            props.init_from_pb_cdc_table_desc(cdc_table_desc);
876
877            // Try creating a split enumerator to validate
878            let _enumerator = props
879                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
880                .await?;
881
882            tracing::debug!(?table_id, "validate cdc table success");
883            Ok(true)
884        } else {
885            Ok(false)
886        }
887    }
888
889    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
890        let migrated = self
891            .metadata_manager
892            .catalog_controller
893            .has_table_been_migrated(table_id)
894            .await?;
895        if !migrated {
896            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
897        } else {
898            Ok(())
899        }
900    }
901
902    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
903    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
904    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
905    pub async fn create_streaming_job(
906        &self,
907        mut streaming_job: StreamingJob,
908        fragment_graph: StreamFragmentGraphProto,
909        dependencies: HashSet<ObjectId>,
910        specific_resource_group: Option<String>,
911        if_not_exists: bool,
912    ) -> MetaResult<NotificationVersion> {
913        if let StreamingJob::Sink(sink) = &streaming_job
914            && let Some(target_table) = sink.target_table
915        {
916            self.validate_table_for_sink(target_table).await?;
917        }
918        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
919        let check_ret = self
920            .metadata_manager
921            .catalog_controller
922            .create_job_catalog(
923                &mut streaming_job,
924                &ctx,
925                &fragment_graph.parallelism,
926                fragment_graph.max_parallelism as _,
927                dependencies,
928                specific_resource_group.clone(),
929            )
930            .await;
931        if let Err(meta_err) = check_ret {
932            if !if_not_exists {
933                return Err(meta_err);
934            }
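            // With `if_not_exists`, a duplicated-job error is tolerated: wait for the existing
            // foreground job to finish, or return immediately for a background job.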
935            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
936                if streaming_job.create_type() == CreateType::Foreground {
937                    let database_id = streaming_job.database_id();
938                    self.metadata_manager
939                        .wait_streaming_job_finished(database_id, *job_id)
940                        .await
941                } else {
942                    Ok(IGNORED_NOTIFICATION_VERSION)
943                }
944            } else {
945                Err(meta_err)
946            };
947        }
948        let job_id = streaming_job.id();
949        tracing::debug!(
950            id = %job_id,
951            definition = streaming_job.definition(),
952            create_type = streaming_job.create_type().as_str_name(),
953            job_type = ?streaming_job.job_type(),
954            "starting streaming job",
955        );
956        // TODO: acquire permits for recovered background DDLs.
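        // Wait for a permit so that at most `max_concurrent_creating_streaming_jobs` jobs are
        // being created at the same time; the permit is passed down to the stream manager.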
957        let permit = self
958            .creating_streaming_job_permits
959            .semaphore
960            .clone()
961            .acquire_owned()
962            .instrument_await("acquire_creating_streaming_job_permit")
963            .await
964            .unwrap();
965        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
966
967        let name = streaming_job.name();
968        let definition = streaming_job.definition();
969        let source_id = match &streaming_job {
970            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
971            _ => None,
972        };
973
974        // create streaming job.
975        match self
976            .create_streaming_job_inner(
977                ctx,
978                streaming_job,
979                fragment_graph,
980                specific_resource_group,
981                permit,
982            )
983            .await
984        {
985            Ok(version) => Ok(version),
986            Err(err) => {
987                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
988                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
989                    id: job_id,
990                    name,
991                    definition,
992                    error: err.as_report().to_string(),
993                };
994                self.env.event_log_manager_ref().add_event_logs(vec![
995                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
996                ]);
997                let (aborted, _) = self
998                    .metadata_manager
999                    .catalog_controller
1000                    .try_abort_creating_streaming_job(job_id, false)
1001                    .await?;
1002                if aborted {
1003                    tracing::warn!(id = %job_id, "aborted streaming job");
1004                    // FIXME: might also need other cleanup here
1005                    if let Some(source_id) = source_id {
1006                        self.source_manager
1007                            .apply_source_change(SourceChange::DropSource {
1008                                dropped_source_ids: vec![source_id],
1009                            })
1010                            .await;
1011                    }
1012                }
1013                Err(err)
1014            }
1015        }
1016    }
1017
1018    #[await_tree::instrument(boxed)]
1019    async fn create_streaming_job_inner(
1020        &self,
1021        ctx: StreamContext,
1022        mut streaming_job: StreamingJob,
1023        fragment_graph: StreamFragmentGraphProto,
1024        specific_resource_group: Option<String>,
1025        permit: OwnedSemaphorePermit,
1026    ) -> MetaResult<NotificationVersion> {
1027        let mut fragment_graph =
1028            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1029        streaming_job.set_info_from_graph(&fragment_graph);
1030
1031        // create internal table catalogs and refill table id.
1032        let incomplete_internal_tables = fragment_graph
1033            .incomplete_internal_tables()
1034            .into_values()
1035            .collect_vec();
1036        let table_id_map = self
1037            .metadata_manager
1038            .catalog_controller
1039            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1040            .await?;
1041        fragment_graph.refill_internal_table_ids(table_id_map);
1042
1043        // create fragment and actor catalogs.
1044        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1045        let (ctx, stream_job_fragments) = self
1046            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1047            .await?;
1048
1049        let streaming_job = &ctx.streaming_job;
1050
1051        match streaming_job {
1052            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1053                self.validate_cdc_table(table, &stream_job_fragments)
1054                    .await?;
1055            }
1056            StreamingJob::Table(Some(source), ..) => {
1057                // Register the source on the connector node.
1058                self.source_manager.register_source(source).await?;
1059                let connector_name = source
1060                    .get_with_properties()
1061                    .get(UPSTREAM_SOURCE_KEY)
1062                    .cloned();
1063                let attr = source.info.as_ref().map(|source_info| {
1064                    jsonbb::json!({
1065                            "format": source_info.format().as_str_name(),
1066                            "encode": source_info.row_encode().as_str_name(),
1067                    })
1068                });
1069                report_create_object(
1070                    streaming_job.id(),
1071                    "source",
1072                    PbTelemetryDatabaseObject::Source,
1073                    connector_name,
1074                    attr,
1075                );
1076            }
1077            StreamingJob::Sink(sink) => {
1078                if sink.auto_refresh_schema_from_table.is_some() {
1079                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1080                }
1081                // Validate the sink on the connector node.
1082                validate_sink(sink).await?;
1083                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1084                let attr = sink.format_desc.as_ref().map(|sink_info| {
1085                    jsonbb::json!({
1086                        "format": sink_info.format().as_str_name(),
1087                        "encode": sink_info.encode().as_str_name(),
1088                    })
1089                });
1090                report_create_object(
1091                    streaming_job.id(),
1092                    "sink",
1093                    PbTelemetryDatabaseObject::Sink,
1094                    connector_name,
1095                    attr,
1096                );
1097            }
1098            StreamingJob::Source(source) => {
1099                // Register the source on the connector node.
1100                self.source_manager.register_source(source).await?;
1101                let connector_name = source
1102                    .get_with_properties()
1103                    .get(UPSTREAM_SOURCE_KEY)
1104                    .cloned();
1105                let attr = source.info.as_ref().map(|source_info| {
1106                    jsonbb::json!({
1107                            "format": source_info.format().as_str_name(),
1108                            "encode": source_info.row_encode().as_str_name(),
1109                    })
1110                });
1111                report_create_object(
1112                    streaming_job.id(),
1113                    "source",
1114                    PbTelemetryDatabaseObject::Source,
1115                    connector_name,
1116                    attr,
1117                );
1118            }
1119            _ => {}
1120        }
1121
1122        self.metadata_manager
1123            .catalog_controller
1124            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1125            .await?;
1126
1127        // create streaming jobs.
1128        let version = self
1129            .stream_manager
1130            .create_streaming_job(stream_job_fragments, ctx, permit)
1131            .await?;
1132
1133        Ok(version)
1134    }
1135
    /// Note: when dropping a sink into a table, the target table also needs to be replaced.
1137    pub async fn drop_object(
1138        &self,
1139        object_type: ObjectType,
1140        object_id: impl Into<ObjectId>,
1141        drop_mode: DropMode,
1142    ) -> MetaResult<NotificationVersion> {
1143        let object_id = object_id.into();
1144        let (release_ctx, version) = self
1145            .metadata_manager
1146            .catalog_controller
1147            .drop_object(object_type, object_id, drop_mode)
1148            .await?;
1149
1150        if object_type == ObjectType::Source {
1151            self.env
1152                .notification_manager_ref()
1153                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
1154        }
1155
1156        let ReleaseContext {
1157            database_id,
1158            removed_streaming_job_ids,
1159            removed_state_table_ids,
1160            removed_source_ids,
1161            removed_secret_ids: secret_ids,
1162            removed_source_fragments,
1163            removed_actors,
1164            removed_fragments,
1165            removed_sink_fragment_by_targets,
1166            removed_iceberg_table_sinks,
1167        } = release_ctx;
1168
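        // Pause the source manager's periodic tick while the affected streaming jobs are being dropped.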
1169        let _guard = self.source_manager.pause_tick().await;
1170        self.stream_manager
1171            .drop_streaming_jobs(
1172                database_id,
1173                removed_actors.iter().map(|id| *id as _).collect(),
1174                removed_streaming_job_ids,
1175                removed_state_table_ids,
1176                removed_fragments.iter().map(|id| *id as _).collect(),
1177                removed_sink_fragment_by_targets
1178                    .into_iter()
1179                    .map(|(target, sinks)| {
1180                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1181                    })
1182                    .collect(),
1183            )
1184            .await;
1185
        // Clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
1188        self.source_manager
1189            .apply_source_change(SourceChange::DropSource {
1190                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1191            })
1192            .await;
1193
1194        // unregister fragments and actors from source manager.
        // FIXME: also need to unregister source backfill fragments.
1196        let dropped_source_fragments = removed_source_fragments;
1197        self.source_manager
1198            .apply_source_change(SourceChange::DropMv {
1199                dropped_source_fragments,
1200            })
1201            .await;
1202
1203        // clean up iceberg table sinks
1204        for sink in removed_iceberg_table_sinks {
1205            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
1206                .expect("Iceberg sink should be valid");
1207            let iceberg_sink =
1208                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
1209            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
1210                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
1211                tracing::info!(
1212                    "dropping iceberg table {} for dropped sink",
1213                    table_identifier
1214                );
1215
1216                let _ = iceberg_catalog
1217                    .drop_table(&table_identifier)
1218                    .await
1219                    .inspect_err(|err| {
1220                        tracing::error!(
1221                            "failed to drop iceberg table {} during cleanup: {}",
1222                            table_identifier,
1223                            err.as_report()
1224                        );
1225                    });
1226            }
1227        }
1228
1229        // remove secrets.
1230        for secret in secret_ids {
1231            LocalSecretManager::global().remove_secret(secret);
1232        }
1233        Ok(version)
1234    }
1235
1236    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1237    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1238    pub async fn replace_job(
1239        &self,
1240        mut streaming_job: StreamingJob,
1241        fragment_graph: StreamFragmentGraphProto,
1242    ) -> MetaResult<NotificationVersion> {
1243        match &streaming_job {
1244            StreamingJob::Table(..)
1245            | StreamingJob::Source(..)
1246            | StreamingJob::MaterializedView(..) => {}
1247            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1248                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1249            }
1250        }
1251
1252        let job_id = streaming_job.id();
1253
1254        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1255        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1256
        // Ensure the max parallelism is unchanged before replacing the table.
1258        let original_max_parallelism = self
1259            .metadata_manager
1260            .get_job_max_parallelism(streaming_job.id())
1261            .await?;
1262        let fragment_graph = PbStreamFragmentGraph {
1263            max_parallelism: original_max_parallelism as _,
1264            ..fragment_graph
1265        };
1266
1267        // 1. build fragment graph.
1268        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1269        streaming_job.set_info_from_graph(&fragment_graph);
1270
1271        // make it immutable
1272        let streaming_job = streaming_job;
1273
1274        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1275            let auto_refresh_schema_sinks = self
1276                .metadata_manager
1277                .catalog_controller
1278                .get_sink_auto_refresh_schema_from(table.id)
1279                .await?;
1280            if !auto_refresh_schema_sinks.is_empty() {
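                // The table has sinks with auto schema refresh enabled: find the newly added
                // columns and rewrite each sink's fragment so it matches the new table schema.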
1281                let original_table_columns = self
1282                    .metadata_manager
1283                    .catalog_controller
1284                    .get_table_columns(table.id)
1285                    .await?;
1286                // compare column id to find newly added columns
1287                let mut original_table_column_ids: HashSet<_> = original_table_columns
1288                    .iter()
1289                    .map(|col| col.column_id())
1290                    .collect();
1291                let newly_added_columns = table
1292                    .columns
1293                    .iter()
1294                    .filter(|col| {
1295                        !original_table_column_ids.remove(&ColumnId::new(
1296                            col.column_desc.as_ref().unwrap().column_id as _,
1297                        ))
1298                    })
1299                    .map(|col| ColumnCatalog::from(col.clone()))
1300                    .collect_vec();
1301                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1303                }
1304                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1305                for sink in auto_refresh_schema_sinks {
1306                    let sink_job_fragments = self
1307                        .metadata_manager
1308                        .get_job_fragments_by_id(sink.id.as_job_id())
1309                        .await?;
1310                    if sink_job_fragments.fragments.len() != 1 {
1311                        return Err(anyhow!(
1312                            "auto schema refresh sink must have only one fragment, but got {}",
1313                            sink_job_fragments.fragments.len()
1314                        )
1315                        .into());
1316                    }
1317                    let original_sink_fragment =
1318                        sink_job_fragments.fragments.into_values().next().unwrap();
1319                    let (new_sink_fragment, new_schema, new_log_store_table) =
1320                        rewrite_refresh_schema_sink_fragment(
1321                            &original_sink_fragment,
1322                            &sink,
1323                            &newly_added_columns,
1324                            table,
1325                            fragment_graph.table_fragment_id(),
1326                            self.env.id_gen_manager(),
1327                            self.env.actor_id_generator(),
1328                        )?;
1329
1330                    assert_eq!(
1331                        original_sink_fragment.actors.len(),
1332                        new_sink_fragment.actors.len()
1333                    );
1334                    let actor_status = (0..original_sink_fragment.actors.len())
1335                        .map(|i| {
1336                            let worker_node_id = sink_job_fragments.actor_status
1337                                [&original_sink_fragment.actors[i].actor_id]
1338                                .location
1339                                .as_ref()
1340                                .unwrap()
1341                                .worker_node_id;
1342                            (
1343                                new_sink_fragment.actors[i].actor_id,
1344                                PbActorStatus {
1345                                    location: Some(PbActorLocation { worker_node_id }),
1346                                },
1347                            )
1348                        })
1349                        .collect();
1350
1351                    let streaming_job = StreamingJob::Sink(sink);
1352
1353                    let tmp_sink_id = self
1354                        .metadata_manager
1355                        .catalog_controller
1356                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1357                        .await?
1358                        .as_sink_id();
1359                    let StreamingJob::Sink(sink) = streaming_job else {
1360                        unreachable!()
1361                    };
1362
1363                    sinks.push(AutoRefreshSchemaSinkContext {
1364                        tmp_sink_id,
1365                        original_sink: sink,
1366                        original_fragment: original_sink_fragment,
1367                        new_schema,
1368                        newly_add_fields: newly_added_columns
1369                            .iter()
1370                            .map(|col| Field::from(&col.column_desc))
1371                            .collect(),
1372                        new_fragment: new_sink_fragment,
1373                        new_log_store_table,
1374                        actor_status,
1375                    });
1376                }
1377                Some(sinks)
1378            } else {
1379                None
1380            }
1381        } else {
1382            None
1383        };
1384
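        // Create the replacement job catalog under a temporary id; the real catalog entry is only
        // swapped in when `finish_replace_streaming_job` succeeds, and aborted otherwise.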
1385        let tmp_id = self
1386            .metadata_manager
1387            .catalog_controller
1388            .create_job_catalog_for_replace(
1389                &streaming_job,
1390                Some(&ctx),
1391                fragment_graph.specified_parallelism().as_ref(),
1392                Some(fragment_graph.max_parallelism()),
1393            )
1394            .await?;
1395
1396        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1397            sinks
1398                .iter()
1399                .map(|sink| sink.tmp_sink_id.as_object_id())
1400                .collect_vec()
1401        });
1402
1403        tracing::debug!(id = %job_id, "building replace streaming job");
1404        let mut updated_sink_catalogs = vec![];
1405
1406        let mut drop_table_connector_ctx = None;
1407        let result: MetaResult<_> = try {
1408            let (mut ctx, mut stream_job_fragments) = self
1409                .build_replace_job(
1410                    ctx,
1411                    &streaming_job,
1412                    fragment_graph,
1413                    tmp_id,
1414                    auto_refresh_schema_sinks,
1415                )
1416                .await?;
1417            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1418            let auto_refresh_schema_sink_finish_ctx =
1419                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1420                    sinks
1421                        .iter()
1422                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1423                            tmp_sink_id: sink.tmp_sink_id,
1424                            original_sink_id: sink.original_sink.id,
1425                            columns: sink.new_schema.clone(),
1426                            new_log_store_table: sink
1427                                .new_log_store_table
1428                                .as_ref()
1429                                .map(|table| (table.id, table.columns.clone())),
1430                        })
1431                        .collect()
1432                });
1433
1434            // Handle table that has incoming sinks.
1435            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1436                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1437                let upstream_infos = self
1438                    .metadata_manager
1439                    .catalog_controller
1440                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1441                    .await?;
1442                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1443
1444                for upstream_info in &upstream_infos {
1445                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1446                    ctx.upstream_fragment_downstreams
1447                        .entry(upstream_fragment_id)
1448                        .or_default()
1449                        .push(upstream_info.new_sink_downstream.clone());
1450                    if upstream_info.sink_original_target_columns.is_empty() {
1451                        updated_sink_catalogs.push(upstream_info.sink_id);
1452                    }
1453                }
1454            }
1455
1456            let replace_upstream = ctx.replace_upstream.clone();
1457
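            // Register each rebuilt sink fragment in the catalog under its temporary sink job
            // before kicking off the actual replacement.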
1458            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1459                let empty_downstreams = FragmentDownstreamRelation::default();
1460                for sink in sinks {
1461                    self.metadata_manager
1462                        .catalog_controller
1463                        .prepare_streaming_job(
1464                            sink.tmp_sink_id.as_job_id(),
1465                            || [&sink.new_fragment].into_iter(),
1466                            &empty_downstreams,
1467                            true,
1468                            None,
1469                        )
1470                        .await?;
1471                }
1472            }
1473
1474            self.metadata_manager
1475                .catalog_controller
1476                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1477                .await?;
1478
1479            self.stream_manager
1480                .replace_stream_job(stream_job_fragments, ctx)
1481                .await?;
1482            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1483        };
1484
1485        match result {
1486            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1487                let version = self
1488                    .metadata_manager
1489                    .catalog_controller
1490                    .finish_replace_streaming_job(
1491                        tmp_id,
1492                        streaming_job,
1493                        replace_upstream,
1494                        SinkIntoTableContext {
1495                            updated_sink_catalogs,
1496                        },
1497                        drop_table_connector_ctx.as_ref(),
1498                        auto_refresh_schema_sink_finish_ctx,
1499                    )
1500                    .await?;
1501                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1502                    self.source_manager
1503                        .apply_source_change(SourceChange::DropSource {
1504                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1505                        })
1506                        .await;
1507                }
1508                Ok(version)
1509            }
1510            Err(err) => {
1511                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1512                let _ = self.metadata_manager
1513                    .catalog_controller
1514                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1515                    .await.inspect_err(|err| {
1516                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1517                });
1518                Err(err)
1519            }
1520        }
1521    }
1522
1523    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1524    )]
1525    async fn drop_streaming_job(
1526        &self,
1527        job_id: StreamingJobId,
1528        drop_mode: DropMode,
1529    ) -> MetaResult<NotificationVersion> {
1530        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1531
1532        let (object_id, object_type) = match job_id {
1533            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1534            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1535            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1536            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1537        };
1538
1539        let job_status = self
1540            .metadata_manager
1541            .catalog_controller
1542            .get_streaming_job_status(job_id.id())
1543            .await?;
1544        let version = match job_status {
1545            JobStatus::Initial => {
1546                unreachable!(
1547                    "Job in Initial status has not been notified to the frontend and therefore should not arrive here"
1548                );
1549            }
1550            JobStatus::Creating => {
1551                self.stream_manager
1552                    .cancel_streaming_jobs(vec![job_id.id()])
1553                    .await?;
1554                IGNORED_NOTIFICATION_VERSION
1555            }
1556            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1557        };
1558
1559        Ok(version)
1560    }
1561
1562    /// Resolve the parallelism of the stream job based on the given information.
1563    ///
1564    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1565    fn resolve_stream_parallelism(
1566        &self,
1567        specified: Option<NonZeroUsize>,
1568        max: NonZeroUsize,
1569        cluster_info: &StreamingClusterInfo,
1570        resource_group: String,
1571    ) -> MetaResult<NonZeroUsize> {
1572        let available = NonZeroUsize::new(cluster_info.parallelism(&resource_group));
1573        DdlController::resolve_stream_parallelism_inner(
1574            specified,
1575            max,
1576            available,
1577            &self.env.opts.default_parallelism,
1578            &resource_group,
1579        )
1580    }
1581
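    /// Core resolution ladder, exercised directly by the unit tests below:
    /// - bail out if the resource group has no available compute slots;
    /// - a user-specified parallelism wins: error if it exceeds `max`, warn if it exceeds `available`;
    /// - otherwise use the configured default (`Full` means all available slots), warning if it
    ///   exceeds `available`, and cap the final value at `max`.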
1582    fn resolve_stream_parallelism_inner(
1583        specified: Option<NonZeroUsize>,
1584        max: NonZeroUsize,
1585        available: Option<NonZeroUsize>,
1586        default_parallelism: &DefaultParallelism,
1587        resource_group: &str,
1588    ) -> MetaResult<NonZeroUsize> {
1589        let Some(available) = available else {
1590            bail_unavailable!(
1591                "no available slots to schedule in resource group \"{}\", \
1592                 have you allocated any compute nodes within this resource group?",
1593                resource_group
1594            );
1595        };
1596
1597        if let Some(specified) = specified {
1598            if specified > max {
1599                bail_invalid_parameter!(
1600                    "specified parallelism {} should not exceed max parallelism {}",
1601                    specified,
1602                    max,
1603                );
1604            }
1605            if specified > available {
1606                tracing::warn!(
1607                    resource_group,
1608                    specified_parallelism = specified.get(),
1609                    available_parallelism = available.get(),
1610                    "specified parallelism exceeds available slots, scheduling with specified value",
1611                );
1612            }
1613            return Ok(specified);
1614        }
1615
1616        // Use default parallelism when no specific parallelism is provided by the user.
1617        let default_parallelism = match default_parallelism {
1618            DefaultParallelism::Full => available,
1619            DefaultParallelism::Default(num) => {
1620                if *num > available {
1621                    tracing::warn!(
1622                        resource_group,
1623                        configured_parallelism = num.get(),
1624                        available_parallelism = available.get(),
1625                        "default parallelism exceeds available slots, scheduling with configured value",
1626                    );
1627                }
1628                *num
1629            }
1630        };
1631
1632        if default_parallelism > max {
1633            tracing::warn!(
1634                max_parallelism = max.get(),
1635                resource_group,
1636                "default parallelism exceeds max parallelism, capping to max",
1637            );
1638        }
1639        Ok(default_parallelism.min(max))
1640    }
1641
1642    /// Builds the actor graph:
1643    /// - Add the upstream fragments to the fragment graph
1644    /// - Schedule the fragments based on their distribution
1645    /// - Expand each fragment into one or several actors
1646    /// - Construct the fragment level backfill order control.
1647    #[await_tree::instrument]
1648    pub(crate) async fn build_stream_job(
1649        &self,
1650        stream_ctx: StreamContext,
1651        mut stream_job: StreamingJob,
1652        fragment_graph: StreamFragmentGraph,
1653        specific_resource_group: Option<String>,
1654    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1655        let id = stream_job.id();
1656        let specified_parallelism = fragment_graph.specified_parallelism();
1657        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1658
1659        // 1. Build the fragment-level backfill ordering graph.
1660        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1661
1662        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1663        // contains all information needed for building the actor graph.
1664
1665        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1666            fragment_graph.collect_snapshot_backfill_info()?;
1667        assert!(
1668            snapshot_backfill_info
1669                .iter()
1670                .chain([&cross_db_snapshot_backfill_info])
1671                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1672                .all(|backfill_epoch| backfill_epoch.is_none()),
1673            "should not set backfill epoch when initially building the job: {:?} {:?}",
1674            snapshot_backfill_info,
1675            cross_db_snapshot_backfill_info
1676        );
1677
1678        let locality_fragment_state_table_mapping =
1679            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1680
1681        // check if log store exists for all cross-db upstreams
1682        self.metadata_manager
1683            .catalog_controller
1684            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1685            .await?;
1686
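        // Cross-db snapshot-backfill upstreams are handled separately (their log stores were
        // validated above), so exclude them when resolving upstream root fragments.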
1687        let upstream_table_ids = fragment_graph
1688            .dependent_table_ids()
1689            .iter()
1690            .filter(|id| {
1691                !cross_db_snapshot_backfill_info
1692                    .upstream_mv_table_id_to_backfill_epoch
1693                    .contains_key(id)
1694            })
1695            .cloned()
1696            .collect();
1697
1698        let (upstream_root_fragments, existing_actor_location) = self
1699            .metadata_manager
1700            .get_upstream_root_fragments(&upstream_table_ids)
1701            .await?;
1702
1703        if snapshot_backfill_info.is_some() {
1704            match stream_job {
1705                StreamingJob::MaterializedView(_)
1706                | StreamingJob::Sink(_)
1707                | StreamingJob::Index(_, _) => {}
1708                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1709                    return Err(
1710                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1711                    );
1712                }
1713            }
1714        }
1715
1716        let upstream_actors = upstream_root_fragments
1717            .values()
1718            .map(|(fragment, _)| {
1719                (
1720                    fragment.fragment_id,
1721                    fragment.actors.keys().copied().collect(),
1722                )
1723            })
1724            .collect();
1725
1726        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1727            fragment_graph,
1728            FragmentGraphUpstreamContext {
1729                upstream_root_fragments,
1730                upstream_actor_location: existing_actor_location,
1731            },
1732            (&stream_job).into(),
1733        )?;
1734        let resource_group = match specific_resource_group {
1735            None => {
1736                self.metadata_manager
1737                    .get_database_resource_group(stream_job.database_id())
1738                    .await?
1739            }
1740            Some(resource_group) => resource_group,
1741        };
1742
1743        // 3. Build the actor graph.
1744        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1745
1746        let parallelism = self.resolve_stream_parallelism(
1747            specified_parallelism,
1748            max_parallelism,
1749            &cluster_info,
1750            resource_group.clone(),
1751        )?;
1752
1753        let parallelism = self
1754            .env
1755            .system_params_reader()
1756            .await
1757            .adaptive_parallelism_strategy()
1758            .compute_target_parallelism(parallelism.get());
1759
1760        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1761        let actor_graph_builder = ActorGraphBuilder::new(
1762            id,
1763            resource_group,
1764            complete_graph,
1765            cluster_info,
1766            parallelism,
1767        )?;
1768
1769        let ActorGraphBuildResult {
1770            graph,
1771            downstream_fragment_relations,
1772            building_locations,
1773            upstream_fragment_downstreams,
1774            new_no_shuffle,
1775            replace_upstream,
1776            ..
1777        } = actor_graph_builder.generate_graph(&self.env, &stream_job, stream_ctx.clone())?;
1778        assert!(replace_upstream.is_empty());
1779
1780        // 4. Build the table fragments structure that will be persisted in the stream manager,
1781        // and the context that contains all information needed for building the
1782        // actors on the compute nodes.
1783
1784        // If the frontend does not specify a parallelism and `default_parallelism` is set to full, mark the job as ADAPTIVE.
1785        // Otherwise, it defaults to FIXED with the parallelism resolved above.
1786        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1787            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1788            _ => TableParallelism::Fixed(parallelism.get()),
1789        };
1790
1791        let stream_job_fragments = StreamJobFragments::new(
1792            id,
1793            graph,
1794            &building_locations.actor_locations,
1795            stream_ctx.clone(),
1796            table_parallelism,
1797            max_parallelism.get(),
1798        );
1799
1800        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1801            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1802        }
1803
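        // For a sink created with `CREATE SINK ... INTO TABLE`, resolve the target table and its
        // materialized-view fragment, and build the upstream sink info describing how this sink
        // feeds into the target table.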
1804        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1805            && let Ok(table_id) = sink.get_target_table()
1806        {
1807            let tables = self
1808                .metadata_manager
1809                .get_table_catalog_by_ids(&[*table_id])
1810                .await?;
1811            let target_table = tables
1812                .first()
1813                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1814            let sink_fragment = stream_job_fragments
1815                .sink_fragment()
1816                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1817            let mview_fragment_id = self
1818                .metadata_manager
1819                .catalog_controller
1820                .get_mview_fragment_by_id(table_id.as_job_id())
1821                .await?;
1822            let upstream_sink_info = build_upstream_sink_info(
1823                sink,
1824                sink_fragment.fragment_id as _,
1825                target_table,
1826                mview_fragment_id,
1827            )?;
1828            Some(upstream_sink_info)
1829        } else {
1830            None
1831        };
1832
1833        let mut cdc_table_snapshot_splits = None;
1834        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1835            && let Some((_, stream_cdc_scan)) =
1836                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1837        {
1838            {
1839                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
1840                let splits = try_init_parallel_cdc_table_snapshot_splits(
1841                    table.id,
1842                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1843                    self.env.meta_store_ref(),
1844                    stream_cdc_scan.options.as_ref().unwrap(),
1845                    self.env.opts.cdc_table_split_init_insert_batch_size,
1846                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1847                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1848                )
1849                .await?;
1850                cdc_table_snapshot_splits = Some(splits);
1851            }
1852        }
1853
1854        let ctx = CreateStreamingJobContext {
1855            upstream_fragment_downstreams,
1856            new_no_shuffle,
1857            upstream_actors,
1858            building_locations,
1859            definition: stream_job.definition(),
1860            create_type: stream_job.create_type(),
1861            job_type: (&stream_job).into(),
1862            streaming_job: stream_job,
1863            new_upstream_sink,
1864            option: CreateStreamingJobOption {},
1865            snapshot_backfill_info,
1866            cross_db_snapshot_backfill_info,
1867            fragment_backfill_ordering,
1868            locality_fragment_state_table_mapping,
1869            cdc_table_snapshot_splits,
1870        };
1871
1872        Ok((
1873            ctx,
1874            StreamJobFragmentsToCreate {
1875                inner: stream_job_fragments,
1876                downstreams: downstream_fragment_relations,
1877            },
1878        ))
1879    }
1880
1881    /// `build_replace_job` builds a job replacement and returns the context and the new job
1882    /// fragments.
1883    ///
1884    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1885    /// replacement is finished.
1886    pub(crate) async fn build_replace_job(
1887        &self,
1888        stream_ctx: StreamContext,
1889        stream_job: &StreamingJob,
1890        mut fragment_graph: StreamFragmentGraph,
1891        tmp_job_id: JobId,
1892        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1893    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1894        match &stream_job {
1895            StreamingJob::Table(..)
1896            | StreamingJob::Source(..)
1897            | StreamingJob::MaterializedView(..) => {}
1898            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1899                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1900            }
1901        }
1902
1903        let id = stream_job.id();
1904
1905        // Check whether this replacement drops the table's associated connector (source).
1906        let mut drop_table_associated_source_id = None;
1907        if let StreamingJob::Table(None, _, _) = &stream_job {
1908            drop_table_associated_source_id = self
1909                .metadata_manager
1910                .get_table_associated_source_id(id.as_mv_table_id())
1911                .await?;
1912        }
1913
1914        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1915        let old_internal_table_ids = old_fragments.internal_table_ids();
1916
1917        // Handle dropping the table's associated source.
1918        let mut drop_table_connector_ctx = None;
1919        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1920            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1921            debug_assert!(old_internal_table_ids.len() == 1);
1922
1923            drop_table_connector_ctx = Some(DropTableConnectorContext {
1924                // we do not remove the original table catalog as it's still needed for the streaming job
1925                // just need to remove the ref to the state table
1926                to_change_streaming_job_id: id,
1927                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1928                to_remove_source_id,
1929            });
1930        } else if stream_job.is_materialized_view() {
1931            // If it's ALTER MV, use the `state_match` module to match the internal tables, which is
1932            // more complicated but more robust.
1933            let old_fragments_upstreams = self
1934                .metadata_manager
1935                .catalog_controller
1936                .upstream_fragments(old_fragments.fragment_ids())
1937                .await?;
1938
1939            let old_state_graph =
1940                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1941            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1942            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
1943                .context("incompatible altering on the streaming job states")?;
1944
1945            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
1946            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
1947        } else {
1948            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1949            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1950            let old_internal_tables = self
1951                .metadata_manager
1952                .get_table_catalog_by_ids(&old_internal_table_ids)
1953                .await?;
1954            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
1955        }
1956
1957        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1958        // graph that contains all information needed for building the actor graph.
1959        let original_root_fragment = old_fragments
1960            .root_fragment()
1961            .expect("root fragment not found");
1962
1963        let job_type = StreamingJobType::from(stream_job);
1964
1965        // Extract the downstream fragments from the fragment graph.
1966        let (mut downstream_fragments, mut downstream_actor_location) =
1967            self.metadata_manager.get_downstream_fragments(id).await?;
1968
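        // Replace the auto-refresh-schema sinks' old downstream fragment entries with the rebuilt
        // ones, and repoint the recorded actor locations to the new actors (which stay on the
        // original workers).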
1969        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
1970            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
1971                .iter()
1972                .map(|sink| sink.original_fragment.fragment_id)
1973                .collect();
1974            for (_, downstream_fragment, nodes) in &mut downstream_fragments {
1975                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
1976                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
1977                }) {
1978                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
1979                    for actor_id in downstream_fragment.actors.keys() {
1980                        downstream_actor_location.remove(actor_id);
1981                    }
1982                    for (actor_id, status) in &sink.actor_status {
1983                        downstream_actor_location
1984                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
1985                    }
1986
1987                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
1988                    *nodes = sink.new_fragment.nodes.clone();
1989                }
1990            }
1991            assert!(remaining_fragment.is_empty());
1992        }
1993
1994        // build complete graph based on the table job type
1995        let complete_graph = match &job_type {
1996            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
1997                CompleteStreamFragmentGraph::with_downstreams(
1998                    fragment_graph,
1999                    FragmentGraphDownstreamContext {
2000                        original_root_fragment_id: original_root_fragment.fragment_id,
2001                        downstream_fragments,
2002                        downstream_actor_location,
2003                    },
2004                    job_type,
2005                )?
2006            }
2007            StreamingJobType::Table(TableJobType::SharedCdcSource)
2008            | StreamingJobType::MaterializedView => {
2009                // CDC tables or materialized views can have upstream jobs as well.
2010                let (upstream_root_fragments, upstream_actor_location) = self
2011                    .metadata_manager
2012                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2013                    .await?;
2014
2015                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2016                    fragment_graph,
2017                    FragmentGraphUpstreamContext {
2018                        upstream_root_fragments,
2019                        upstream_actor_location,
2020                    },
2021                    FragmentGraphDownstreamContext {
2022                        original_root_fragment_id: original_root_fragment.fragment_id,
2023                        downstream_fragments,
2024                        downstream_actor_location,
2025                    },
2026                    job_type,
2027                )?
2028            }
2029            _ => unreachable!(),
2030        };
2031
2032        let resource_group = self
2033            .metadata_manager
2034            .get_existing_job_resource_group(id)
2035            .await?;
2036
2037        // 2. Build the actor graph.
2038        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2039
2040        // XXX: what is this parallelism?
2041        // Is it "assigned parallelism"?
2042        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2043            .expect("The number of actors in the original table fragment should be greater than 0");
2044
2045        let actor_graph_builder = ActorGraphBuilder::new(
2046            id,
2047            resource_group,
2048            complete_graph,
2049            cluster_info,
2050            parallelism,
2051        )?;
2052
2053        let ActorGraphBuildResult {
2054            graph,
2055            downstream_fragment_relations,
2056            building_locations,
2057            upstream_fragment_downstreams,
2058            mut replace_upstream,
2059            new_no_shuffle,
2060            ..
2061        } = actor_graph_builder.generate_graph(&self.env, stream_job, stream_ctx.clone())?;
2062
2063        // A general table or source has no upstream job, so the dispatchers should be empty.
2064        if matches!(
2065            job_type,
2066            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2067        ) {
2068            assert!(upstream_fragment_downstreams.is_empty());
2069        }
2070
2071        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2072        // the context that contains all information needed for building the actors on the compute
2073        // nodes.
2074        let stream_job_fragments = StreamJobFragments::new(
2075            tmp_job_id,
2076            graph,
2077            &building_locations.actor_locations,
2078            stream_ctx,
2079            old_fragments.assigned_parallelism,
2080            old_fragments.max_parallelism,
2081        );
2082
2083        if let Some(sinks) = &auto_refresh_schema_sinks {
2084            for sink in sinks {
2085                replace_upstream
2086                    .remove(&sink.new_fragment.fragment_id)
2087                    .expect("should exist");
2088            }
2089        }
2090
2091        // Note: no need to set `vnode_count` as it's already set by the frontend.
2092        // See `get_replace_table_plan`.
2093
2094        let ctx = ReplaceStreamJobContext {
2095            old_fragments,
2096            replace_upstream,
2097            new_no_shuffle,
2098            upstream_fragment_downstreams,
2099            building_locations,
2100            streaming_job: stream_job.clone(),
2101            tmp_id: tmp_job_id,
2102            drop_table_connector_ctx,
2103            auto_refresh_schema_sinks,
2104        };
2105
2106        Ok((
2107            ctx,
2108            StreamJobFragmentsToCreate {
2109                inner: stream_job_fragments,
2110                downstreams: downstream_fragment_relations,
2111            },
2112        ))
2113    }
2114
2115    async fn alter_name(
2116        &self,
2117        relation: alter_name_request::Object,
2118        new_name: &str,
2119    ) -> MetaResult<NotificationVersion> {
2120        let (obj_type, id) = match relation {
2121            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2122            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2123            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2124            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2125            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2126            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2127            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2128            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2129        };
2130        self.metadata_manager
2131            .catalog_controller
2132            .alter_name(obj_type, id, new_name)
2133            .await
2134    }
2135
2136    async fn alter_swap_rename(
2137        &self,
2138        object: alter_swap_rename_request::Object,
2139    ) -> MetaResult<NotificationVersion> {
2140        let (obj_type, src_id, dst_id) = match object {
2141            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2142            alter_swap_rename_request::Object::Table(objs) => {
2143                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2144                (ObjectType::Table, src_id, dst_id)
2145            }
2146            alter_swap_rename_request::Object::View(objs) => {
2147                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2148                (ObjectType::View, src_id, dst_id)
2149            }
2150            alter_swap_rename_request::Object::Source(objs) => {
2151                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2152                (ObjectType::Source, src_id, dst_id)
2153            }
2154            alter_swap_rename_request::Object::Sink(objs) => {
2155                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2156                (ObjectType::Sink, src_id, dst_id)
2157            }
2158            alter_swap_rename_request::Object::Subscription(objs) => {
2159                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2160                (ObjectType::Subscription, src_id, dst_id)
2161            }
2162        };
2163
2164        self.metadata_manager
2165            .catalog_controller
2166            .alter_swap_rename(obj_type, src_id, dst_id)
2167            .await
2168    }
2169
2170    async fn alter_owner(
2171        &self,
2172        object: Object,
2173        owner_id: UserId,
2174    ) -> MetaResult<NotificationVersion> {
2175        let (obj_type, id) = match object {
2176            Object::TableId(id) => (ObjectType::Table, id),
2177            Object::ViewId(id) => (ObjectType::View, id),
2178            Object::SourceId(id) => (ObjectType::Source, id),
2179            Object::SinkId(id) => (ObjectType::Sink, id),
2180            Object::SchemaId(id) => (ObjectType::Schema, id),
2181            Object::DatabaseId(id) => (ObjectType::Database, id),
2182            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2183            Object::ConnectionId(id) => (ObjectType::Connection, id),
2184        };
2185        self.metadata_manager
2186            .catalog_controller
2187            .alter_owner(obj_type, id.into(), owner_id as _)
2188            .await
2189    }
2190
2191    async fn alter_set_schema(
2192        &self,
2193        object: alter_set_schema_request::Object,
2194        new_schema_id: SchemaId,
2195    ) -> MetaResult<NotificationVersion> {
2196        let (obj_type, id) = match object {
2197            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2198            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2199            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2200            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2201            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2202            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2203            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2204        };
2205        self.metadata_manager
2206            .catalog_controller
2207            .alter_schema(obj_type, id.into(), new_schema_id as _)
2208            .await
2209    }
2210
2211    pub async fn wait(&self) -> MetaResult<()> {
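        // Poll the catalog until no background job is still being created. Each iteration sleeps
        // 1ms, so `timeout_ms` iterations give roughly a 30-minute budget, excluding the time
        // spent in each catalog query.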
2212        let timeout_ms = 30 * 60 * 1000;
2213        for _ in 0..timeout_ms {
2214            if self
2215                .metadata_manager
2216                .catalog_controller
2217                .list_background_creating_jobs(true, None)
2218                .await?
2219                .is_empty()
2220            {
2221                return Ok(());
2222            }
2223
2224            sleep(Duration::from_millis(1)).await;
2225        }
2226        Err(MetaError::cancelled(format!(
2227            "timeout after {timeout_ms}ms"
2228        )))
2229    }
2230
2231    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2232        self.metadata_manager
2233            .catalog_controller
2234            .comment_on(comment)
2235            .await
2236    }
2237
2238    async fn alter_streaming_job_config(
2239        &self,
2240        job_id: JobId,
2241        entries_to_add: HashMap<String, String>,
2242        keys_to_remove: Vec<String>,
2243    ) -> MetaResult<NotificationVersion> {
2244        self.metadata_manager
2245            .catalog_controller
2246            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2247            .await
2248    }
2249}
2250
2251fn report_create_object(
2252    job_id: JobId,
2253    event_name: &str,
2254    obj_type: PbTelemetryDatabaseObject,
2255    connector_name: Option<String>,
2256    attr_info: Option<jsonbb::Value>,
2257) {
2258    report_event(
2259        PbTelemetryEventStage::CreateStreamJob,
2260        event_name,
2261        job_id.as_raw_id() as _,
2262        connector_name,
2263        Some(obj_type),
2264        attr_info,
2265    );
2266}
2267
2268pub fn build_upstream_sink_info(
2269    sink: &PbSink,
2270    sink_fragment_id: FragmentId,
2271    target_table: &PbTable,
2272    target_fragment_id: FragmentId,
2273) -> MetaResult<UpstreamSinkInfo> {
2274    let sink_columns = if !sink.original_target_columns.is_empty() {
2275        sink.original_target_columns.clone()
2276    } else {
2277        // The field may be empty because it did not exist in earlier versions, which means
2278        // no schema change such as `ADD/DROP COLUMN` has been made to the table.
2279        // Therefore the table's current columns are exactly the `original_target_columns`.
2280        // The sink's `original_target_columns` will be filled in on the meta side.
2281        target_table.columns.clone()
2282    };
2283
2284    let sink_output_fields = sink_columns
2285        .iter()
2286        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2287        .collect_vec();
2288    let output_indices = (0..sink_output_fields.len())
2289        .map(|i| i as u32)
2290        .collect_vec();
2291
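    // The target table's distribution key is expressed as indices into the table's own columns;
    // remap it to indices into the sink's output columns by matching column ids.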
2292    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2293        let sink_idx_by_col_id = sink_columns
2294            .iter()
2295            .enumerate()
2296            .map(|(idx, col)| {
2297                let column_id = col.column_desc.as_ref().unwrap().column_id;
2298                (column_id, idx as u32)
2299            })
2300            .collect::<HashMap<_, _>>();
2301        target_table
2302            .distribution_key
2303            .iter()
2304            .map(|dist_idx| {
2305                let column_id = target_table.columns[*dist_idx as usize]
2306                    .column_desc
2307                    .as_ref()
2308                    .unwrap()
2309                    .column_id;
2310                let sink_idx = sink_idx_by_col_id
2311                    .get(&column_id)
2312                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2313                Ok(*sink_idx)
2314            })
2315            .collect::<anyhow::Result<Vec<_>>>()?
2316    };
2317    let dist_key_indices =
2318        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2319    let downstream_fragment_id = target_fragment_id as _;
2320    let new_downstream_relation = DownstreamFragmentRelation {
2321        downstream_fragment_id,
2322        dispatcher_type: DispatcherType::Hash,
2323        dist_key_indices,
2324        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2325    };
2326    let current_target_columns = target_table.get_columns();
2327    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2328    Ok(UpstreamSinkInfo {
2329        sink_id: sink.id,
2330        sink_fragment_id: sink_fragment_id as _,
2331        sink_output_fields,
2332        sink_original_target_columns: sink.get_original_target_columns().clone(),
2333        project_exprs,
2334        new_sink_downstream: new_downstream_relation,
2335    })
2336}
2337
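/// Locates the `UpstreamSinkUnion` node in the table's union fragment and resets its
/// `init_upstreams` from the given sink infos. The visitor closure returns `false` once the node
/// has been rewritten, signaling that the traversal need not continue past it.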
2338pub fn refill_upstream_sink_union_in_table(
2339    union_fragment_root: &mut PbStreamNode,
2340    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2341) {
2342    visit_stream_node_cont_mut(union_fragment_root, |node| {
2343        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2344            let init_upstreams = upstream_sink_infos
2345                .iter()
2346                .map(|info| PbUpstreamSinkInfo {
2347                    upstream_fragment_id: info.sink_fragment_id,
2348                    sink_output_schema: info.sink_output_fields.clone(),
2349                    project_exprs: info.project_exprs.clone(),
2350                })
2351                .collect();
2352            upstream_sink_union.init_upstreams = init_upstreams;
2353            false
2354        } else {
2355            true
2356        }
2357    });
2358}
2359
2360#[cfg(test)]
2361mod tests {
2362    use std::num::NonZeroUsize;
2363
2364    use super::*;
2365
2366    #[test]
2367    fn test_specified_parallelism_exceeds_available() {
2368        let result = DdlController::resolve_stream_parallelism_inner(
2369            Some(NonZeroUsize::new(100).unwrap()),
2370            NonZeroUsize::new(256).unwrap(),
2371            Some(NonZeroUsize::new(4).unwrap()),
2372            &DefaultParallelism::Full,
2373            "default",
2374        )
2375        .unwrap();
2376        assert_eq!(result.get(), 100);
2377    }
2378
2379    #[test]
2380    fn test_allows_default_parallelism_over_available() {
2381        let result = DdlController::resolve_stream_parallelism_inner(
2382            None,
2383            NonZeroUsize::new(256).unwrap(),
2384            Some(NonZeroUsize::new(4).unwrap()),
2385            &DefaultParallelism::Default(NonZeroUsize::new(50).unwrap()),
2386            "default",
2387        )
2388        .unwrap();
2389        assert_eq!(result.get(), 50);
2390    }
2391
2392    #[test]
2393    fn test_full_parallelism_capped_by_max() {
2394        let result = DdlController::resolve_stream_parallelism_inner(
2395            None,
2396            NonZeroUsize::new(6).unwrap(),
2397            Some(NonZeroUsize::new(10).unwrap()),
2398            &DefaultParallelism::Full,
2399            "default",
2400        )
2401        .unwrap();
2402        assert_eq!(result.get(), 6);
2403    }
2404
2405    #[test]
2406    fn test_no_available_slots_returns_error() {
2407        let result = DdlController::resolve_stream_parallelism_inner(
2408            None,
2409            NonZeroUsize::new(4).unwrap(),
2410            None,
2411            &DefaultParallelism::Full,
2412            "default",
2413        );
2414        assert!(matches!(
2415            result,
2416            Err(ref e) if matches!(e.inner(), MetaErrorInner::Unavailable(_))
2417        ));
2418    }
2419
2420    #[test]
2421    fn test_specified_over_max_returns_error() {
2422        let result = DdlController::resolve_stream_parallelism_inner(
2423            Some(NonZeroUsize::new(8).unwrap()),
2424            NonZeroUsize::new(4).unwrap(),
2425            Some(NonZeroUsize::new(10).unwrap()),
2426            &DefaultParallelism::Full,
2427            "default",
2428        );
2429        assert!(matches!(
2430            result,
2431            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2432        ));
2433    }
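
    // A small extra check: when no parallelism is specified and the configured default exceeds
    // the job's max parallelism, the result is capped at the max.
    #[test]
    fn test_default_parallelism_capped_by_max() {
        let result = DdlController::resolve_stream_parallelism_inner(
            None,
            NonZeroUsize::new(4).unwrap(),
            Some(NonZeroUsize::new(10).unwrap()),
            &DefaultParallelism::Default(NonZeroUsize::new(8).unwrap()),
            "default",
        )
        .unwrap();
        assert_eq!(result.get(), 4);
    }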
2434}