risingwave_meta/rpc/
ddl_controller.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::InstrumentAwait;
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::hash::VnodeCountCompat;
30use risingwave_common::id::{JobId, TableId};
31use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
32use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
33use risingwave_common::system_param::reader::SystemParamsRead;
34use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
35use risingwave_common::{bail, bail_not_implemented};
36use risingwave_connector::WithOptionsSecResolved;
37use risingwave_connector::connector_common::validate_connection;
38use risingwave_connector::sink::SinkParam;
39use risingwave_connector::sink::iceberg::IcebergSink;
40use risingwave_connector::source::cdc::CdcScanOptions;
41use risingwave_connector::source::{
42    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
43};
44use risingwave_meta_model::object::ObjectType;
45use risingwave_meta_model::{
46    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
47    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
48    streaming_job,
49};
50use risingwave_pb::catalog::{
51    Comment, Connection, CreateType, Database, Function, PbTable, Schema, Secret, Source,
52    Subscription, Table, View,
53};
54use risingwave_pb::ddl_service::alter_owner_request::Object;
55use risingwave_pb::ddl_service::{
56    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
57    alter_swap_rename_request, streaming_job_resource_type,
58};
59use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType as PbFragmentDistributionType;
60use risingwave_pb::plan_common::PbColumnCatalog;
61use risingwave_pb::stream_plan::stream_node::NodeBody;
62use risingwave_pb::stream_plan::{
63    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
64    StreamFragmentGraph as StreamFragmentGraphProto,
65};
66use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
67use strum::Display;
68use thiserror_ext::AsReport;
69use tokio::sync::Semaphore;
70use tokio::time::sleep;
71use tracing::Instrument;
72
73use crate::barrier::{BarrierManagerRef, Command};
74use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter};
78use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
79use crate::manager::sink_coordination::SinkCoordinatorManager;
80use crate::manager::{
81    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
82    NotificationVersion, StreamingJob, StreamingJobType,
83};
84use crate::model::{
85    DownstreamFragmentRelation, FragmentDownstreamRelation, FragmentId as CatalogFragmentId,
86    StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
87};
88use crate::stream::cdc::{
89    parallel_cdc_table_backfill_fragment, try_init_parallel_cdc_table_snapshot_splits,
90};
91use crate::stream::{
92    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
93    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
94    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
95    ParallelismPolicy, ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef,
96    StreamFragmentGraph, UpstreamSinkInfo, check_sink_fragments_support_refresh_schema,
97    create_source_worker, rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
98};
99use crate::telemetry::report_event;
100use crate::{MetaError, MetaResult};
101
/// Whether a drop request asked for `CASCADE` ([`DropMode::Cascade`]) or not
/// ([`DropMode::Restrict`]).
///
/// Built from the request's `cascade` flag via [`DropMode::from_request_setting`]. The enum is
/// fieldless, so it is `Copy` and can be debug-printed and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropMode {
    /// The request's `cascade` flag was `false`.
    Restrict,
    /// The request's `cascade` flag was `true`.
    Cascade,
}

impl DropMode {
    /// Maps the request's `cascade` flag to a [`DropMode`]:
    /// `true` → [`DropMode::Cascade`], `false` → [`DropMode::Restrict`].
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}
117
/// Identifies a streaming job by its kind and id (carried by `DdlCommand::DropStreamingJob`).
///
/// The derived `AsRefStr` yields the variant name, which the `Display` impl prints in front of
/// the job id.
#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    /// NOTE(review): the `Option<SourceId>` presumably refers to a source associated with the
    /// table, if any — confirm against the drop logic. Only the `TableId` is used by `id()`.
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}
125
126impl std::fmt::Display for StreamingJobId {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "{}", self.as_ref())?;
129        write!(f, "({})", self.id())
130    }
131}
132
133impl StreamingJobId {
134    fn id(&self) -> JobId {
135        match self {
136            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
137            StreamingJobId::Index(id) => id.as_job_id(),
138            StreamingJobId::Sink(id) => id.as_job_id(),
139        }
140    }
141}
142
/// Describes a streaming job that is about to be replaced, together with its new fragment graph.
///
/// It is used both when replacing a table and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    /// The job definition to install in place of the existing one.
    pub streaming_job: StreamingJob,
    /// The fragment graph of the replacement job.
    pub fragment_graph: StreamFragmentGraphProto,
}
149
/// A DDL request to be executed by [`DdlController::run_command`].
///
/// The `strum`-derived `Display` renders the variant name; `run_command` combines it with
/// [`DdlCommand::object`] to label the await-tree span of the command.
#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    /// Creates a non-shared source; shared sources go through `CreateStreamingJob` instead.
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    /// Resets the offset of a CDC source (see `DdlController::reset_source`).
    ResetSource(SourceId),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    /// A view together with the ids of the objects it depends on.
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    /// Creates a streaming job from its fragment graph (shared sources are created this way too).
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        /// Ids of the objects the job depends on.
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        /// NOTE(review): presumably turns "already exists" into a no-op — confirm in
        /// `create_streaming_job`.
        if_not_exists: bool,
        /// NOTE(review): presumably a refresh interval in seconds for refreshable jobs — confirm.
        refresh_interval_sec: Option<u64>,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    /// Renames the target object; the `String` is the new name.
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    /// Replaces an existing streaming job with a new definition and fragment graph.
    ReplaceStreamJob(ReplaceStreamJobInfo),
    /// Replaces a non-shared source in the catalog (see `DdlController::alter_non_shared_source`).
    AlterNonSharedSource(Source),
    /// Changes the owner of the target object to the given user.
    AlterObjectOwner(Object, UserId),
    /// Moves the target object into the given schema.
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId, DropMode),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterSubscriptionRetention {
        subscription_id: SubscriptionId,
        retention_seconds: u64,
        definition: String,
    },
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
    /// Edits a streaming job's config: the map holds entries to add, the `Vec` holds keys to
    /// remove.
    AlterStreamingJobConfig(JobId, HashMap<String, String>, Vec<String>),
}
197
198impl DdlCommand {
199    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
200    fn object(&self) -> Either<String, ObjectId> {
201        use Either::*;
202        match self {
203            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
204            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
205            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
206            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
207            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
208            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
209            DdlCommand::ResetSource(id) => Right(id.as_object_id()),
210            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
211            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
212            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
213            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
214            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
215            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
216            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
217            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
218            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
219            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
220            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
221            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
222            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
223            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
224            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
225            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
226            DdlCommand::DropSecret(id, _) => Right(id.as_object_id()),
227            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
228            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
229            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
230            DdlCommand::AlterSubscriptionRetention {
231                subscription_id, ..
232            } => Right(subscription_id.as_object_id()),
233            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
234            DdlCommand::AlterStreamingJobConfig(job_id, _, _) => Right(job_id.as_object_id()),
235        }
236    }
237
238    fn allow_in_recovery(&self) -> bool {
239        match self {
240            DdlCommand::DropDatabase(_)
241            | DdlCommand::DropSchema(_, _)
242            | DdlCommand::DropSource(_, _)
243            | DdlCommand::DropFunction(_, _)
244            | DdlCommand::DropView(_, _)
245            | DdlCommand::DropStreamingJob { .. }
246            | DdlCommand::DropConnection(_, _)
247            | DdlCommand::DropSecret(_, _)
248            | DdlCommand::DropSubscription(_, _)
249            | DdlCommand::AlterName(_, _)
250            | DdlCommand::AlterObjectOwner(_, _)
251            | DdlCommand::AlterSetSchema(_, _)
252            | DdlCommand::CreateDatabase(_)
253            | DdlCommand::CreateSchema(_)
254            | DdlCommand::CreateFunction(_)
255            | DdlCommand::CreateView(_, _)
256            | DdlCommand::CreateConnection(_)
257            | DdlCommand::CommentOn(_)
258            | DdlCommand::CreateSecret(_)
259            | DdlCommand::AlterSecret(_)
260            | DdlCommand::AlterSwapRename(_)
261            | DdlCommand::AlterDatabaseParam(_, _)
262            | DdlCommand::AlterStreamingJobConfig(_, _, _)
263            | DdlCommand::AlterSubscriptionRetention { .. } => true,
264            DdlCommand::CreateStreamingJob { .. }
265            | DdlCommand::CreateNonSharedSource(_)
266            | DdlCommand::ReplaceStreamJob(_)
267            | DdlCommand::AlterNonSharedSource(_)
268            | DdlCommand::ResetSource(_)
269            | DdlCommand::CreateSubscription(_) => false,
270        }
271    }
272}
273
/// Executes DDL commands on the meta node; see [`DdlController::run_command`].
///
/// `run_command` clones the controller into the task it spawns for every command.
#[derive(Clone)]
pub struct DdlController {
    /// Meta server environment; used here for system parameters, the notification manager,
    /// meta options and the await-tree registry.
    pub(crate) env: MetaSrvEnv,

    /// Catalog access, through its `catalog_controller`.
    pub(crate) metadata_manager: MetadataManager,
    /// Performs reschedules and exposes the barrier scheduler used for barrier commands.
    pub(crate) stream_manager: GlobalStreamManagerRef,
    /// Registers source workers and exposes source metrics.
    pub(crate) source_manager: SourceManagerRef,
    /// Reports whether the cluster is running or recovering, DDL progress and the Hummock version
    /// id, and receives per-database barrier settings.
    barrier_manager: BarrierManagerRef,
    sink_manager: SinkCoordinatorManager,
    iceberg_compaction_manager: IcebergCompactionManagerRef,

    /// Limits the number of streaming jobs that may be created concurrently.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}
291
/// Limits the number of streaming jobs that can be created concurrently.
///
/// The semaphore is sized from the `max_concurrent_creating_streaming_jobs` system parameter
/// (`0` meaning `Semaphore::MAX_PERMITS`). It is resized in the background whenever that
/// parameter changes; see `CreatingStreamingJobPermit::new`.
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    /// Shared semaphore; NOTE(review): permits are presumably acquired by the streaming job
    /// creation path elsewhere in the crate.
    pub(crate) semaphore: Arc<Semaphore>,
}
296
297impl CreatingStreamingJobPermit {
298    async fn new(env: &MetaSrvEnv) -> Self {
299        let mut permits = env
300            .system_params_reader()
301            .await
302            .max_concurrent_creating_streaming_jobs() as usize;
303        if permits == 0 {
304            // if the system parameter is set to zero, use the max permitted value.
305            permits = Semaphore::MAX_PERMITS;
306        }
307        let semaphore = Arc::new(Semaphore::new(permits));
308
309        let (local_notification_tx, mut local_notification_rx) =
310            tokio::sync::mpsc::unbounded_channel();
311        env.notification_manager()
312            .insert_local_sender(local_notification_tx);
313        let semaphore_clone = semaphore.clone();
314        tokio::spawn(async move {
315            while let Some(notification) = local_notification_rx.recv().await {
316                let LocalNotification::SystemParamsChange(p) = &notification else {
317                    continue;
318                };
319                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
320                if new_permits == 0 {
321                    new_permits = Semaphore::MAX_PERMITS;
322                }
323                match permits.cmp(&new_permits) {
324                    Ordering::Less => {
325                        semaphore_clone.add_permits(new_permits - permits);
326                    }
327                    Ordering::Equal => continue,
328                    Ordering::Greater => {
329                        let to_release = permits - new_permits;
330                        let reduced = semaphore_clone.forget_permits(to_release);
331                        // TODO: implement dynamic semaphore with limits by ourself.
332                        if reduced != to_release {
333                            tracing::warn!(
334                                "no enough permits to release, expected {}, but reduced {}",
335                                to_release,
336                                reduced
337                            );
338                        }
339                    }
340                }
341                tracing::info!(
342                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
343                    permits,
344                    new_permits
345                );
346                permits = new_permits;
347            }
348        });
349
350        Self { semaphore }
351    }
352}
353
354impl DdlController {
355    fn validate_specified_parallelism(
356        specified_parallelism: Option<NonZeroUsize>,
357        specified_backfill_parallelism: Option<NonZeroUsize>,
358        max_parallelism: NonZeroUsize,
359    ) -> MetaResult<()> {
360        if let Some(parallelism) = specified_parallelism
361            && parallelism > max_parallelism
362        {
363            bail_invalid_parameter!(
364                "specified parallelism {} should not exceed max parallelism {}",
365                parallelism,
366                max_parallelism,
367            );
368        }
369        if let Some(backfill_parallelism) = specified_backfill_parallelism
370            && backfill_parallelism > max_parallelism
371        {
372            bail_invalid_parameter!(
373                "specified backfill parallelism {} should not exceed max parallelism {}",
374                backfill_parallelism,
375                max_parallelism,
376            );
377        }
378        Ok(())
379    }
380
381    pub async fn new(
382        env: MetaSrvEnv,
383        metadata_manager: MetadataManager,
384        stream_manager: GlobalStreamManagerRef,
385        source_manager: SourceManagerRef,
386        barrier_manager: BarrierManagerRef,
387        sink_manager: SinkCoordinatorManager,
388        iceberg_compaction_manager: IcebergCompactionManagerRef,
389    ) -> Self {
390        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
391        Self {
392            env,
393            metadata_manager,
394            stream_manager,
395            source_manager,
396            barrier_manager,
397            sink_manager,
398            iceberg_compaction_manager,
399            creating_streaming_job_permits,
400            seq: Arc::new(AtomicU64::new(0)),
401        }
402    }
403
404    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
405    pub fn next_seq(&self) -> u64 {
406        // This is a simple atomic increment operation.
407        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
408    }
409
    /// `run_command` spawns a tokio task to execute the target DDL command.
    ///
    /// If the client is interrupted while the command is executing, tonic cancels the request.
    /// The command logic involves reverting, status management, notification and so on. Keeping
    /// all of that consistent under cancellation would be a huge hassle without spawning here: a
    /// spawned task keeps running even if this caller's future is dropped.
    ///
    /// Though returning `Option`, it's always `Some`, to simplify the handling logic.
    ///
    /// # Errors
    /// - The command is not allowed during recovery and the barrier manager does not report a
    ///   running status.
    /// - The spawned task fails to complete (its `JoinError` is wrapped via `anyhow!`).
    /// - The command itself fails.
    #[expect(clippy::large_stack_frames)]
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        // Only commands explicitly allowed in recovery may run when the cluster is not running.
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        // Label the command in the await-tree registry as e.g. "DDL Command 7" with a span
        // rendering the command variant and its target object.
        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        // The spawned future must be `'static`, so it owns a clone of the controller.
        let ctrl = self.clone();
        let fut = Box::pin(async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::ResetSource(source_id) => ctrl.reset_source(source_id).await,
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    resource_type,
                    if_not_exists,
                    refresh_interval_sec,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        resource_type,
                        if_not_exists,
                        refresh_interval_sec,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id, drop_mode) => {
                    ctrl.drop_secret(secret_id, drop_mode).await
                }
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSubscriptionRetention {
                    subscription_id,
                    retention_seconds,
                    definition,
                } => {
                    ctrl.alter_subscription_retention(
                        subscription_id,
                        retention_seconds,
                        definition,
                    )
                    .await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
                DdlCommand::AlterStreamingJobConfig(job_id, entries_to_add, keys_to_remove) => {
                    ctrl.alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
                        .await
                }
            }
        })
        // Keep the caller's tracing span attached to the spawned work.
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        // The outer `?` handles the task's join error, the inner `?` the command's own error.
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await,
        }))
    }
536
537    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
538        self.barrier_manager.get_ddl_progress().await
539    }
540
541    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
542        let (version, updated_db) = self
543            .metadata_manager
544            .catalog_controller
545            .create_database(database)
546            .await?;
547        // If persistent successfully, notify `GlobalBarrierManager` to create database asynchronously.
548        self.barrier_manager
549            .update_database_barrier(
550                updated_db.database_id,
551                updated_db.barrier_interval_ms.map(|v| v as u32),
552                updated_db.checkpoint_frequency.map(|v| v as u64),
553            )
554            .await?;
555        Ok(version)
556    }
557
558    #[tracing::instrument(skip(self), level = "debug")]
559    pub async fn reschedule_streaming_job(
560        &self,
561        job_id: JobId,
562        target: ReschedulePolicy,
563        mut deferred: bool,
564    ) -> MetaResult<()> {
565        tracing::info!("altering parallelism for job {}", job_id);
566        if self.barrier_manager.check_status_running().is_err() {
567            tracing::info!(
568                "alter parallelism is set to deferred mode because the system is in recovery state"
569            );
570            deferred = true;
571        }
572
573        self.stream_manager
574            .reschedule_streaming_job(job_id, target, deferred)
575            .await
576    }
577
578    pub async fn reschedule_streaming_job_backfill_parallelism(
579        &self,
580        job_id: JobId,
581        parallelism: Option<ParallelismPolicy>,
582        mut deferred: bool,
583    ) -> MetaResult<()> {
584        tracing::info!("altering backfill parallelism for job {}", job_id);
585        if self.barrier_manager.check_status_running().is_err() {
586            tracing::info!(
587                "alter backfill parallelism is set to deferred mode because the system is in recovery state"
588            );
589            deferred = true;
590        }
591
592        self.stream_manager
593            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
594            .await
595    }
596
597    pub async fn reschedule_cdc_table_backfill(
598        &self,
599        job_id: JobId,
600        target: ReschedulePolicy,
601    ) -> MetaResult<()> {
602        tracing::info!("alter CDC table backfill parallelism");
603        if self.barrier_manager.check_status_running().is_err() {
604            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
605        }
606        self.stream_manager
607            .reschedule_cdc_table_backfill(job_id, target)
608            .await
609    }
610
611    pub async fn reschedule_fragments(
612        &self,
613        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
614    ) -> MetaResult<()> {
615        tracing::info!(
616            "altering parallelism for fragments {:?}",
617            fragment_targets.keys()
618        );
619        let fragment_targets = fragment_targets
620            .into_iter()
621            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
622            .collect();
623
624        self.stream_manager
625            .reschedule_fragments(fragment_targets)
626            .await
627    }
628
629    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
630        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
631            .await
632    }
633
634    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
635        self.metadata_manager
636            .catalog_controller
637            .create_schema(schema)
638            .await
639    }
640
641    async fn drop_schema(
642        &self,
643        schema_id: SchemaId,
644        drop_mode: DropMode,
645    ) -> MetaResult<NotificationVersion> {
646        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
647            .await
648    }
649
650    /// Shared source is handled in [`Self::create_streaming_job`]
651    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
652        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
653            .await
654            .context("failed to create source worker")?;
655
656        let (source_id, version) = self
657            .metadata_manager
658            .catalog_controller
659            .create_source(source)
660            .await?;
661        self.source_manager
662            .register_source_with_handle(source_id, handle)
663            .await;
664        Ok(version)
665    }
666
667    async fn drop_source(
668        &self,
669        source_id: SourceId,
670        drop_mode: DropMode,
671    ) -> MetaResult<NotificationVersion> {
672        self.drop_object(ObjectType::Source, source_id, drop_mode)
673            .await
674    }
675
676    async fn reset_source(&self, source_id: SourceId) -> MetaResult<NotificationVersion> {
677        tracing::info!(source_id = %source_id, "resetting CDC source offset to latest");
678
679        // Get database_id for the source
680        let database_id = self
681            .metadata_manager
682            .catalog_controller
683            .get_object_database_id(source_id)
684            .await?;
685
686        self.stream_manager
687            .barrier_scheduler
688            .run_command(database_id, Command::ResetSource { source_id })
689            .await?;
690
691        // RESET SOURCE doesn't modify catalog, so return the current catalog version
692        let version = self
693            .metadata_manager
694            .catalog_controller
695            .notify_frontend_trivial()
696            .await;
697        Ok(version)
698    }
699
700    /// This replaces the source in the catalog.
701    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
702    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
703        self.metadata_manager
704            .catalog_controller
705            .alter_non_shared_source(source)
706            .await
707    }
708
709    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
710        self.metadata_manager
711            .catalog_controller
712            .create_function(function)
713            .await
714    }
715
716    async fn drop_function(
717        &self,
718        function_id: FunctionId,
719        drop_mode: DropMode,
720    ) -> MetaResult<NotificationVersion> {
721        self.drop_object(ObjectType::Function, function_id, drop_mode)
722            .await
723    }
724
725    async fn create_view(
726        &self,
727        view: View,
728        dependencies: HashSet<ObjectId>,
729    ) -> MetaResult<NotificationVersion> {
730        self.metadata_manager
731            .catalog_controller
732            .create_view(view, dependencies)
733            .await
734    }
735
736    async fn drop_view(
737        &self,
738        view_id: ViewId,
739        drop_mode: DropMode,
740    ) -> MetaResult<NotificationVersion> {
741        self.drop_object(ObjectType::View, view_id, drop_mode).await
742    }
743
744    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
745        validate_connection(&connection).await?;
746        self.metadata_manager
747            .catalog_controller
748            .create_connection(connection)
749            .await
750    }
751
752    async fn drop_connection(
753        &self,
754        connection_id: ConnectionId,
755        drop_mode: DropMode,
756    ) -> MetaResult<NotificationVersion> {
757        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
758            .await
759    }
760
761    async fn alter_database_param(
762        &self,
763        database_id: DatabaseId,
764        param: AlterDatabaseParam,
765    ) -> MetaResult<NotificationVersion> {
766        let (version, updated_db) = self
767            .metadata_manager
768            .catalog_controller
769            .alter_database_param(database_id, param)
770            .await?;
771        // If persistent successfully, notify `GlobalBarrierManager` to update param asynchronously.
772        self.barrier_manager
773            .update_database_barrier(
774                database_id,
775                updated_db.barrier_interval_ms.map(|v| v as u32),
776                updated_db.checkpoint_frequency.map(|v| v as u64),
777            )
778            .await?;
779        Ok(version)
780    }
781
782    // The 'secret' part of the request we receive from the frontend is in plaintext;
783    // here, we need to encrypt it before storing it in the catalog.
784    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
785        let secret_store_private_key = self
786            .env
787            .opts
788            .secret_store_private_key
789            .clone()
790            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
791
792        let encrypted_payload = SecretEncryption::encrypt(
793            secret_store_private_key.as_slice(),
794            secret.get_value().as_slice(),
795        )
796        .context(format!("failed to encrypt secret {}", secret.name))?;
797        Ok(encrypted_payload
798            .serialize()
799            .context(format!("failed to serialize secret {}", secret.name))?)
800    }
801
802    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
803        // The 'secret' part of the request we receive from the frontend is in plaintext;
804        // here, we need to encrypt it before storing it in the catalog.
805        let secret_plain_payload = secret.value.clone();
806        let encrypted_payload = self.get_encrypted_payload(&secret)?;
807        secret.value = encrypted_payload;
808
809        self.metadata_manager
810            .catalog_controller
811            .create_secret(secret, secret_plain_payload)
812            .await
813    }
814
815    async fn drop_secret(
816        &self,
817        secret_id: SecretId,
818        drop_mode: DropMode,
819    ) -> MetaResult<NotificationVersion> {
820        self.drop_object(ObjectType::Secret, secret_id, drop_mode)
821            .await
822    }
823
824    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
825        let secret_plain_payload = secret.value.clone();
826        let encrypted_payload = self.get_encrypted_payload(&secret)?;
827        secret.value = encrypted_payload;
828        self.metadata_manager
829            .catalog_controller
830            .alter_secret(secret, secret_plain_payload)
831            .await
832    }
833
834    async fn create_subscription(
835        &self,
836        mut subscription: Subscription,
837    ) -> MetaResult<NotificationVersion> {
838        tracing::debug!("create subscription");
839        let _permit = self
840            .creating_streaming_job_permits
841            .semaphore
842            .acquire()
843            .await
844            .unwrap();
845        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
846        self.metadata_manager
847            .catalog_controller
848            .create_subscription_catalog(&mut subscription)
849            .await?;
850        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
851            tracing::debug!(error = %err.as_report(), "failed to create subscription");
852            let _ = self
853                .metadata_manager
854                .catalog_controller
855                .try_abort_creating_subscription(subscription.id)
856                .await
857                .inspect_err(|e| {
858                    tracing::error!(
859                        error = %e.as_report(),
860                        "failed to abort create subscription after failure"
861                    );
862                });
863            return Err(err);
864        }
865
866        let version = self
867            .metadata_manager
868            .catalog_controller
869            .notify_create_subscription(subscription.id)
870            .await?;
871        tracing::debug!("finish create subscription");
872        Ok(version)
873    }
874
875    async fn drop_subscription(
876        &self,
877        subscription_id: SubscriptionId,
878        drop_mode: DropMode,
879    ) -> MetaResult<NotificationVersion> {
880        tracing::debug!("preparing drop subscription");
881        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
882        let subscription = self
883            .metadata_manager
884            .catalog_controller
885            .get_subscription_by_id(subscription_id)
886            .await?;
887        let table_id = subscription.dependent_table_id;
888        let database_id = subscription.database_id;
889        let (_, version) = self
890            .metadata_manager
891            .catalog_controller
892            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
893            .await?;
894        self.stream_manager
895            .drop_subscription(database_id, subscription_id, table_id)
896            .await;
897        tracing::debug!("finish drop subscription");
898        Ok(version)
899    }
900
901    async fn alter_subscription_retention(
902        &self,
903        subscription_id: SubscriptionId,
904        retention_seconds: u64,
905        definition: String,
906    ) -> MetaResult<NotificationVersion> {
907        tracing::debug!("alter subscription retention");
908        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
909        let (version, subscription) = self
910            .metadata_manager
911            .catalog_controller
912            .alter_subscription_retention(subscription_id, retention_seconds, definition)
913            .await?;
914        self.stream_manager
915            .alter_subscription_retention(
916                subscription.database_id,
917                subscription.id,
918                subscription.dependent_table_id,
919                subscription.retention_seconds,
920            )
921            .await?;
922        tracing::debug!("finish alter subscription retention");
923        Ok(version)
924    }
925
926    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
927    #[await_tree::instrument]
928    pub(crate) async fn validate_cdc_table(
929        &self,
930        table: &Table,
931        table_fragments: &StreamJobFragments,
932    ) -> MetaResult<()> {
933        let stream_scan_fragment = table_fragments
934            .fragments
935            .values()
936            .filter(|f| {
937                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
938                    || f.fragment_type_mask
939                        .contains(FragmentTypeFlag::StreamCdcScan)
940            })
941            .exactly_one()
942            .ok()
943            .with_context(|| {
944                format!(
945                    "expect exactly one stream scan fragment, got: {:?}",
946                    table_fragments.fragments
947                )
948            })?;
949        fn assert_parallelism(
950            distribution_type: PbFragmentDistributionType,
951            node_body: &Option<NodeBody>,
952        ) {
953            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
954                if let Some(o) = node.options
955                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
956                {
957                    // Use parallel CDC backfill.
958                } else {
959                    assert_eq!(
960                        distribution_type,
961                        PbFragmentDistributionType::Single,
962                        "Non-parallelized CDC scan fragment should have Single distribution"
963                    );
964                }
965            }
966        }
967        let mut found_cdc_scan = false;
968        match &stream_scan_fragment.nodes.node_body {
969            Some(NodeBody::StreamCdcScan(_)) => {
970                assert_parallelism(
971                    stream_scan_fragment.distribution_type,
972                    &stream_scan_fragment.nodes.node_body,
973                );
974                if self
975                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
976                    .await?
977                {
978                    found_cdc_scan = true;
979                }
980            }
981            // When there's generated columns, the cdc scan node is wrapped in a project node
982            Some(NodeBody::Project(_)) => {
983                for input in &stream_scan_fragment.nodes.input {
984                    assert_parallelism(stream_scan_fragment.distribution_type, &input.node_body);
985                    if self
986                        .validate_cdc_table_inner(&input.node_body, table.id)
987                        .await?
988                    {
989                        found_cdc_scan = true;
990                    }
991                }
992            }
993            _ => {
994                bail!("Unexpected node body for stream cdc scan");
995            }
996        };
997        if !found_cdc_scan {
998            bail!("No stream cdc scan node found in stream scan fragment");
999        }
1000        Ok(())
1001    }
1002
1003    async fn validate_cdc_table_inner(
1004        &self,
1005        node_body: &Option<NodeBody>,
1006        table_id: TableId,
1007    ) -> MetaResult<bool> {
1008        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
1009            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
1010        {
1011            let options_with_secret = WithOptionsSecResolved::new(
1012                cdc_table_desc.connect_properties.clone(),
1013                cdc_table_desc.secret_refs.clone(),
1014            );
1015
1016            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
1017            props.init_from_pb_cdc_table_desc(cdc_table_desc);
1018
1019            // Try creating a split enumerator to validate
1020            let _enumerator = props
1021                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
1022                .await?;
1023
1024            tracing::debug!(?table_id, "validate cdc table success");
1025            Ok(true)
1026        } else {
1027            Ok(false)
1028        }
1029    }
1030
1031    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
1032        let migrated = self
1033            .metadata_manager
1034            .catalog_controller
1035            .has_table_been_migrated(table_id)
1036            .await?;
1037        if !migrated {
1038            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
1039        } else {
1040            Ok(())
1041        }
1042    }
1043
    /// Creates a streaming job: writes its catalog, generates its fragments, and issues the
    /// create command through the stream manager.
    ///
    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    ///
    /// * `if_not_exists`: applies when catalog creation fails with a `Duplicated` error that
    ///   carries the id of the existing job. A foreground creation then waits for that job to
    ///   finish. A background creation returns `IGNORED_NOTIFICATION_VERSION` instead of an error.
    /// * `refresh_interval_sec`: forwarded as-is to `create_job_catalog`.
    ///
    /// # Errors
    /// If generating the job or issuing the create command fails:
    /// - the failure is logged and recorded as a `CreateStreamJobFail` event;
    /// - the creating catalog is aborted via `try_abort_creating_streaming_job`;
    /// - if the job was aborted and owns a source, that source is dropped from the source manager.
    ///
    /// The original error is then returned. If the abort call itself fails, the abort error is
    /// returned instead.
    ///
    /// # Panics
    /// - `fragment_graph` has no `ctx`;
    /// - a non-empty (backfill) adaptive parallelism strategy fails to parse;
    /// - acquiring the creation permit fails.
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        resource_type: streaming_job_resource_type::ResourceType,
        if_not_exists: bool,
        refresh_interval_sec: Option<u64>,
    ) -> MetaResult<NotificationVersion> {
        // A sink into a table requires the target table to pass `validate_table_for_sink`.
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        // An empty strategy string means "not specified" (`None`).
        let adaptive_parallelism_strategy =
            (!fragment_graph.adaptive_parallelism_strategy.is_empty()).then(|| {
                parse_strategy(&fragment_graph.adaptive_parallelism_strategy)
                    .expect("adaptive parallelism strategy should be validated in frontend")
            });
        let backfill_adaptive_parallelism_strategy = (!fragment_graph
            .backfill_adaptive_parallelism_strategy
            .is_empty())
        .then(|| {
            parse_strategy(&fragment_graph.backfill_adaptive_parallelism_strategy)
                .expect("backfill adaptive parallelism strategy should be validated in frontend")
        });
        let streaming_job_model = match self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                resource_type.clone(),
                &fragment_graph.backfill_parallelism,
                adaptive_parallelism_strategy,
                backfill_adaptive_parallelism_strategy,
                refresh_interval_sec,
            )
            .await
        {
            Ok(model) => model,
            Err(meta_err) => {
                if !if_not_exists {
                    return Err(meta_err);
                }
                // IF NOT EXISTS: a duplicate that reports the existing job id is not an error.
                // A foreground creation waits for the existing job instead.
                return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                    if streaming_job.create_type() == CreateType::Foreground {
                        let database_id = streaming_job.database_id();
                        self.metadata_manager
                            .wait_streaming_job_finished(database_id, *job_id)
                            .await
                    } else {
                        Ok(IGNORED_NOTIFICATION_VERSION)
                    }
                } else {
                    Err(meta_err)
                };
            }
        };
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // The permit and the reschedule read lock are taken only after the catalog is created.
        // The permit is later handed to the stream manager together with the fragments.
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        // Captured before `streaming_job` is moved into `generate_streaming_job`; used for the
        // failure event and for source cleanup.
        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };
        // Generate streaming job metadata and issue the create command in two steps, so that the
        // error phase is classified at the barrier command boundary.
        // The bool in the error tuple is `true` when the stream manager's create command failed
        // and `false` when generating the job failed. It is forwarded as `is_cancelled`.
        let create_result = match self
            .generate_streaming_job(
                ctx,
                streaming_job,
                fragment_graph,
                resource_type,
                streaming_job_model,
            )
            .await
        {
            Ok((stream_job_fragments, ctx)) => self
                .stream_manager
                .create_streaming_job(stream_job_fragments, ctx, permit)
                .await
                .map_err(|err| (err, true)),
            Err(err) => Err((err, false)),
        };

        match create_result {
            Ok(version) => Ok(version),
            Err((err, is_cancelled)) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                // NOTE(review): if this abort fails, `?` returns the abort error, and `err` is
                // only preserved in the log line and the event recorded above.
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, is_cancelled)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, is_cancelled, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }
1188
1189    #[await_tree::instrument(boxed)]
1190    async fn generate_streaming_job(
1191        &self,
1192        ctx: StreamContext,
1193        mut streaming_job: StreamingJob,
1194        fragment_graph: StreamFragmentGraphProto,
1195        resource_type: streaming_job_resource_type::ResourceType,
1196        streaming_job_model: streaming_job::Model,
1197    ) -> MetaResult<(StreamJobFragmentsToCreate, CreateStreamingJobContext)> {
1198        let mut fragment_graph =
1199            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1200        streaming_job.set_info_from_graph(&fragment_graph);
1201
1202        // create internal table catalogs and refill table id.
1203        let incomplete_internal_tables = fragment_graph
1204            .incomplete_internal_tables()
1205            .into_values()
1206            .collect_vec();
1207        let table_id_map = self
1208            .metadata_manager
1209            .catalog_controller
1210            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1211            .await?;
1212        fragment_graph.refill_internal_table_ids(table_id_map);
1213
1214        // create fragment and actor catalogs.
1215        tracing::debug!(id = %streaming_job.id(), "building streaming job");
1216        let (ctx, stream_job_fragments) = self
1217            .build_stream_job(
1218                ctx,
1219                streaming_job,
1220                fragment_graph,
1221                resource_type,
1222                streaming_job_model,
1223            )
1224            .await?;
1225
1226        let streaming_job = &ctx.streaming_job;
1227
1228        match streaming_job {
1229            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1230                self.validate_cdc_table(table, &stream_job_fragments)
1231                    .await?;
1232            }
1233            StreamingJob::Table(Some(source), ..) => {
1234                // Register the source on the connector node.
1235                self.source_manager.register_source(source).await?;
1236                let connector_name = source
1237                    .get_with_properties()
1238                    .get(UPSTREAM_SOURCE_KEY)
1239                    .cloned();
1240                let attr = source.info.as_ref().map(|source_info| {
1241                    jsonbb::json!({
1242                            "format": source_info.format().as_str_name(),
1243                            "encode": source_info.row_encode().as_str_name(),
1244                    })
1245                });
1246                report_create_object(
1247                    streaming_job.id(),
1248                    "source",
1249                    PbTelemetryDatabaseObject::Source,
1250                    connector_name,
1251                    attr,
1252                );
1253            }
1254            StreamingJob::Sink(sink) => {
1255                if sink.auto_refresh_schema_from_table.is_some() {
1256                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?;
1257                }
1258                // Validate the sink on the connector node.
1259                validate_sink(sink).await?;
1260                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1261                let attr = sink.format_desc.as_ref().map(|sink_info| {
1262                    jsonbb::json!({
1263                        "format": sink_info.format().as_str_name(),
1264                        "encode": sink_info.encode().as_str_name(),
1265                    })
1266                });
1267                report_create_object(
1268                    streaming_job.id(),
1269                    "sink",
1270                    PbTelemetryDatabaseObject::Sink,
1271                    connector_name,
1272                    attr,
1273                );
1274            }
1275            StreamingJob::Source(source) => {
1276                // Register the source on the connector node.
1277                self.source_manager.register_source(source).await?;
1278                let connector_name = source
1279                    .get_with_properties()
1280                    .get(UPSTREAM_SOURCE_KEY)
1281                    .cloned();
1282                let attr = source.info.as_ref().map(|source_info| {
1283                    jsonbb::json!({
1284                            "format": source_info.format().as_str_name(),
1285                            "encode": source_info.row_encode().as_str_name(),
1286                    })
1287                });
1288                report_create_object(
1289                    streaming_job.id(),
1290                    "source",
1291                    PbTelemetryDatabaseObject::Source,
1292                    connector_name,
1293                    attr,
1294                );
1295            }
1296            _ => {}
1297        }
1298
1299        let backfill_orders = ctx.fragment_backfill_ordering.to_meta_model();
1300        self.metadata_manager
1301            .catalog_controller
1302            .prepare_stream_job_fragments(
1303                &stream_job_fragments,
1304                streaming_job,
1305                false,
1306                Some(backfill_orders),
1307            )
1308            .await?;
1309
1310        Ok((stream_job_fragments, ctx))
1311    }
1312
    /// Drops the catalog object `object_id` of type `object_type` and releases the runtime
    /// resources of everything the catalog controller reports as removed (its `ReleaseContext`).
    ///
    /// `drop_mode` is forwarded to the catalog controller, presumably selecting restrict vs.
    /// cascade semantics (TODO confirm against `DropMode`).
    ///
    /// Cleanup order after the catalog deletion:
    /// 1. notify local subscribers if the dropped object itself is a source;
    /// 2. notify serving about deleted fragment mappings;
    /// 3. drop the removed streaming jobs via the stream manager;
    /// 4. drop removed sources, then unregister removed source fragments, in the source manager;
    /// 5. best-effort drop of the Iceberg tables of removed Iceberg table sinks, then stop their
    ///    sink coordinators and clear their Iceberg maintenance state;
    /// 6. remove dropped secrets from the global `LocalSecretManager`.
    ///
    /// The reschedule read lock and the source tick pause guard are held for the whole call.
    ///
    /// # Panics
    /// If a removed Iceberg table sink cannot be rebuilt into an `IcebergSink`, or its full
    /// table name cannot be resolved.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        // Fence reschedule and source tick before catalog deletion so post-collect split updates
        // cannot race with dropped fragments.
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let _source_tick_pause_guard = self.source_manager.pause_tick().await;

        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        // Only the directly dropped object is announced here. Sources removed as dependents
        // (in `removed_source_ids`) do not get a `SourceDropped` local notification.
        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        // Notify serving module about deleted fragments so it can clean up serving vnode mappings.
        // This is driven by the fragment model deletion (cascade from Object::delete_many),
        // decoupled from the barrier-driven streaming mapping notifications.
        self.env
            .notification_manager_ref()
            .notify_serving_fragment_mapping_delete(
                removed_fragments.iter().map(|id| *id as _).collect(),
            );

        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // unregister fragments and actors from source manager.
        // FIXME: need also unregister source backfill fragments.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // clean up iceberg table sinks
        // The ids are collected first because the loop below consumes the sink catalogs.
        let iceberg_sink_ids: Vec<SinkId> = removed_iceberg_table_sinks
            .iter()
            .map(|sink| sink.id)
            .collect();

        for sink in removed_iceberg_table_sinks {
            // These `expect`s panic the meta node if a removed sink catalog cannot be rebuilt.
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            // Best-effort: if the Iceberg catalog cannot be created, the table is left in place
            // without any log. NOTE(review): consider logging this case.
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                // A failed table drop is logged and otherwise ignored.
                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        // stop sink coordinators for iceberg table sinks
        if !iceberg_sink_ids.is_empty() {
            self.sink_manager
                .stop_sink_coordinator(iceberg_sink_ids.clone())
                .await;

            for sink_id in iceberg_sink_ids {
                self.iceberg_compaction_manager
                    .clear_iceberg_maintenance_by_sink_id(sink_id);
            }
        }

        // remove secrets.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }
1439
1440    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1441    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1442    pub async fn replace_job(
1443        &self,
1444        mut streaming_job: StreamingJob,
1445        fragment_graph: StreamFragmentGraphProto,
1446    ) -> MetaResult<NotificationVersion> {
1447        match &streaming_job {
1448            StreamingJob::Table(..)
1449            | StreamingJob::Source(..)
1450            | StreamingJob::MaterializedView(..) => {}
1451            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1452                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1453            }
1454        }
1455
1456        let job_id = streaming_job.id();
1457
1458        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1459        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1460
1461        // Ensure the max parallelism unchanged before replacing table.
1462        let original_max_parallelism = self
1463            .metadata_manager
1464            .get_job_max_parallelism(streaming_job.id())
1465            .await?;
1466        let fragment_graph = PbStreamFragmentGraph {
1467            max_parallelism: original_max_parallelism as _,
1468            ..fragment_graph
1469        };
1470
1471        // 1. build fragment graph.
1472        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1473        streaming_job.set_info_from_graph(&fragment_graph);
1474
1475        // make it immutable
1476        let streaming_job = streaming_job;
1477
1478        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1479            let auto_refresh_schema_sinks = self
1480                .metadata_manager
1481                .catalog_controller
1482                .get_sink_auto_refresh_schema_from(table.id)
1483                .await?;
1484            if !auto_refresh_schema_sinks.is_empty() {
1485                let original_table_columns = self
1486                    .metadata_manager
1487                    .catalog_controller
1488                    .get_table_columns(table.id)
1489                    .await?;
1490                // compare column id to find newly added and removed columns
1491                let original_table_column_ids: HashSet<_> = original_table_columns
1492                    .iter()
1493                    .map(|col| col.column_id())
1494                    .collect();
1495                let new_table_column_ids: HashSet<_> = table
1496                    .columns
1497                    .iter()
1498                    .map(|col| ColumnId::new(col.column_desc.as_ref().unwrap().column_id as _))
1499                    .collect();
1500                let newly_added_columns = table
1501                    .columns
1502                    .iter()
1503                    .filter(|col| {
1504                        !original_table_column_ids.contains(&ColumnId::new(
1505                            col.column_desc.as_ref().unwrap().column_id as _,
1506                        ))
1507                    })
1508                    .map(|col| ColumnCatalog::from(col.clone()))
1509                    .collect_vec();
1510                let removed_columns = original_table_columns
1511                    .iter()
1512                    .filter(|col| !new_table_column_ids.contains(&col.column_id()))
1513                    .cloned()
1514                    .collect_vec();
1515                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1516                for sink in auto_refresh_schema_sinks {
1517                    let sink_job_fragments = self
1518                        .metadata_manager
1519                        .get_job_fragments_by_id(sink.id.as_job_id())
1520                        .await?;
1521                    if sink_job_fragments.fragments.len() != 1 {
1522                        return Err(anyhow!(
1523                            "auto schema refresh sink must have only one fragment, but got {}",
1524                            sink_job_fragments.fragments.len()
1525                        )
1526                        .into());
1527                    }
1528                    let sink_ctx = sink_job_fragments.ctx;
1529                    let original_sink_fragment =
1530                        sink_job_fragments.fragments.into_values().next().unwrap();
1531                    let (new_sink_fragment, new_schema, new_log_store_table) =
1532                        rewrite_refresh_schema_sink_fragment(
1533                            &original_sink_fragment,
1534                            &sink,
1535                            &newly_added_columns,
1536                            &removed_columns,
1537                            table,
1538                            fragment_graph.table_fragment_id(),
1539                            self.env.id_gen_manager(),
1540                        )?;
1541
1542                    let streaming_job = StreamingJob::Sink(sink);
1543
1544                    let tmp_sink_model = self
1545                        .metadata_manager
1546                        .catalog_controller
1547                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1548                        .await?;
1549                    let tmp_sink_id = tmp_sink_model.job_id.as_sink_id();
1550                    let StreamingJob::Sink(sink) = streaming_job else {
1551                        unreachable!()
1552                    };
1553
1554                    sinks.push(AutoRefreshSchemaSinkContext {
1555                        tmp_sink_id,
1556                        original_sink: sink,
1557                        original_fragment: original_sink_fragment,
1558                        new_schema,
1559                        newly_add_fields: newly_added_columns
1560                            .iter()
1561                            .map(|col| Field::from(&col.column_desc))
1562                            .collect(),
1563                        removed_column_names: removed_columns
1564                            .iter()
1565                            .map(|col| col.name.clone())
1566                            .collect(),
1567                        new_fragment: new_sink_fragment,
1568                        new_log_store_table: new_log_store_table.map(Box::new),
1569                        ctx: sink_ctx,
1570                    });
1571                }
1572                Some(sinks)
1573            } else {
1574                None
1575            }
1576        } else {
1577            None
1578        };
1579
1580        let streaming_job_model = self
1581            .metadata_manager
1582            .catalog_controller
1583            .create_job_catalog_for_replace(
1584                &streaming_job,
1585                Some(&ctx),
1586                fragment_graph.specified_parallelism().as_ref(),
1587                Some(fragment_graph.max_parallelism()),
1588            )
1589            .await?;
1590        let tmp_id = streaming_job_model.job_id;
1591
1592        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1593            sinks
1594                .iter()
1595                .map(|sink| sink.tmp_sink_id.as_object_id())
1596                .collect_vec()
1597        });
1598
1599        tracing::debug!(id = %job_id, "building replace streaming job");
1600        let mut updated_sink_catalogs = vec![];
1601
1602        let mut drop_table_connector_ctx = None;
1603        let result: MetaResult<_> = try {
1604            let (mut ctx, mut stream_job_fragments) = self
1605                .build_replace_job(
1606                    ctx,
1607                    &streaming_job,
1608                    fragment_graph,
1609                    tmp_id,
1610                    auto_refresh_schema_sinks,
1611                    streaming_job_model,
1612                )
1613                .await?;
1614            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1615            let auto_refresh_schema_sink_finish_ctx =
1616                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1617                    sinks
1618                        .iter()
1619                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1620                            tmp_sink_id: sink.tmp_sink_id,
1621                            original_sink_id: sink.original_sink.id,
1622                            columns: sink.new_schema.clone(),
1623                            new_log_store_table: sink.new_log_store_table.clone(),
1624                        })
1625                        .collect()
1626                });
1627
1628            // Handle table that has incoming sinks.
1629            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1630                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1631                let upstream_infos = self
1632                    .metadata_manager
1633                    .catalog_controller
1634                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1635                    .await?;
1636                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1637
1638                for upstream_info in &upstream_infos {
1639                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1640                    ctx.upstream_fragment_downstreams
1641                        .entry(upstream_fragment_id)
1642                        .or_default()
1643                        .push(upstream_info.new_sink_downstream.clone());
1644                    if upstream_info.sink_original_target_columns.is_empty() {
1645                        updated_sink_catalogs.push(upstream_info.sink_id);
1646                    }
1647                }
1648            }
1649
1650            let replace_upstream = ctx.replace_upstream.clone();
1651
1652            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1653                let empty_downstreams = FragmentDownstreamRelation::default();
1654                for sink in sinks {
1655                    self.metadata_manager
1656                        .catalog_controller
1657                        .prepare_streaming_job(
1658                            sink.tmp_sink_id.as_job_id(),
1659                            || [&sink.new_fragment].into_iter(),
1660                            &empty_downstreams,
1661                            true,
1662                            None,
1663                            None,
1664                        )
1665                        .await?;
1666                }
1667            }
1668
1669            self.metadata_manager
1670                .catalog_controller
1671                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true, None)
1672                .await?;
1673
1674            self.stream_manager
1675                .replace_stream_job(stream_job_fragments, ctx)
1676                .await?;
1677            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1678        };
1679
1680        match result {
1681            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1682                let version = self
1683                    .metadata_manager
1684                    .catalog_controller
1685                    .finish_replace_streaming_job(
1686                        tmp_id,
1687                        streaming_job,
1688                        replace_upstream,
1689                        SinkIntoTableContext {
1690                            updated_sink_catalogs,
1691                        },
1692                        drop_table_connector_ctx.as_ref(),
1693                        auto_refresh_schema_sink_finish_ctx,
1694                    )
1695                    .await?;
1696                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1697                    self.source_manager
1698                        .apply_source_change(SourceChange::DropSource {
1699                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1700                        })
1701                        .await;
1702                }
1703                Ok(version)
1704            }
1705            Err(err) => {
1706                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1707                let _ = self.metadata_manager
1708                    .catalog_controller
1709                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1710                    .await.inspect_err(|err| {
1711                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1712                });
1713                Err(err)
1714            }
1715        }
1716    }
1717
1718    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1719    )]
1720    async fn drop_streaming_job(
1721        &self,
1722        job_id: StreamingJobId,
1723        drop_mode: DropMode,
1724    ) -> MetaResult<NotificationVersion> {
1725        let (object_id, object_type) = match job_id {
1726            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1727            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1728            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1729            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1730        };
1731
1732        let job_status = self
1733            .metadata_manager
1734            .catalog_controller
1735            .get_streaming_job_status(job_id.id())
1736            .await?;
1737        let version = match job_status {
1738            JobStatus::Initial => {
1739                self.metadata_manager
1740                    .catalog_controller
1741                    .try_abort_creating_streaming_job(job_id.id(), true)
1742                    .await?;
1743                IGNORED_NOTIFICATION_VERSION
1744            }
1745            JobStatus::Creating => {
1746                self.stream_manager
1747                    .cancel_streaming_jobs(vec![job_id.id()])
1748                    .await?;
1749                IGNORED_NOTIFICATION_VERSION
1750            }
1751            JobStatus::Created => self.drop_object(object_type, object_id, drop_mode).await?,
1752        };
1753
1754        Ok(version)
1755    }
1756
1757    /// Builds the actor graph:
1758    /// - Add the upstream fragments to the fragment graph
1759    /// - Schedule the fragments based on their distribution
1760    /// - Expand each fragment into one or several actors
1761    /// - Construct the fragment level backfill order control.
1762    #[await_tree::instrument]
1763    pub(crate) async fn build_stream_job(
1764        &self,
1765        stream_ctx: StreamContext,
1766        mut stream_job: StreamingJob,
1767        fragment_graph: StreamFragmentGraph,
1768        resource_type: streaming_job_resource_type::ResourceType,
1769        streaming_job_model: streaming_job::Model,
1770    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1771        let id = stream_job.id();
1772        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1773        Self::validate_specified_parallelism(
1774            fragment_graph.specified_parallelism(),
1775            fragment_graph.specified_backfill_parallelism(),
1776            max_parallelism,
1777        )?;
1778
1779        // 1. Fragment Level ordering graph
1780        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1781
1782        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1783        // contains all information needed for building the actor graph.
1784
1785        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1786            fragment_graph.collect_snapshot_backfill_info()?;
1787        assert!(
1788            snapshot_backfill_info
1789                .iter()
1790                .chain([&cross_db_snapshot_backfill_info])
1791                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1792                .all(|backfill_epoch| backfill_epoch.is_none()),
1793            "should not set backfill epoch when initially build the job: {:?} {:?}",
1794            snapshot_backfill_info,
1795            cross_db_snapshot_backfill_info
1796        );
1797
1798        let locality_fragment_state_table_mapping =
1799            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1800
1801        // check if log store exists for all cross-db upstreams
1802        self.metadata_manager
1803            .catalog_controller
1804            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1805            .await?;
1806
1807        let upstream_table_ids = fragment_graph
1808            .dependent_table_ids()
1809            .iter()
1810            .filter(|id| {
1811                !cross_db_snapshot_backfill_info
1812                    .upstream_mv_table_id_to_backfill_epoch
1813                    .contains_key(*id)
1814            })
1815            .cloned()
1816            .collect();
1817
1818        let upstream_root_fragments = self
1819            .metadata_manager
1820            .get_upstream_root_fragments(&upstream_table_ids)
1821            .await?;
1822
1823        if snapshot_backfill_info.is_some() {
1824            match stream_job {
1825                StreamingJob::MaterializedView(_)
1826                | StreamingJob::Sink(_)
1827                | StreamingJob::Index(_, _) => {}
1828                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1829                    return Err(
1830                        anyhow!("snapshot_backfill not enabled for table and source").into(),
1831                    );
1832                }
1833            }
1834        }
1835
1836        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1837            fragment_graph,
1838            FragmentGraphUpstreamContext {
1839                upstream_root_fragments,
1840            },
1841            (&stream_job).into(),
1842        )?;
1843        let resource_group = if let Some(group) = resource_type.resource_group() {
1844            group
1845        } else {
1846            self.metadata_manager
1847                .get_database_resource_group(stream_job.database_id())
1848                .await?
1849        };
1850        let is_serverless_backfill = matches!(
1851            &resource_type,
1852            streaming_job_resource_type::ResourceType::ServerlessBackfillResourceGroup(_)
1853        );
1854
1855        // 3. Build the actor graph.
1856        let actor_graph_builder = ActorGraphBuilder::new(complete_graph)?;
1857
1858        let ActorGraphBuildResult {
1859            graph,
1860            downstream_fragment_relations,
1861            upstream_fragment_downstreams,
1862            replace_upstream,
1863        } = actor_graph_builder.generate_graph()?;
1864        assert!(replace_upstream.is_empty());
1865
1866        // 4. Build the table fragments structure that will be persisted in the stream manager,
1867        // and the context that contains all information needed for building the
1868        // actors on the compute nodes.
1869
1870        let stream_job_fragments =
1871            StreamJobFragments::new(id, graph, stream_ctx.clone(), max_parallelism.get());
1872
1873        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1874            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1875        }
1876
1877        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1878            && let Ok(table_id) = sink.get_target_table()
1879        {
1880            let tables = self
1881                .metadata_manager
1882                .get_table_catalog_by_ids(&[*table_id])
1883                .await?;
1884            let target_table = tables
1885                .first()
1886                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1887            let sink_fragment = stream_job_fragments
1888                .sink_fragment()
1889                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1890            let mview_fragment_id = self
1891                .metadata_manager
1892                .catalog_controller
1893                .get_mview_fragment_by_id(table_id.as_job_id())
1894                .await?;
1895            let upstream_sink_info = build_upstream_sink_info(
1896                sink.id,
1897                sink.original_target_columns.clone(),
1898                sink_fragment.fragment_id as _,
1899                target_table,
1900                mview_fragment_id,
1901            )?;
1902            Some(upstream_sink_info)
1903        } else {
1904            None
1905        };
1906
1907        let mut cdc_table_snapshot_splits = None;
1908        if let StreamingJob::Table(None, table, TableJobType::SharedCdcSource) = &stream_job
1909            && let Some((_, stream_cdc_scan)) =
1910                parallel_cdc_table_backfill_fragment(stream_job_fragments.fragments.values())
1911        {
1912            {
1913                // Create parallel splits for a CDC table. The resulted split assignments are persisted and immutable.
1914                let splits = try_init_parallel_cdc_table_snapshot_splits(
1915                    table.id,
1916                    stream_cdc_scan.cdc_table_desc.as_ref().unwrap(),
1917                    self.env.meta_store_ref(),
1918                    stream_cdc_scan.options.as_ref().unwrap(),
1919                    self.env.opts.cdc_table_split_init_insert_batch_size,
1920                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
1921                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
1922                )
1923                .await?;
1924                cdc_table_snapshot_splits = Some(splits);
1925            }
1926        }
1927
1928        let ctx = CreateStreamingJobContext {
1929            upstream_fragment_downstreams,
1930            database_resource_group: resource_group,
1931            definition: stream_job.definition(),
1932            create_type: stream_job.create_type(),
1933            job_type: (&stream_job).into(),
1934            streaming_job: stream_job,
1935            new_upstream_sink,
1936            option: CreateStreamingJobOption {},
1937            snapshot_backfill_info,
1938            cross_db_snapshot_backfill_info,
1939            fragment_backfill_ordering,
1940            locality_fragment_state_table_mapping,
1941            cdc_table_snapshot_splits,
1942            is_serverless_backfill,
1943            streaming_job_model: streaming_job_model.clone(),
1944            refresh_interval_sec: streaming_job_model.refresh_interval_sec.map(|s| s as u64),
1945        };
1946
1947        Ok((
1948            ctx,
1949            StreamJobFragmentsToCreate {
1950                inner: stream_job_fragments,
1951                downstreams: downstream_fragment_relations,
1952            },
1953        ))
1954    }
1955
    /// `build_replace_job` builds a job replacement and returns the context and the new job
    /// fragments.
    ///
    /// Only table, source and materialized-view jobs can be replaced; sinks and indexes are
    /// rejected with a "not implemented" error.
    ///
    /// Note that the new job fragments are built under the temporary `tmp_job_id` instead of the
    /// real job ID, and it is replaced with the real one after the replacement is finished.
    ///
    /// Outline:
    /// - Reconcile the new graph's internal state tables with the old job's: record a
    ///   [`DropTableConnectorContext`] when the table's associated source is being removed, use
    ///   `state_match` for materialized views, and a trivial matching for tables / sources.
    /// - Attach the existing downstream fragments (substituting the rewritten fragments of any
    ///   `auto_refresh_schema_sinks`) and, for shared-CDC tables and materialized views, the
    ///   upstream root fragments.
    /// - Generate the actor graph and package it with a [`ReplaceStreamJobContext`].
    ///
    /// # Panics
    /// - if the old job has no root fragment;
    /// - if the downstream fragments do not contain exactly the original fragments of
    ///   `auto_refresh_schema_sinks`, or a rewritten sink fragment has no `replace_upstream`
    ///   entry;
    /// - if the job type is not one of the handled table / source / materialized-view kinds.
    pub(crate) async fn build_replace_job(
        &self,
        stream_ctx: StreamContext,
        stream_job: &StreamingJob,
        mut fragment_graph: StreamFragmentGraph,
        tmp_job_id: JobId,
        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
        streaming_job_model: streaming_job::Model,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        match &stream_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
            }
        }

        let id = stream_job.id();

        // check if performing drop table connector
        // (the new table definition has no associated source; if the old one had, that source
        // is being dropped as part of this replacement)
        let mut drop_table_associated_source_id = None;
        if let StreamingJob::Table(None, _, _) = &stream_job {
            drop_table_associated_source_id = self
                .metadata_manager
                .get_table_associated_source_id(id.as_mv_table_id())
                .await?;
        }

        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
        let old_internal_table_ids = old_fragments.internal_table_ids();

        // handle drop table's associated source
        let mut drop_table_connector_ctx = None;
        if let Some(to_remove_source_id) = drop_table_associated_source_id {
            // drop table's associated source means the fragment containing the table has just one internal table (associated source's state table)
            // NOTE(review): only checked in debug builds; in release an empty list panics on the
            // index below and a longer list silently takes the first entry.
            debug_assert!(old_internal_table_ids.len() == 1);

            drop_table_connector_ctx = Some(DropTableConnectorContext {
                // we do not remove the original table catalog as it's still needed for the streaming job
                // just need to remove the ref to the state table
                to_change_streaming_job_id: id,
                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
                to_remove_source_id,
            });
        } else if stream_job.is_materialized_view() {
            // If it's ALTER MV, use `state::match` to match the internal tables, which is more complicated
            // but more robust.
            let old_fragments_upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(old_fragments.fragment_ids())
                .await?;

            let old_state_graph =
                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
            let result = state_match::match_graph(&new_state_graph, &old_state_graph)
                .context("incompatible altering on the streaming job states")?;

            // Reuse the matched old internal table ids and snapshot backfill epochs in the new graph.
            fragment_graph.fit_internal_table_ids_with_mapping(result.table_matches);
            fragment_graph.fit_snapshot_backfill_epochs(result.snapshot_backfill_epochs);
        } else {
            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
            let old_internal_tables = self
                .metadata_manager
                .get_table_catalog_by_ids(&old_internal_table_ids)
                .await?;
            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
        }

        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
        // graph that contains all information needed for building the actor graph.
        let original_root_fragment = old_fragments
            .root_fragment()
            .expect("root fragment not found");

        let job_type = StreamingJobType::from(stream_job);

        // Extract the downstream fragments from the fragment graph.
        let mut downstream_fragments = self.metadata_manager.get_downstream_fragments(id).await?;

        // Swap each auto-refresh-schema sink's original downstream fragment for its rewritten
        // fragment; every such sink must match exactly one downstream fragment.
        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
                .iter()
                .map(|sink| sink.original_fragment.fragment_id)
                .collect();
            for (_, downstream_fragment) in &mut downstream_fragments {
                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
                }) {
                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
                    // Actor locations will be resolved inside barrier worker during rendering.
                    // For now, just replace fragment info and nodes.
                    *downstream_fragment = sink.new_fragment.clone();
                }
            }
            assert!(remaining_fragment.is_empty());
        }

        // build complete graph based on the table job type
        let complete_graph = match &job_type {
            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
                CompleteStreamFragmentGraph::with_downstreams(
                    fragment_graph,
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                    },
                    job_type,
                )?
            }
            StreamingJobType::Table(TableJobType::SharedCdcSource)
            | StreamingJobType::MaterializedView => {
                // CDC tables or materialized views can have upstream jobs as well.
                let upstream_root_fragments = self
                    .metadata_manager
                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
                    .await?;

                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
                    fragment_graph,
                    FragmentGraphUpstreamContext {
                        upstream_root_fragments,
                    },
                    FragmentGraphDownstreamContext {
                        original_root_fragment_id: original_root_fragment.fragment_id,
                        downstream_fragments,
                    },
                    job_type,
                )?
            }
            _ => unreachable!(),
        };

        // Replacement always runs in the database's resource group.
        let resource_group = self
            .metadata_manager
            .get_database_resource_group(stream_job.database_id())
            .await?;

        // 2. Build the actor graph.
        let actor_graph_builder = ActorGraphBuilder::new(complete_graph)?;

        let ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            upstream_fragment_downstreams,
            mut replace_upstream,
        } = actor_graph_builder.generate_graph()?;

        // general table & source does not have upstream job, so the dispatchers should be empty
        if matches!(
            job_type,
            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
        ) {
            assert!(upstream_fragment_downstreams.is_empty());
        }

        // 3. Build the table fragments structure that will be persisted in the stream manager, and
        // the context that contains all information needed for building the actors on the compute
        // nodes.
        let stream_job_fragments =
            StreamJobFragments::new(tmp_job_id, graph, stream_ctx, old_fragments.max_parallelism);

        // Drop the `replace_upstream` entries keyed by the rewritten auto-refresh sink fragments.
        // NOTE(review): presumably because those sinks are replaced through their own temporary
        // sink jobs rather than via `replace_upstream` — confirm.
        if let Some(sinks) = &auto_refresh_schema_sinks {
            for sink in sinks {
                replace_upstream
                    .remove(&sink.new_fragment.fragment_id)
                    .expect("should exist");
            }
        }

        // Note: no need to set `vnode_count` as it's already set by the frontend.
        // See `get_replace_table_plan`.

        let ctx = ReplaceStreamJobContext {
            old_fragments,
            replace_upstream,
            upstream_fragment_downstreams,
            streaming_job: stream_job.clone(),
            database_resource_group: resource_group,
            tmp_id: tmp_job_id,
            drop_table_connector_ctx,
            auto_refresh_schema_sinks,
            streaming_job_model,
        };

        Ok((
            ctx,
            StreamJobFragmentsToCreate {
                inner: stream_job_fragments,
                downstreams: downstream_fragment_relations,
            },
        ))
    }
2156
2157    async fn alter_name(
2158        &self,
2159        relation: alter_name_request::Object,
2160        new_name: &str,
2161    ) -> MetaResult<NotificationVersion> {
2162        let (obj_type, id): (ObjectType, ObjectId) = match relation {
2163            alter_name_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2164            alter_name_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2165            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id.into()),
2166            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2167            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2168            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2169            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2170            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2171        };
2172        self.metadata_manager
2173            .catalog_controller
2174            .alter_name(obj_type, id, new_name)
2175            .await
2176    }
2177
2178    async fn alter_swap_rename(
2179        &self,
2180        object: alter_swap_rename_request::Object,
2181    ) -> MetaResult<NotificationVersion> {
2182        let (obj_type, src_id, dst_id) = match object {
2183            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2184            alter_swap_rename_request::Object::Table(objs) => {
2185                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2186                (ObjectType::Table, src_id, dst_id)
2187            }
2188            alter_swap_rename_request::Object::View(objs) => {
2189                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2190                (ObjectType::View, src_id, dst_id)
2191            }
2192            alter_swap_rename_request::Object::Source(objs) => {
2193                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2194                (ObjectType::Source, src_id, dst_id)
2195            }
2196            alter_swap_rename_request::Object::Sink(objs) => {
2197                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2198                (ObjectType::Sink, src_id, dst_id)
2199            }
2200            alter_swap_rename_request::Object::Subscription(objs) => {
2201                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2202                (ObjectType::Subscription, src_id, dst_id)
2203            }
2204        };
2205
2206        self.metadata_manager
2207            .catalog_controller
2208            .alter_swap_rename(obj_type, src_id, dst_id)
2209            .await
2210    }
2211
2212    async fn alter_owner(
2213        &self,
2214        object: Object,
2215        owner_id: UserId,
2216    ) -> MetaResult<NotificationVersion> {
2217        let (obj_type, id): (ObjectType, ObjectId) = match object {
2218            Object::TableId(id) => (ObjectType::Table, id.into()),
2219            Object::ViewId(id) => (ObjectType::View, id.into()),
2220            Object::SourceId(id) => (ObjectType::Source, id.into()),
2221            Object::SinkId(id) => (ObjectType::Sink, id.into()),
2222            Object::SchemaId(id) => (ObjectType::Schema, id.into()),
2223            Object::DatabaseId(id) => (ObjectType::Database, id.into()),
2224            Object::SubscriptionId(id) => (ObjectType::Subscription, id.into()),
2225            Object::ConnectionId(id) => (ObjectType::Connection, id.into()),
2226            Object::FunctionId(id) => (ObjectType::Function, id.into()),
2227            Object::SecretId(id) => (ObjectType::Secret, id.into()),
2228        };
2229        self.metadata_manager
2230            .catalog_controller
2231            .alter_owner(obj_type, id, owner_id as _)
2232            .await
2233    }
2234
2235    async fn alter_set_schema(
2236        &self,
2237        object: alter_set_schema_request::Object,
2238        new_schema_id: SchemaId,
2239    ) -> MetaResult<NotificationVersion> {
2240        let (obj_type, id): (ObjectType, ObjectId) = match object {
2241            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id.into()),
2242            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id.into()),
2243            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id.into()),
2244            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id.into()),
2245            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id.into()),
2246            alter_set_schema_request::Object::ConnectionId(id) => {
2247                (ObjectType::Connection, id.into())
2248            }
2249            alter_set_schema_request::Object::SubscriptionId(id) => {
2250                (ObjectType::Subscription, id.into())
2251            }
2252        };
2253        self.metadata_manager
2254            .catalog_controller
2255            .alter_schema(obj_type, id, new_schema_id)
2256            .await
2257    }
2258
2259    pub async fn wait(&self, job_id: Option<JobId>) -> MetaResult<WaitVersion> {
2260        if let Some(job_id) = job_id {
2261            let database_id = self
2262                .metadata_manager
2263                .catalog_controller
2264                .get_object_database_id(job_id)
2265                .await?;
2266            let catalog_version = self
2267                .metadata_manager
2268                .wait_streaming_job_finished(database_id, job_id)
2269                .await?;
2270            let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2271            return Ok(WaitVersion {
2272                catalog_version,
2273                hummock_version_id,
2274            });
2275        }
2276
2277        let timeout_ms = 2 * 60 * 60 * 1000;
2278        let poll_interval = Duration::from_millis(100);
2279        for _ in 0..(timeout_ms / poll_interval.as_millis() as usize) {
2280            let background_jobs = self
2281                .metadata_manager
2282                .catalog_controller
2283                .list_background_creating_jobs(true, None)
2284                .await?;
2285            if background_jobs.is_empty() {
2286                let catalog_version = self
2287                    .metadata_manager
2288                    .catalog_controller
2289                    .notify_frontend_trivial()
2290                    .await;
2291                let hummock_version_id = self.barrier_manager.get_hummock_version_id().await;
2292                return Ok(WaitVersion {
2293                    catalog_version,
2294                    hummock_version_id,
2295                });
2296            }
2297
2298            sleep(poll_interval).await;
2299        }
2300        Err(MetaError::cancelled(format!(
2301            "timeout after {timeout_ms}ms"
2302        )))
2303    }
2304
2305    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2306        self.metadata_manager
2307            .catalog_controller
2308            .comment_on(comment)
2309            .await
2310    }
2311
2312    async fn alter_streaming_job_config(
2313        &self,
2314        job_id: JobId,
2315        entries_to_add: HashMap<String, String>,
2316        keys_to_remove: Vec<String>,
2317    ) -> MetaResult<NotificationVersion> {
2318        self.metadata_manager
2319            .catalog_controller
2320            .alter_streaming_job_config(job_id, entries_to_add, keys_to_remove)
2321            .await
2322    }
2323}
2324
2325fn report_create_object(
2326    job_id: JobId,
2327    event_name: &str,
2328    obj_type: PbTelemetryDatabaseObject,
2329    connector_name: Option<String>,
2330    attr_info: Option<jsonbb::Value>,
2331) {
2332    report_event(
2333        PbTelemetryEventStage::CreateStreamJob,
2334        event_name,
2335        job_id.as_raw_id() as _,
2336        connector_name,
2337        Some(obj_type),
2338        attr_info,
2339    );
2340}
2341
2342pub fn build_upstream_sink_info(
2343    sink_id: SinkId,
2344    original_target_columns: Vec<PbColumnCatalog>,
2345    sink_fragment_id: FragmentId,
2346    target_table: &PbTable,
2347    target_fragment_id: FragmentId,
2348) -> MetaResult<UpstreamSinkInfo> {
2349    let sink_columns = if !original_target_columns.is_empty() {
2350        original_target_columns.clone()
2351    } else {
2352        // This is due to the fact that the value did not exist in earlier versions,
2353        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
2354        // Therefore the columns of the table at this point are `original_target_columns`.
2355        // This value of sink will be filled on the meta.
2356        target_table.columns.clone()
2357    };
2358
2359    let sink_output_fields = sink_columns
2360        .iter()
2361        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2362        .collect_vec();
2363    let output_indices = (0..sink_output_fields.len())
2364        .map(|i| i as u32)
2365        .collect_vec();
2366
2367    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2368        let sink_idx_by_col_id = sink_columns
2369            .iter()
2370            .enumerate()
2371            .map(|(idx, col)| {
2372                let column_id = col.column_desc.as_ref().unwrap().column_id;
2373                (column_id, idx as u32)
2374            })
2375            .collect::<HashMap<_, _>>();
2376        target_table
2377            .distribution_key
2378            .iter()
2379            .map(|dist_idx| {
2380                let column_id = target_table.columns[*dist_idx as usize]
2381                    .column_desc
2382                    .as_ref()
2383                    .unwrap()
2384                    .column_id;
2385                let sink_idx = sink_idx_by_col_id
2386                    .get(&column_id)
2387                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2388                Ok(*sink_idx)
2389            })
2390            .collect::<anyhow::Result<Vec<_>>>()?
2391    };
2392    let dist_key_indices =
2393        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2394    let downstream_fragment_id = target_fragment_id as _;
2395    let new_downstream_relation = DownstreamFragmentRelation {
2396        downstream_fragment_id,
2397        dispatcher_type: DispatcherType::Hash,
2398        dist_key_indices,
2399        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2400    };
2401    let current_target_columns = target_table.get_columns();
2402    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2403    Ok(UpstreamSinkInfo {
2404        sink_id,
2405        sink_fragment_id: sink_fragment_id as _,
2406        sink_output_fields,
2407        sink_original_target_columns: original_target_columns,
2408        project_exprs,
2409        new_sink_downstream: new_downstream_relation,
2410    })
2411}
2412
2413pub fn refill_upstream_sink_union_in_table(
2414    union_fragment_root: &mut PbStreamNode,
2415    upstream_sink_infos: &Vec<UpstreamSinkInfo>,
2416) {
2417    visit_stream_node_cont_mut(union_fragment_root, |node| {
2418        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2419            let init_upstreams = upstream_sink_infos
2420                .iter()
2421                .map(|info| PbUpstreamSinkInfo {
2422                    upstream_fragment_id: info.sink_fragment_id,
2423                    sink_output_schema: info.sink_output_fields.clone(),
2424                    project_exprs: info.project_exprs.clone(),
2425                })
2426                .collect();
2427            upstream_sink_union.init_upstreams = init_upstreams;
2428            false
2429        } else {
2430            true
2431        }
2432    });
2433}
2434
2435#[cfg(test)]
2436mod tests {
2437    use std::num::NonZeroUsize;
2438
2439    use super::*;
2440
2441    #[test]
2442    fn test_validate_specified_parallelism_accepts_within_max() {
2443        DdlController::validate_specified_parallelism(
2444            Some(NonZeroUsize::new(4).unwrap()),
2445            Some(NonZeroUsize::new(8).unwrap()),
2446            NonZeroUsize::new(8).unwrap(),
2447        )
2448        .unwrap();
2449    }
2450
2451    #[test]
2452    fn test_validate_specified_parallelism_rejects_parallelism_over_max() {
2453        let result = DdlController::validate_specified_parallelism(
2454            Some(NonZeroUsize::new(9).unwrap()),
2455            None,
2456            NonZeroUsize::new(8).unwrap(),
2457        );
2458        assert!(matches!(
2459            result,
2460            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2461        ));
2462    }
2463
2464    #[test]
2465    fn test_validate_specified_parallelism_rejects_backfill_parallelism_over_max() {
2466        let result = DdlController::validate_specified_parallelism(
2467            None,
2468            Some(NonZeroUsize::new(9).unwrap()),
2469            NonZeroUsize::new(8).unwrap(),
2470        );
2471        assert!(matches!(
2472            result,
2473            Err(ref e) if matches!(e.inner(), MetaErrorInner::InvalidParameter(_))
2474        ));
2475    }
2476}