risingwave_meta/rpc/ddl_controller.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::{HashMap, HashSet};
17use std::num::NonZeroUsize;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use await_tree::{InstrumentAwait, span};
24use either::Either;
25use itertools::Itertools;
26use risingwave_common::catalog::{
27    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
28};
29use risingwave_common::config::DefaultParallelism;
30use risingwave_common::hash::VnodeCountCompat;
31use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
32use risingwave_common::system_param::reader::SystemParamsRead;
33use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
34use risingwave_common::{bail, bail_not_implemented};
35use risingwave_connector::WithOptionsSecResolved;
36use risingwave_connector::connector_common::validate_connection;
37use risingwave_connector::source::cdc::CdcScanOptions;
38use risingwave_connector::source::{
39    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
40};
41use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
42use risingwave_meta_model::object::ObjectType;
43use risingwave_meta_model::{
44    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
45    SchemaId, SecretId, SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, WorkerId,
46};
47use risingwave_pb::catalog::{
48    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
49    Subscription, Table, View,
50};
51use risingwave_pb::common::PbActorLocation;
52use risingwave_pb::ddl_service::alter_owner_request::Object;
53use risingwave_pb::ddl_service::{
54    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
55    alter_swap_rename_request,
56};
57use risingwave_pb::meta::table_fragments::PbActorStatus;
58use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
59use risingwave_pb::stream_plan::stream_node::NodeBody;
60use risingwave_pb::stream_plan::{
61    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
62    StreamFragmentGraph as StreamFragmentGraphProto,
63};
64use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
65use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
66use strum::Display;
67use thiserror_ext::AsReport;
68use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
69use tokio::time::sleep;
70use tracing::Instrument;
71
72use crate::barrier::BarrierManagerRef;
73use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
74use crate::controller::cluster::StreamingClusterInfo;
75use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
76use crate::controller::utils::build_select_node_list;
77use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
78use crate::manager::{
79    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
80    NotificationVersion, StreamingJob, StreamingJobType,
81};
82use crate::model::{
83    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext,
84    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
85};
86use crate::stream::cdc::{
87    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
88};
89use crate::stream::{
90    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
91    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
92    GlobalStreamManagerRef, JobRescheduleTarget, ReplaceStreamJobContext, SourceChange,
93    SourceManagerRef, StreamFragmentGraph, UpstreamSinkInfo,
94    check_sink_fragments_support_refresh_schema, create_source_worker,
95    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
96};
97use crate::telemetry::report_event;
98use crate::{MetaError, MetaResult};
99
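/// Drop behavior for `DROP` DDL statements, mapped from the `CASCADE` flag of the request
/// (see [`DropMode::from_request_setting`]).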
100#[derive(PartialEq)]
101pub enum DropMode {
102    Restrict,
103    Cascade,
104}
105
106impl DropMode {
107    pub fn from_request_setting(cascade: bool) -> DropMode {
108        if cascade {
109            DropMode::Cascade
110        } else {
111            DropMode::Restrict
112        }
113    }
114}
115
116#[derive(strum::AsRefStr)]
117pub enum StreamingJobId {
118    MaterializedView(TableId),
119    Sink(SinkId),
120    Table(Option<SourceId>, TableId),
121    Index(IndexId),
122}
123
124impl std::fmt::Display for StreamingJobId {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        write!(f, "{}", self.as_ref())?;
127        write!(f, "({})", self.id())
128    }
129}
130
131impl StreamingJobId {
132    #[allow(dead_code)]
133    fn id(&self) -> TableId {
134        match self {
135            StreamingJobId::MaterializedView(id)
136            | StreamingJobId::Sink(id)
137            | StreamingJobId::Table(_, id)
138            | StreamingJobId::Index(id) => *id,
139        }
140    }
141}
142
/// Describes the streaming job that needs to be replaced. It is used when replacing a table
/// and when creating a sink into a table.
145pub struct ReplaceStreamJobInfo {
146    pub streaming_job: StreamingJob,
147    pub fragment_graph: StreamFragmentGraphProto,
148}
149
150#[derive(Display)]
151pub enum DdlCommand {
152    CreateDatabase(Database),
153    DropDatabase(DatabaseId),
154    CreateSchema(Schema),
155    DropSchema(SchemaId, DropMode),
156    CreateNonSharedSource(Source),
157    DropSource(SourceId, DropMode),
158    CreateFunction(Function),
159    DropFunction(FunctionId, DropMode),
160    CreateView(View, HashSet<ObjectId>),
161    DropView(ViewId, DropMode),
162    CreateStreamingJob {
163        stream_job: StreamingJob,
164        fragment_graph: StreamFragmentGraphProto,
165        dependencies: HashSet<ObjectId>,
166        specific_resource_group: Option<String>, // specific resource group
167        if_not_exists: bool,
168    },
169    DropStreamingJob {
170        job_id: StreamingJobId,
171        drop_mode: DropMode,
172    },
173    AlterName(alter_name_request::Object, String),
174    AlterSwapRename(alter_swap_rename_request::Object),
175    ReplaceStreamJob(ReplaceStreamJobInfo),
176    AlterNonSharedSource(Source),
177    AlterObjectOwner(Object, UserId),
178    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
179    CreateConnection(Connection),
180    DropConnection(ConnectionId, DropMode),
181    CreateSecret(Secret),
182    AlterSecret(Secret),
183    DropSecret(SecretId),
184    CommentOn(Comment),
185    CreateSubscription(Subscription),
186    DropSubscription(SubscriptionId, DropMode),
187    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
188}
189
190impl DdlCommand {
191    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
192    fn object(&self) -> Either<String, ObjectId> {
193        use Either::*;
194        match self {
195            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
196            DdlCommand::DropDatabase(id) => Right(*id),
197            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
198            DdlCommand::DropSchema(id, _) => Right(*id),
199            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
200            DdlCommand::DropSource(id, _) => Right(*id),
201            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
202            DdlCommand::DropFunction(id, _) => Right(*id),
203            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
204            DdlCommand::DropView(id, _) => Right(*id),
205            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
206            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
207            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
208            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
209            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
210            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
211            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
212            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
213            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
214            DdlCommand::DropConnection(id, _) => Right(*id),
215            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
216            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
217            DdlCommand::DropSecret(id) => Right(*id),
218            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
219            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
220            DdlCommand::DropSubscription(id, _) => Right(*id),
221            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
222        }
223    }
224
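    /// Whether the command is allowed to run while the cluster is still recovering.
    /// Roughly speaking, commands that only mutate catalog metadata are allowed, while those
    /// that need to build or modify streaming graphs are rejected until recovery completes.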
225    fn allow_in_recovery(&self) -> bool {
226        match self {
227            DdlCommand::DropDatabase(_)
228            | DdlCommand::DropSchema(_, _)
229            | DdlCommand::DropSource(_, _)
230            | DdlCommand::DropFunction(_, _)
231            | DdlCommand::DropView(_, _)
232            | DdlCommand::DropStreamingJob { .. }
233            | DdlCommand::DropConnection(_, _)
234            | DdlCommand::DropSecret(_)
235            | DdlCommand::DropSubscription(_, _)
236            | DdlCommand::AlterName(_, _)
237            | DdlCommand::AlterObjectOwner(_, _)
238            | DdlCommand::AlterSetSchema(_, _)
239            | DdlCommand::CreateDatabase(_)
240            | DdlCommand::CreateSchema(_)
241            | DdlCommand::CreateFunction(_)
242            | DdlCommand::CreateView(_, _)
243            | DdlCommand::CreateConnection(_)
244            | DdlCommand::CommentOn(_)
245            | DdlCommand::CreateSecret(_)
246            | DdlCommand::AlterSecret(_)
247            | DdlCommand::AlterSwapRename(_)
248            | DdlCommand::AlterDatabaseParam(_, _) => true,
249            DdlCommand::CreateStreamingJob { .. }
250            | DdlCommand::CreateNonSharedSource(_)
251            | DdlCommand::ReplaceStreamJob(_)
252            | DdlCommand::AlterNonSharedSource(_)
253            | DdlCommand::CreateSubscription(_) => false,
254        }
255    }
256}
257
258#[derive(Clone)]
259pub struct DdlController {
260    pub(crate) env: MetaSrvEnv,
261
262    pub(crate) metadata_manager: MetadataManager,
263    pub(crate) stream_manager: GlobalStreamManagerRef,
264    pub(crate) source_manager: SourceManagerRef,
265    barrier_manager: BarrierManagerRef,
266
    // This semaphore limits the number of streaming jobs that can be created concurrently.
268    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
269
270    /// Sequence number for DDL commands, used for observability and debugging.
271    seq: Arc<AtomicU64>,
272}
273
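/// Semaphore-based limiter for the number of streaming jobs being created concurrently.
/// The number of permits follows the `max_concurrent_creating_streaming_jobs` system parameter,
/// where `0` means effectively unlimited.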
274#[derive(Clone)]
275pub struct CreatingStreamingJobPermit {
276    pub(crate) semaphore: Arc<Semaphore>,
277}
278
279impl CreatingStreamingJobPermit {
280    async fn new(env: &MetaSrvEnv) -> Self {
281        let mut permits = env
282            .system_params_reader()
283            .await
284            .max_concurrent_creating_streaming_jobs() as usize;
285        if permits == 0 {
286            // if the system parameter is set to zero, use the max permitted value.
287            permits = Semaphore::MAX_PERMITS;
288        }
289        let semaphore = Arc::new(Semaphore::new(permits));
290
291        let (local_notification_tx, mut local_notification_rx) =
292            tokio::sync::mpsc::unbounded_channel();
293        env.notification_manager()
294            .insert_local_sender(local_notification_tx);
295        let semaphore_clone = semaphore.clone();
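        // Spawn a background task that listens for system parameter changes and resizes the
        // semaphore so that it keeps tracking `max_concurrent_creating_streaming_jobs`.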
296        tokio::spawn(async move {
297            while let Some(notification) = local_notification_rx.recv().await {
298                let LocalNotification::SystemParamsChange(p) = &notification else {
299                    continue;
300                };
301                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
302                if new_permits == 0 {
303                    new_permits = Semaphore::MAX_PERMITS;
304                }
305                match permits.cmp(&new_permits) {
306                    Ordering::Less => {
307                        semaphore_clone.add_permits(new_permits - permits);
308                    }
309                    Ordering::Equal => continue,
310                    Ordering::Greater => {
311                        let to_release = permits - new_permits;
312                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with adjustable limits ourselves.
314                        if reduced != to_release {
315                            tracing::warn!(
                                "not enough permits to release, expected {}, but only reduced {}",
317                                to_release,
318                                reduced
319                            );
320                        }
321                    }
322                }
323                tracing::info!(
324                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
325                    permits,
326                    new_permits
327                );
328                permits = new_permits;
329            }
330        });
331
332        Self { semaphore }
333    }
334}
335
336impl DdlController {
337    pub async fn new(
338        env: MetaSrvEnv,
339        metadata_manager: MetadataManager,
340        stream_manager: GlobalStreamManagerRef,
341        source_manager: SourceManagerRef,
342        barrier_manager: BarrierManagerRef,
343    ) -> Self {
344        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
345        Self {
346            env,
347            metadata_manager,
348            stream_manager,
349            source_manager,
350            barrier_manager,
351            creating_streaming_job_permits,
352            seq: Arc::new(AtomicU64::new(0)),
353        }
354    }
355
356    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
357    pub fn next_seq(&self) -> u64 {
358        // This is a simple atomic increment operation.
359        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
360    }
361
362    /// `run_command` spawns a tokio coroutine to execute the target ddl command. When the client
363    /// has been interrupted during executing, the request will be cancelled by tonic. Since we have
364    /// a lot of logic for revert, status management, notification and so on, ensuring consistency
365    /// would be a huge hassle and pain if we don't spawn here.
366    ///
367    /// Though returning `Option`, it's always `Some`, to simplify the handling logic
368    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
369        if !command.allow_in_recovery() {
370            self.barrier_manager.check_status_running()?;
371        }
372
373        let await_tree_key = format!("DDL Command {}", self.next_seq());
374        let await_tree_span = await_tree::span!("{command}({})", command.object());
375
376        let ctrl = self.clone();
377        let fut = async move {
378            match command {
379                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
380                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
381                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
382                DdlCommand::DropSchema(schema_id, drop_mode) => {
383                    ctrl.drop_schema(schema_id, drop_mode).await
384                }
385                DdlCommand::CreateNonSharedSource(source) => {
386                    ctrl.create_non_shared_source(source).await
387                }
388                DdlCommand::DropSource(source_id, drop_mode) => {
389                    ctrl.drop_source(source_id, drop_mode).await
390                }
391                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
392                DdlCommand::DropFunction(function_id, drop_mode) => {
393                    ctrl.drop_function(function_id, drop_mode).await
394                }
395                DdlCommand::CreateView(view, dependencies) => {
396                    ctrl.create_view(view, dependencies).await
397                }
398                DdlCommand::DropView(view_id, drop_mode) => {
399                    ctrl.drop_view(view_id, drop_mode).await
400                }
401                DdlCommand::CreateStreamingJob {
402                    stream_job,
403                    fragment_graph,
404                    dependencies,
405                    specific_resource_group,
406                    if_not_exists,
407                } => {
408                    ctrl.create_streaming_job(
409                        stream_job,
410                        fragment_graph,
411                        dependencies,
412                        specific_resource_group,
413                        if_not_exists,
414                    )
415                    .await
416                }
417                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
418                    ctrl.drop_streaming_job(job_id, drop_mode).await
419                }
420                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
421                    streaming_job,
422                    fragment_graph,
423                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
424                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
425                DdlCommand::AlterObjectOwner(object, owner_id) => {
426                    ctrl.alter_owner(object, owner_id).await
427                }
428                DdlCommand::AlterSetSchema(object, new_schema_id) => {
429                    ctrl.alter_set_schema(object, new_schema_id).await
430                }
431                DdlCommand::CreateConnection(connection) => {
432                    ctrl.create_connection(connection).await
433                }
434                DdlCommand::DropConnection(connection_id, drop_mode) => {
435                    ctrl.drop_connection(connection_id, drop_mode).await
436                }
437                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
438                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
439                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
440                DdlCommand::AlterNonSharedSource(source) => {
441                    ctrl.alter_non_shared_source(source).await
442                }
443                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
444                DdlCommand::CreateSubscription(subscription) => {
445                    ctrl.create_subscription(subscription).await
446                }
447                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
448                    ctrl.drop_subscription(subscription_id, drop_mode).await
449                }
450                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
451                DdlCommand::AlterDatabaseParam(database_id, param) => {
452                    ctrl.alter_database_param(database_id, param).await
453                }
454            }
455        }
456        .in_current_span();
457        let fut = (self.env.await_tree_reg())
458            .register(await_tree_key, await_tree_span)
459            .instrument(Box::pin(fut));
460        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
461        Ok(Some(WaitVersion {
462            catalog_version: notification_version,
463            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
464        }))
465    }
466
467    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
468        self.barrier_manager.get_ddl_progress().await
469    }
470
471    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
472        let (version, updated_db) = self
473            .metadata_manager
474            .catalog_controller
475            .create_database(database)
476            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
478        self.barrier_manager
479            .update_database_barrier(
480                updated_db.database_id,
481                updated_db.barrier_interval_ms.map(|v| v as u32),
482                updated_db.checkpoint_frequency.map(|v| v as u64),
483            )
484            .await?;
485        Ok(version)
486    }
487
488    #[tracing::instrument(skip(self), level = "debug")]
489    pub async fn reschedule_streaming_job(
490        &self,
491        job_id: u32,
492        target: JobRescheduleTarget,
493        mut deferred: bool,
494    ) -> MetaResult<()> {
495        tracing::info!("alter parallelism");
496        if self.barrier_manager.check_status_running().is_err() {
497            tracing::info!(
498                "alter parallelism is set to deferred mode because the system is in recovery state"
499            );
500            deferred = true;
501        }
502
503        self.stream_manager
504            .reschedule_streaming_job(job_id, target, deferred)
505            .await
506    }
507
508    pub async fn reschedule_cdc_table_backfill(
509        &self,
510        job_id: u32,
511        target: JobRescheduleTarget,
512    ) -> MetaResult<()> {
513        tracing::info!("alter CDC table backfill parallelism");
514        if self.barrier_manager.check_status_running().is_err() {
515            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
516        }
517        self.stream_manager
518            .reschedule_cdc_table_backfill(job_id, target)
519            .await
520    }
521
522    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
523        self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade)
524            .await
525    }
526
527    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
528        self.metadata_manager
529            .catalog_controller
530            .create_schema(schema)
531            .await
532    }
533
534    async fn drop_schema(
535        &self,
536        schema_id: SchemaId,
537        drop_mode: DropMode,
538    ) -> MetaResult<NotificationVersion> {
539        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode)
540            .await
541    }
542
543    /// Shared source is handled in [`Self::create_streaming_job`]
544    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
545        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
546            .await
547            .context("failed to create source worker")?;
548
549        let (source_id, version) = self
550            .metadata_manager
551            .catalog_controller
552            .create_source(source)
553            .await?;
554        self.source_manager
555            .register_source_with_handle(source_id, handle)
556            .await;
557        Ok(version)
558    }
559
560    async fn drop_source(
561        &self,
562        source_id: SourceId,
563        drop_mode: DropMode,
564    ) -> MetaResult<NotificationVersion> {
565        self.drop_object(ObjectType::Source, source_id as _, drop_mode)
566            .await
567    }
568
569    /// This replaces the source in the catalog.
570    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
571    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
572        self.metadata_manager
573            .catalog_controller
574            .alter_non_shared_source(source)
575            .await
576    }
577
578    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
579        self.metadata_manager
580            .catalog_controller
581            .create_function(function)
582            .await
583    }
584
585    async fn drop_function(
586        &self,
587        function_id: FunctionId,
588        drop_mode: DropMode,
589    ) -> MetaResult<NotificationVersion> {
590        self.drop_object(ObjectType::Function, function_id as _, drop_mode)
591            .await
592    }
593
594    async fn create_view(
595        &self,
596        view: View,
597        dependencies: HashSet<ObjectId>,
598    ) -> MetaResult<NotificationVersion> {
599        self.metadata_manager
600            .catalog_controller
601            .create_view(view, dependencies)
602            .await
603    }
604
605    async fn drop_view(
606        &self,
607        view_id: ViewId,
608        drop_mode: DropMode,
609    ) -> MetaResult<NotificationVersion> {
610        self.drop_object(ObjectType::View, view_id as _, drop_mode)
611            .await
612    }
613
614    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
615        validate_connection(&connection).await?;
616        self.metadata_manager
617            .catalog_controller
618            .create_connection(connection)
619            .await
620    }
621
622    async fn drop_connection(
623        &self,
624        connection_id: ConnectionId,
625        drop_mode: DropMode,
626    ) -> MetaResult<NotificationVersion> {
627        self.drop_object(ObjectType::Connection, connection_id as _, drop_mode)
628            .await
629    }
630
631    async fn alter_database_param(
632        &self,
633        database_id: DatabaseId,
634        param: AlterDatabaseParam,
635    ) -> MetaResult<NotificationVersion> {
636        let (version, updated_db) = self
637            .metadata_manager
638            .catalog_controller
639            .alter_database_param(database_id, param)
640            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
642        self.barrier_manager
643            .update_database_barrier(
644                database_id,
645                updated_db.barrier_interval_ms.map(|v| v as u32),
646                updated_db.checkpoint_frequency.map(|v| v as u64),
647            )
648            .await?;
649        Ok(version)
650    }
651
652    // The 'secret' part of the request we receive from the frontend is in plaintext;
653    // here, we need to encrypt it before storing it in the catalog.
654    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
655        let secret_store_private_key = self
656            .env
657            .opts
658            .secret_store_private_key
659            .clone()
660            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;
661
662        let encrypted_payload = SecretEncryption::encrypt(
663            secret_store_private_key.as_slice(),
664            secret.get_value().as_slice(),
665        )
666        .context(format!("failed to encrypt secret {}", secret.name))?;
667        Ok(encrypted_payload
668            .serialize()
669            .context(format!("failed to serialize secret {}", secret.name))?)
670    }
671
672    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
673        // The 'secret' part of the request we receive from the frontend is in plaintext;
674        // here, we need to encrypt it before storing it in the catalog.
675        let secret_plain_payload = secret.value.clone();
676        let encrypted_payload = self.get_encrypted_payload(&secret)?;
677        secret.value = encrypted_payload;
678
679        self.metadata_manager
680            .catalog_controller
681            .create_secret(secret, secret_plain_payload)
682            .await
683    }
684
685    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
686        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict)
687            .await
688    }
689
690    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
691        let secret_plain_payload = secret.value.clone();
692        let encrypted_payload = self.get_encrypted_payload(&secret)?;
693        secret.value = encrypted_payload;
694        self.metadata_manager
695            .catalog_controller
696            .alter_secret(secret, secret_plain_payload)
697            .await
698    }
699
700    async fn create_subscription(
701        &self,
702        mut subscription: Subscription,
703    ) -> MetaResult<NotificationVersion> {
704        tracing::debug!("create subscription");
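        // Creating a subscription shares the creating-streaming-job permit and the reschedule
        // lock with other streaming jobs, so it is throttled like other streaming job creations
        // and cannot race with rescheduling.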
705        let _permit = self
706            .creating_streaming_job_permits
707            .semaphore
708            .acquire()
709            .await
710            .unwrap();
711        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
712        self.metadata_manager
713            .catalog_controller
714            .create_subscription_catalog(&mut subscription)
715            .await?;
716        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
717            tracing::debug!(error = %err.as_report(), "failed to create subscription");
718            let _ = self
719                .metadata_manager
720                .catalog_controller
721                .try_abort_creating_subscription(subscription.id as _)
722                .await
723                .inspect_err(|e| {
724                    tracing::error!(
725                        error = %e.as_report(),
726                        "failed to abort create subscription after failure"
727                    );
728                });
729            return Err(err);
730        }
731
732        let version = self
733            .metadata_manager
734            .catalog_controller
735            .notify_create_subscription(subscription.id)
736            .await?;
737        tracing::debug!("finish create subscription");
738        Ok(version)
739    }
740
741    async fn drop_subscription(
742        &self,
743        subscription_id: SubscriptionId,
744        drop_mode: DropMode,
745    ) -> MetaResult<NotificationVersion> {
746        tracing::debug!("preparing drop subscription");
747        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
748        let subscription = self
749            .metadata_manager
750            .catalog_controller
751            .get_subscription_by_id(subscription_id)
752            .await?;
753        let table_id = subscription.dependent_table_id;
754        let database_id = subscription.database_id.into();
755        let (_, version) = self
756            .metadata_manager
757            .catalog_controller
758            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
759            .await?;
760        self.stream_manager
761            .drop_subscription(database_id, subscription_id as _, table_id)
762            .await;
763        tracing::debug!("finish drop subscription");
764        Ok(version)
765    }
766
767    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
768    #[await_tree::instrument]
769    pub(crate) async fn validate_cdc_table(
770        &self,
771        table: &Table,
772        table_fragments: &StreamJobFragments,
773    ) -> MetaResult<()> {
774        let stream_scan_fragment = table_fragments
775            .fragments
776            .values()
777            .filter(|f| {
778                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
779                    || f.fragment_type_mask
780                        .contains(FragmentTypeFlag::StreamCdcScan)
781            })
782            .exactly_one()
783            .ok()
784            .with_context(|| {
785                format!(
786                    "expect exactly one stream scan fragment, got: {:?}",
787                    table_fragments.fragments
788                )
789            })?;
790        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
791            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
792                if let Some(o) = node.options
793                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
794                {
795                    // Use parallel CDC backfill.
796                } else {
797                    assert_eq!(
798                        stream_scan_fragment.actors.len(),
799                        1,
800                        "Stream scan fragment should have only one actor"
801                    );
802                }
803            }
804        }
805        let mut found_cdc_scan = false;
806        match &stream_scan_fragment.nodes.node_body {
807            Some(NodeBody::StreamCdcScan(_)) => {
808                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
809                if self
810                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
811                    .await?
812                {
813                    found_cdc_scan = true;
814                }
815            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
817            Some(NodeBody::Project(_)) => {
818                for input in &stream_scan_fragment.nodes.input {
819                    assert_parallelism(stream_scan_fragment, &input.node_body);
820                    if self
821                        .validate_cdc_table_inner(&input.node_body, table.id)
822                        .await?
823                    {
824                        found_cdc_scan = true;
825                    }
826                }
827            }
828            _ => {
829                bail!("Unexpected node body for stream cdc scan");
830            }
831        };
832        if !found_cdc_scan {
833            bail!("No stream cdc scan node found in stream scan fragment");
834        }
835        Ok(())
836    }
837
838    async fn validate_cdc_table_inner(
839        &self,
840        node_body: &Option<NodeBody>,
841        table_id: u32,
842    ) -> MetaResult<bool> {
843        let meta_store = self.env.meta_store_ref();
844        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
845            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
846        {
847            let options_with_secret = WithOptionsSecResolved::new(
848                cdc_table_desc.connect_properties.clone(),
849                cdc_table_desc.secret_refs.clone(),
850            );
851
852            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
853            props.init_from_pb_cdc_table_desc(cdc_table_desc);
854
855            // Try creating a split enumerator to validate
856            let _enumerator = props
857                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
858                .await?;
859
860            if is_parallelized_backfill_enabled(stream_cdc_scan) {
                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
862                try_init_parallel_cdc_table_snapshot_splits(
863                    table_id,
864                    cdc_table_desc,
865                    meta_store,
866                    &stream_cdc_scan.options,
867                    self.env.opts.cdc_table_split_init_insert_batch_size,
868                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
869                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
870                )
871                .await?;
872            }
873
874            tracing::debug!(?table_id, "validate cdc table success");
875            Ok(true)
876        } else {
877            Ok(false)
878        }
879    }
880
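    /// Rejects creating a sink into a table when the target table has not been migrated yet
    /// (see `has_table_been_migrated`).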
881    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
882        let migrated = self
883            .metadata_manager
884            .catalog_controller
885            .has_table_been_migrated(table_id)
886            .await?;
887        if !migrated {
            Err(anyhow::anyhow!("Creating a sink into table {} is not allowed because the table has not been migrated yet. Please migrate it first.", table_id).into())
889        } else {
890            Ok(())
891        }
892    }
893
894    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
895    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
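    ///
    /// For [`CreateType::Background`], it returns once the job has been started; the actual
    /// creation then proceeds asynchronously on a spawned task.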
896    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
897    pub async fn create_streaming_job(
898        &self,
899        mut streaming_job: StreamingJob,
900        fragment_graph: StreamFragmentGraphProto,
901        dependencies: HashSet<ObjectId>,
902        specific_resource_group: Option<String>,
903        if_not_exists: bool,
904    ) -> MetaResult<NotificationVersion> {
905        if let StreamingJob::Sink(sink) = &streaming_job
906            && let Some(target_table) = sink.target_table
907        {
908            self.validate_table_for_sink(target_table as _).await?;
909        }
910        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
911        let check_ret = self
912            .metadata_manager
913            .catalog_controller
914            .create_job_catalog(
915                &mut streaming_job,
916                &ctx,
917                &fragment_graph.parallelism,
918                fragment_graph.max_parallelism as _,
919                dependencies,
920                specific_resource_group.clone(),
921            )
922            .await;
923        if let Err(meta_err) = check_ret {
924            if !if_not_exists {
925                return Err(meta_err);
926            }
927            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
928                if streaming_job.create_type() == CreateType::Foreground {
929                    let database_id = streaming_job.database_id();
930                    self.metadata_manager
931                        .wait_streaming_job_finished(database_id.into(), *job_id)
932                        .await
933                } else {
934                    Ok(IGNORED_NOTIFICATION_VERSION)
935                }
936            } else {
937                Err(meta_err)
938            };
939        }
940        let job_id = streaming_job.id();
941        tracing::debug!(
942            id = job_id,
943            definition = streaming_job.definition(),
944            create_type = streaming_job.create_type().as_str_name(),
945            job_type = ?streaming_job.job_type(),
946            "starting streaming job",
947        );
948        // TODO: acquire permits for recovered background DDLs.
949        let permit = self
950            .creating_streaming_job_permits
951            .semaphore
952            .clone()
953            .acquire_owned()
954            .instrument_await("acquire_creating_streaming_job_permit")
955            .await
956            .unwrap();
957        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
958
959        let name = streaming_job.name();
960        let definition = streaming_job.definition();
961        let source_id = match &streaming_job {
962            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
963            _ => None,
964        };
965
966        // create streaming job.
967        match self
968            .create_streaming_job_inner(
969                ctx,
970                streaming_job,
971                fragment_graph,
972                specific_resource_group,
973                permit,
974            )
975            .await
976        {
977            Ok(version) => Ok(version),
978            Err(err) => {
979                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
980                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
981                    id: job_id,
982                    name,
983                    definition,
984                    error: err.as_report().to_string(),
985                };
986                self.env.event_log_manager_ref().add_event_logs(vec![
987                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
988                ]);
989                let (aborted, _) = self
990                    .metadata_manager
991                    .catalog_controller
992                    .try_abort_creating_streaming_job(job_id as _, false)
993                    .await?;
994                if aborted {
995                    tracing::warn!(id = job_id, "aborted streaming job");
996                    // FIXME: might also need other cleanup here
997                    if let Some(source_id) = source_id {
998                        self.source_manager
999                            .apply_source_change(SourceChange::DropSource {
1000                                dropped_source_ids: vec![source_id as SourceId],
1001                            })
1002                            .await;
1003                    }
1004                }
1005                Err(err)
1006            }
1007        }
1008    }
1009
1010    #[await_tree::instrument(boxed)]
1011    async fn create_streaming_job_inner(
1012        &self,
1013        ctx: StreamContext,
1014        mut streaming_job: StreamingJob,
1015        fragment_graph: StreamFragmentGraphProto,
1016        specific_resource_group: Option<String>,
1017        permit: OwnedSemaphorePermit,
1018    ) -> MetaResult<NotificationVersion> {
1019        let mut fragment_graph =
1020            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1021        streaming_job.set_info_from_graph(&fragment_graph);
1022
1023        // create internal table catalogs and refill table id.
1024        let incomplete_internal_tables = fragment_graph
1025            .incomplete_internal_tables()
1026            .into_values()
1027            .collect_vec();
1028        let table_id_map = self
1029            .metadata_manager
1030            .catalog_controller
1031            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
1032            .await?;
1033        fragment_graph.refill_internal_table_ids(table_id_map);
1034
1035        // create fragment and actor catalogs.
1036        tracing::debug!(id = streaming_job.id(), "building streaming job");
1037        let (ctx, stream_job_fragments) = self
1038            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
1039            .await?;
1040
1041        let streaming_job = &ctx.streaming_job;
1042
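        // Perform job-type-specific validation and registration (e.g. CDC table validation,
        // source registration on connector nodes, sink validation) before persisting fragments.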
1043        match streaming_job {
1044            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1045                self.validate_cdc_table(table, &stream_job_fragments)
1046                    .await?;
1047            }
1048            StreamingJob::Table(Some(source), ..) => {
1049                // Register the source on the connector node.
1050                self.source_manager.register_source(source).await?;
1051                let connector_name = source
1052                    .get_with_properties()
1053                    .get(UPSTREAM_SOURCE_KEY)
1054                    .cloned();
1055                let attr = source.info.as_ref().map(|source_info| {
1056                    jsonbb::json!({
1057                            "format": source_info.format().as_str_name(),
1058                            "encode": source_info.row_encode().as_str_name(),
1059                    })
1060                });
1061                report_create_object(
1062                    streaming_job.id(),
1063                    "source",
1064                    PbTelemetryDatabaseObject::Source,
1065                    connector_name,
1066                    attr,
1067                );
1068            }
1069            StreamingJob::Sink(sink) => {
1070                if sink.auto_refresh_schema_from_table.is_some() {
1071                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1072                }
1073                // Validate the sink on the connector node.
1074                validate_sink(sink).await?;
1075                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1076                let attr = sink.format_desc.as_ref().map(|sink_info| {
1077                    jsonbb::json!({
1078                        "format": sink_info.format().as_str_name(),
1079                        "encode": sink_info.encode().as_str_name(),
1080                    })
1081                });
1082                report_create_object(
1083                    streaming_job.id(),
1084                    "sink",
1085                    PbTelemetryDatabaseObject::Sink,
1086                    connector_name,
1087                    attr,
1088                );
1089            }
1090            StreamingJob::Source(source) => {
1091                // Register the source on the connector node.
1092                self.source_manager.register_source(source).await?;
1093                let connector_name = source
1094                    .get_with_properties()
1095                    .get(UPSTREAM_SOURCE_KEY)
1096                    .cloned();
1097                let attr = source.info.as_ref().map(|source_info| {
1098                    jsonbb::json!({
1099                            "format": source_info.format().as_str_name(),
1100                            "encode": source_info.row_encode().as_str_name(),
1101                    })
1102                });
1103                report_create_object(
1104                    streaming_job.id(),
1105                    "source",
1106                    PbTelemetryDatabaseObject::Source,
1107                    connector_name,
1108                    attr,
1109                );
1110            }
1111            _ => {}
1112        }
1113
1114        self.metadata_manager
1115            .catalog_controller
1116            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1117            .await?;
1118
1119        // create streaming jobs.
1120        let stream_job_id = streaming_job.id();
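        // Foreground jobs block until the streaming job is fully created, while background jobs
        // are handed off to a spawned task and return as soon as creation has started.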
1121        match streaming_job.create_type() {
1122            CreateType::Unspecified | CreateType::Foreground => {
1123                let version = self
1124                    .stream_manager
1125                    .create_streaming_job(stream_job_fragments, ctx, None)
1126                    .await?;
1127                Ok(version)
1128            }
1129            CreateType::Background => {
1130                let await_tree_key = format!("Background DDL Worker ({})", stream_job_id);
1131                let await_tree_span =
1132                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());
1133
1134                let ctrl = self.clone();
1135                let (tx, rx) = oneshot::channel();
1136                let fut = async move {
1137                    let _ = ctrl
1138                        .stream_manager
1139                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
1140                        .await
1141                        .inspect_err(|err| {
1142                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
1143                        });
1144                    // drop the permit to release the semaphore
1145                    drop(permit);
1146                };
1147
1148                let fut = (self.env.await_tree_reg())
1149                    .register(await_tree_key, await_tree_span)
1150                    .instrument(fut);
1151                tokio::spawn(fut);
1152
1153                rx.instrument_await("wait_background_streaming_job_creation_started")
1154                    .await
1155                    .map_err(|_| {
1156                        anyhow!(
1157                            "failed to receive create streaming job result of job: {}",
1158                            stream_job_id
1159                        )
1160                    })??;
1161                Ok(IGNORED_NOTIFICATION_VERSION)
1162            }
1163        }
1164    }
1165
    /// Note: when dropping a sink that writes into a table, the target table needs to be replaced as well.
1167    pub async fn drop_object(
1168        &self,
1169        object_type: ObjectType,
1170        object_id: ObjectId,
1171        drop_mode: DropMode,
1172    ) -> MetaResult<NotificationVersion> {
1173        let (release_ctx, version) = self
1174            .metadata_manager
1175            .catalog_controller
1176            .drop_object(object_type, object_id, drop_mode)
1177            .await?;
1178
1179        let ReleaseContext {
1180            database_id,
1181            removed_streaming_job_ids,
1182            removed_state_table_ids,
1183            removed_source_ids,
1184            removed_secret_ids: secret_ids,
1185            removed_source_fragments,
1186            removed_actors,
1187            removed_fragments,
1188            removed_sink_fragment_by_targets,
1189        } = release_ctx;
1190
1191        let _guard = self.source_manager.pause_tick().await;
1192        self.stream_manager
1193            .drop_streaming_jobs(
1194                risingwave_common::catalog::DatabaseId::new(database_id as _),
1195                removed_actors.iter().map(|id| *id as _).collect(),
1196                removed_streaming_job_ids,
1197                removed_state_table_ids,
1198                removed_fragments.iter().map(|id| *id as _).collect(),
1199                removed_sink_fragment_by_targets
1200                    .into_iter()
1201                    .map(|(target, sinks)| {
1202                        (target as _, sinks.into_iter().map(|id| id as _).collect())
1203                    })
1204                    .collect(),
1205            )
1206            .await;
1207
1208        // clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated right after being deleted.
1210        self.source_manager
1211            .apply_source_change(SourceChange::DropSource {
1212                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1213            })
1214            .await;
1215
1216        // unregister fragments and actors from source manager.
1217        // FIXME: need also unregister source backfill fragments.
1218        let dropped_source_fragments = removed_source_fragments
1219            .into_iter()
1220            .map(|(source_id, fragments)| {
1221                (
1222                    source_id,
1223                    fragments.into_iter().map(|id| id as u32).collect(),
1224                )
1225            })
1226            .collect();
1227        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1228        self.source_manager
1229            .apply_source_change(SourceChange::DropMv {
1230                dropped_source_fragments,
1231                dropped_actors,
1232            })
1233            .await;
1234
1235        // remove secrets.
1236        for secret in secret_ids {
1237            LocalSecretManager::global().remove_secret(secret as _);
1238        }
1239        Ok(version)
1240    }
1241
1242    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
1243    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1244    pub async fn replace_job(
1245        &self,
1246        mut streaming_job: StreamingJob,
1247        fragment_graph: StreamFragmentGraphProto,
1248    ) -> MetaResult<NotificationVersion> {
1249        match &streaming_job {
1250            StreamingJob::Table(..)
1251            | StreamingJob::Source(..)
1252            | StreamingJob::MaterializedView(..) => {}
1253            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1254                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1255            }
1256        }
1257
1258        let job_id = streaming_job.id();
1259
1260        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1261        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1262
        // Ensure the max parallelism stays unchanged before replacing the table.
1264        let original_max_parallelism = self
1265            .metadata_manager
1266            .get_job_max_parallelism(streaming_job.id().into())
1267            .await?;
1268        let fragment_graph = PbStreamFragmentGraph {
1269            max_parallelism: original_max_parallelism as _,
1270            ..fragment_graph
1271        };
1272
1273        // 1. build fragment graph.
1274        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1275        streaming_job.set_info_from_graph(&fragment_graph);
1276
1277        // make it immutable
1278        let streaming_job = streaming_job;
1279
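        // If any sinks were created with `auto_refresh_schema_from_table` pointing at this table,
        // rewrite their sink fragments as well so that they pick up the newly added columns as
        // part of the same replace operation.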
1280        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1281            let auto_refresh_schema_sinks = self
1282                .metadata_manager
1283                .catalog_controller
1284                .get_sink_auto_refresh_schema_from(table.id as _)
1285                .await?;
1286            if !auto_refresh_schema_sinks.is_empty() {
1287                let original_table_columns = self
1288                    .metadata_manager
1289                    .catalog_controller
1290                    .get_table_columns(table.id as _)
1291                    .await?;
1292                // compare column id to find newly added columns
1293                let mut original_table_column_ids: HashSet<_> = original_table_columns
1294                    .iter()
1295                    .map(|col| col.column_id())
1296                    .collect();
1297                let newly_added_columns = table
1298                    .columns
1299                    .iter()
1300                    .filter(|col| {
1301                        !original_table_column_ids.remove(&ColumnId::new(
1302                            col.column_desc.as_ref().unwrap().column_id as _,
1303                        ))
1304                    })
1305                    .map(|col| ColumnCatalog::from(col.clone()))
1306                    .collect_vec();
1307                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1309                }
1310                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1311                for sink in auto_refresh_schema_sinks {
1312                    let sink_job_fragments = self
1313                        .metadata_manager
1314                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink.id))
1315                        .await?;
1316                    if sink_job_fragments.fragments.len() != 1 {
1317                        return Err(anyhow!(
1318                            "auto schema refresh sink must have exactly one fragment, but got {}",
1319                            sink_job_fragments.fragments.len()
1320                        )
1321                        .into());
1322                    }
1323                    let original_sink_fragment =
1324                        sink_job_fragments.fragments.into_values().next().unwrap();
1325                    let (new_sink_fragment, new_schema, new_log_store_table) =
1326                        rewrite_refresh_schema_sink_fragment(
1327                            &original_sink_fragment,
1328                            &sink,
1329                            &newly_added_columns,
1330                            table,
1331                            fragment_graph.table_fragment_id(),
1332                            self.env.id_gen_manager(),
1333                        )?;
1334
1335                    assert_eq!(
1336                        original_sink_fragment.actors.len(),
1337                        new_sink_fragment.actors.len()
1338                    );
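                    // Pair each new sink actor with the worker of its corresponding original
                    // actor, so the rewritten sink fragment stays on the same workers.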
1339                    let actor_status = (0..original_sink_fragment.actors.len())
1340                        .map(|i| {
1341                            let worker_node_id = sink_job_fragments.actor_status
1342                                [&original_sink_fragment.actors[i].actor_id]
1343                                .location
1344                                .as_ref()
1345                                .unwrap()
1346                                .worker_node_id;
1347                            (
1348                                new_sink_fragment.actors[i].actor_id,
1349                                PbActorStatus {
1350                                    location: Some(PbActorLocation { worker_node_id }),
1351                                    state: PbActorState::Inactive as _,
1352                                },
1353                            )
1354                        })
1355                        .collect();
1356
1357                    let streaming_job = StreamingJob::Sink(sink);
1358
1359                    let tmp_sink_id = self
1360                        .metadata_manager
1361                        .catalog_controller
1362                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1363                        .await?;
1364                    let StreamingJob::Sink(sink) = streaming_job else {
1365                        unreachable!()
1366                    };
1367
1368                    sinks.push(AutoRefreshSchemaSinkContext {
1369                        tmp_sink_id,
1370                        original_sink: sink,
1371                        original_fragment: original_sink_fragment,
1372                        new_schema,
1373                        newly_add_fields: newly_added_columns
1374                            .iter()
1375                            .map(|col| Field::from(&col.column_desc))
1376                            .collect(),
1377                        new_fragment: new_sink_fragment,
1378                        new_log_store_table,
1379                        actor_status,
1380                    });
1381                }
1382                Some(sinks)
1383            } else {
1384                None
1385            }
1386        } else {
1387            None
1388        };
1389
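        // Register a temporary job catalog entry for the replacement; it is either promoted
        // when `finish_replace_streaming_job` succeeds or aborted on failure below.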
1390        let tmp_id = self
1391            .metadata_manager
1392            .catalog_controller
1393            .create_job_catalog_for_replace(
1394                &streaming_job,
1395                Some(&ctx),
1396                fragment_graph.specified_parallelism().as_ref(),
1397                Some(fragment_graph.max_parallelism()),
1398            )
1399            .await?;
1400
1401        let tmp_sink_ids = auto_refresh_schema_sinks
1402            .as_ref()
1403            .map(|sinks| sinks.iter().map(|sink| sink.tmp_sink_id).collect_vec());
1404
1405        tracing::debug!(id = job_id, "building replace streaming job");
1406        let mut updated_sink_catalogs = vec![];
1407
1408        let mut drop_table_connector_ctx = None;
1409        let result: MetaResult<_> = try {
1410            let (mut ctx, mut stream_job_fragments) = self
1411                .build_replace_job(
1412                    ctx,
1413                    &streaming_job,
1414                    fragment_graph,
1415                    tmp_id as _,
1416                    auto_refresh_schema_sinks,
1417                )
1418                .await?;
1419            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1420            let auto_refresh_schema_sink_finish_ctx =
1421                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1422                    sinks
1423                        .iter()
1424                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1425                            tmp_sink_id: sink.tmp_sink_id,
1426                            original_sink_id: sink.original_sink.id as _,
1427                            columns: sink.new_schema.clone(),
1428                            new_log_store_table: sink
1429                                .new_log_store_table
1430                                .as_ref()
1431                                .map(|table| (table.id as _, table.columns.clone())),
1432                        })
1433                        .collect()
1434                });
1435
1436            // Handle table that has incoming sinks.
1437            // Handle a table that has incoming sinks.
1438                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1439                let upstream_infos = self
1440                    .metadata_manager
1441                    .catalog_controller
1442                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1443                    .await?;
1444                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1445
1446                for upstream_info in &upstream_infos {
1447                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1448                    ctx.upstream_fragment_downstreams
1449                        .entry(upstream_fragment_id)
1450                        .or_default()
1451                        .push(upstream_info.new_sink_downstream.clone());
1452                    if upstream_info.sink_original_target_columns.is_empty() {
1453                        updated_sink_catalogs.push(upstream_info.sink_id);
1454                    }
1455                }
1456            }
1457
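            // Snapshot the upstream replacement plan before `ctx` is moved into the stream
            // manager; it is needed again below to finish the catalog update.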
1458            let replace_upstream = ctx.replace_upstream.clone();
1459
1460            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1461                let empty_actor_splits = HashMap::new();
1462                let empty_downstreams = FragmentDownstreamRelation::default();
1463                for sink in sinks {
1464                    self.metadata_manager
1465                        .catalog_controller
1466                        .prepare_streaming_job(
1467                            sink.tmp_sink_id,
1468                            || [&sink.new_fragment].into_iter(),
1469                            &sink.actor_status,
1470                            &empty_actor_splits,
1471                            &empty_downstreams,
1472                            true,
1473                            None,
1474                        )
1475                        .await?;
1476                }
1477            }
1478
1479            self.metadata_manager
1480                .catalog_controller
1481                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1482                .await?;
1483
1484            self.stream_manager
1485                .replace_stream_job(stream_job_fragments, ctx)
1486                .await?;
1487            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1488        };
1489
1490        match result {
1491            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1492                let version = self
1493                    .metadata_manager
1494                    .catalog_controller
1495                    .finish_replace_streaming_job(
1496                        tmp_id,
1497                        streaming_job,
1498                        replace_upstream,
1499                        SinkIntoTableContext {
1500                            updated_sink_catalogs,
1501                        },
1502                        drop_table_connector_ctx.as_ref(),
1503                        auto_refresh_schema_sink_finish_ctx,
1504                    )
1505                    .await?;
1506                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1507                    self.source_manager
1508                        .apply_source_change(SourceChange::DropSource {
1509                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1510                        })
1511                        .await;
1512                }
1513                Ok(version)
1514            }
1515            Err(err) => {
1516                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1517                let _ = self.metadata_manager
1518                    .catalog_controller
1519                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1520                    .await.inspect_err(|err| {
1521                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1522                });
1523                Err(err)
1524            }
1525        }
1526    }
1527
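    /// Drops or cancels a streaming job depending on its status: a job still in `Creating`
    /// is cancelled via the stream manager, while a `Created` job goes through the regular
    /// catalog drop path.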
1528    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1529    )]
1530    async fn drop_streaming_job(
1531        &self,
1532        job_id: StreamingJobId,
1533        drop_mode: DropMode,
1534    ) -> MetaResult<NotificationVersion> {
1535        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1536
1537        let (object_id, object_type) = match job_id {
1538            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1539            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1540            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1541            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1542        };
1543
1544        let job_status = self
1545            .metadata_manager
1546            .catalog_controller
1547            .get_streaming_job_status(job_id.id())
1548            .await?;
1549        let version = match job_status {
1550            JobStatus::Initial => {
1551                unreachable!(
1552                    "Job with Initial status should not notify frontend and therefore should not arrive here"
1553                );
1554            }
1555            JobStatus::Creating => {
1556                let canceled_jobs = self
1557                    .stream_manager
1558                    .cancel_streaming_jobs(vec![(job_id.id() as u32).into()])
1559                    .await;
1560                if canceled_jobs.is_empty() {
1561                    tracing::warn!(job_id = job_id.id(), "failed to cancel streaming job");
1562                }
1563                IGNORED_NOTIFICATION_VERSION
1564            }
1565            JobStatus::Created => {
1566                let version = self.drop_object(object_type, object_id, drop_mode).await?;
1567                #[cfg(not(madsim))]
1568                if let StreamingJobId::Sink(sink_id) = job_id {
1569                    // Clean up rows in the system table for the exactly-once Iceberg sink.
1570                    // TODO(wcy-fdu): optimize this logic so that it only applies to Iceberg sinks.
1571                    let db = self.env.meta_store_ref().conn.clone();
1572                    clean_all_rows_by_sink_id(&db, sink_id).await?;
1573                }
1574                version
1575            }
1576        };
1577
1578        Ok(version)
1579    }
1580
1581    /// Resolve the parallelism of the stream job based on the given information.
1582    ///
1583    /// Returns error if user specifies a parallelism that cannot be satisfied.
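    ///
    /// Illustrative example (hypothetical numbers, not tied to any particular deployment):
    /// with 8 slots available in the resource group, `max = 4`, no user-specified parallelism,
    /// and `DefaultParallelism::Full`, the resolved parallelism is `min(8, 4) = 4`.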
1584    fn resolve_stream_parallelism(
1585        &self,
1586        specified: Option<NonZeroUsize>,
1587        max: NonZeroUsize,
1588        cluster_info: &StreamingClusterInfo,
1589        resource_group: String,
1590    ) -> MetaResult<NonZeroUsize> {
1591        let available = cluster_info.parallelism(&resource_group);
1592        let Some(available) = NonZeroUsize::new(available) else {
1593            bail_unavailable!(
1594                "no available slots to schedule in resource group \"{}\", \
1595                 have you allocated any compute nodes within this resource group?",
1596                resource_group
1597            );
1598        };
1599
1600        if let Some(specified) = specified {
1601            if specified > max {
1602                bail_invalid_parameter!(
1603                    "specified parallelism {} should not exceed max parallelism {}",
1604                    specified,
1605                    max,
1606                );
1607            }
1608            if specified > available {
1609                bail_unavailable!(
1610                    "insufficient parallelism to schedule in resource group \"{}\", \
1611                     required: {}, available: {}",
1612                    resource_group,
1613                    specified,
1614                    available,
1615                );
1616            }
1617            Ok(specified)
1618        } else {
1619            // Fall back to the default parallelism from the meta config when the user does not specify one.
1620            let default_parallelism = match self.env.opts.default_parallelism {
1621                DefaultParallelism::Full => available,
1622                DefaultParallelism::Default(num) => {
1623                    if num > available {
1624                        bail_unavailable!(
1625                            "insufficient parallelism to schedule in resource group \"{}\", \
1626                            required: {}, available: {}",
1627                            resource_group,
1628                            num,
1629                            available,
1630                        );
1631                    }
1632                    num
1633                }
1634            };
1635
1636            if default_parallelism > max {
1637                tracing::warn!(
1638                    max_parallelism = max.get(),
1639                    resource_group,
1640                    "default parallelism exceeds the job's max parallelism, falling back to max parallelism",
1641                );
1642            }
1643            Ok(default_parallelism.min(max))
1644        }
1645    }
1646
1647    /// Builds the actor graph:
1648    /// - Add the upstream fragments to the fragment graph
1649    /// - Schedule the fragments based on their distribution
1650    /// - Expand each fragment into one or several actors
1651    /// - Construct the fragment level backfill order control.
1652    #[await_tree::instrument]
1653    pub(crate) async fn build_stream_job(
1654        &self,
1655        stream_ctx: StreamContext,
1656        mut stream_job: StreamingJob,
1657        fragment_graph: StreamFragmentGraph,
1658        specific_resource_group: Option<String>,
1659    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1660        let id = stream_job.id();
1661        let specified_parallelism = fragment_graph.specified_parallelism();
1662        let expr_context = stream_ctx.to_expr_context();
1663        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1664
1665        // 1. Build the fragment-level backfill ordering graph.
1666        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1667
1668        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1669        // contains all information needed for building the actor graph.
1670
1671        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1672            fragment_graph.collect_snapshot_backfill_info()?;
1673        assert!(
1674            snapshot_backfill_info
1675                .iter()
1676                .chain([&cross_db_snapshot_backfill_info])
1677                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1678                .all(|backfill_epoch| backfill_epoch.is_none()),
1679            "should not set backfill epoch when initially build the job: {:?} {:?}",
1680            snapshot_backfill_info,
1681            cross_db_snapshot_backfill_info
1682        );
1683
1684        // check if log store exists for all cross-db upstreams
1685        self.metadata_manager
1686            .catalog_controller
1687            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1688            .await?;
1689
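        // Cross-db upstreams are resolved separately (their log store availability was just
        // validated above), so exclude them when collecting upstream root fragments.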
1690        let upstream_table_ids = fragment_graph
1691            .dependent_table_ids()
1692            .iter()
1693            .filter(|id| {
1694                !cross_db_snapshot_backfill_info
1695                    .upstream_mv_table_id_to_backfill_epoch
1696                    .contains_key(id)
1697            })
1698            .cloned()
1699            .collect();
1700
1701        let (upstream_root_fragments, existing_actor_location) = self
1702            .metadata_manager
1703            .get_upstream_root_fragments(&upstream_table_ids)
1704            .await?;
1705
1706        if snapshot_backfill_info.is_some() {
1707            match stream_job {
1708                StreamingJob::MaterializedView(_)
1709                | StreamingJob::Sink(_)
1710                | StreamingJob::Index(_, _) => {}
1711                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1712                    return Err(
1713                        anyhow!("snapshot_backfill is not supported for tables and sources").into(),
1714                    );
1715                }
1716            }
1717        }
1718
1719        let upstream_actors = upstream_root_fragments
1720            .values()
1721            .map(|fragment| {
1722                (
1723                    fragment.fragment_id,
1724                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1725                )
1726            })
1727            .collect();
1728
1729        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1730            fragment_graph,
1731            upstream_root_fragments,
1732            existing_actor_location,
1733            (&stream_job).into(),
1734        )?;
1735
1736        let resource_group = match specific_resource_group {
1737            None => {
1738                self.metadata_manager
1739                    .get_database_resource_group(stream_job.database_id() as ObjectId)
1740                    .await?
1741            }
1742            Some(resource_group) => resource_group,
1743        };
1744
1745        // 3. Build the actor graph.
1746        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1747
1748        let parallelism = self.resolve_stream_parallelism(
1749            specified_parallelism,
1750            max_parallelism,
1751            &cluster_info,
1752            resource_group.clone(),
1753        )?;
1754
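        // Apply the system-wide adaptive parallelism strategy on top of the resolved value.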
1755        let parallelism = self
1756            .env
1757            .system_params_reader()
1758            .await
1759            .adaptive_parallelism_strategy()
1760            .compute_target_parallelism(parallelism.get());
1761
1762        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1763        let actor_graph_builder = ActorGraphBuilder::new(
1764            id,
1765            resource_group,
1766            complete_graph,
1767            cluster_info,
1768            parallelism,
1769        )?;
1770
1771        let ActorGraphBuildResult {
1772            graph,
1773            downstream_fragment_relations,
1774            building_locations,
1775            upstream_fragment_downstreams,
1776            new_no_shuffle,
1777            replace_upstream,
1778            ..
1779        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1780        assert!(replace_upstream.is_empty());
1781
1782        // 4. Build the table fragments structure that will be persisted in the stream manager,
1783        // and the context that contains all information needed for building the
1784        // actors on the compute nodes.
1785
1786        // If the frontend does not specify a parallelism and `default_parallelism` is set to `Full`,
1787        // mark the job as ADAPTIVE; otherwise, pin it to FIXED with the resolved parallelism.
1788        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1789            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1790            _ => TableParallelism::Fixed(parallelism.get()),
1791        };
1792
1793        let stream_job_fragments = StreamJobFragments::new(
1794            id.into(),
1795            graph,
1796            &building_locations.actor_locations,
1797            stream_ctx.clone(),
1798            table_parallelism,
1799            max_parallelism.get(),
1800        );
1801
1802        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1803            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1804        }
1805
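        // For a sink-into-table job, pre-compute the upstream sink info so the new sink
        // fragment can later be wired into the target table's union fragment.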
1806        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1807            && let Ok(table_id) = sink.get_target_table()
1808        {
1809            let tables = self
1810                .metadata_manager
1811                .get_table_catalog_by_ids(&[*table_id as _])
1812                .await?;
1813            let target_table = tables
1814                .first()
1815                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1816            let sink_fragment = stream_job_fragments
1817                .sink_fragment()
1818                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1819            let mview_fragment_id = self
1820                .metadata_manager
1821                .catalog_controller
1822                .get_mview_fragment_by_id(*table_id as _)
1823                .await?;
1824            let upstream_sink_info = build_upstream_sink_info(
1825                sink,
1826                sink_fragment.fragment_id as _,
1827                target_table,
1828                mview_fragment_id,
1829            )?;
1830            Some(upstream_sink_info)
1831        } else {
1832            None
1833        };
1834
1835        let ctx = CreateStreamingJobContext {
1836            upstream_fragment_downstreams,
1837            new_no_shuffle,
1838            upstream_actors,
1839            building_locations,
1840            definition: stream_job.definition(),
1841            mv_table_id: stream_job.mv_table(),
1842            create_type: stream_job.create_type(),
1843            job_type: (&stream_job).into(),
1844            streaming_job: stream_job,
1845            new_upstream_sink,
1846            option: CreateStreamingJobOption {},
1847            snapshot_backfill_info,
1848            cross_db_snapshot_backfill_info,
1849            fragment_backfill_ordering,
1850        };
1851
1852        Ok((
1853            ctx,
1854            StreamJobFragmentsToCreate {
1855                inner: stream_job_fragments,
1856                downstreams: downstream_fragment_relations,
1857            },
1858        ))
1859    }
1860
1861    /// `build_replace_job` builds a job replacement and returns the context and new job
1862    /// fragments.
1863    ///
1864    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1865    /// replacement is finished.
1866    pub(crate) async fn build_replace_job(
1867        &self,
1868        stream_ctx: StreamContext,
1869        stream_job: &StreamingJob,
1870        mut fragment_graph: StreamFragmentGraph,
1871        tmp_job_id: TableId,
1872        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1873    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1874        match &stream_job {
1875            StreamingJob::Table(..)
1876            | StreamingJob::Source(..)
1877            | StreamingJob::MaterializedView(..) => {}
1878            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1879                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1880            }
1881        }
1882
1883        let id = stream_job.id();
1884        let expr_context = stream_ctx.to_expr_context();
1885
1886        // check if performing drop table connector
1887        let mut drop_table_associated_source_id = None;
1888        if let StreamingJob::Table(None, _, _) = &stream_job {
1889            drop_table_associated_source_id = self
1890                .metadata_manager
1891                .get_table_associated_source_id(id as _)
1892                .await?;
1893        }
1894
1895        let old_fragments = self
1896            .metadata_manager
1897            .get_job_fragments_by_id(&id.into())
1898            .await?;
1899        let old_internal_table_ids = old_fragments.internal_table_ids();
1900
1901        // handle drop table's associated source
1902        let mut drop_table_connector_ctx = None;
1903        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1904            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1905            debug_assert!(old_internal_table_ids.len() == 1);
1906
1907            drop_table_connector_ctx = Some(DropTableConnectorContext {
1908                // we do not remove the original table catalog as it's still needed for the streaming job
1909                // just need to remove the ref to the state table
1910                to_change_streaming_job_id: id as i32,
1911                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
1912                to_remove_source_id,
1913            });
1914        } else if stream_job.is_materialized_view() {
1915            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
1916            // but more robust.
1917            let old_fragments_upstreams = self
1918                .metadata_manager
1919                .catalog_controller
1920                .upstream_fragments(old_fragments.fragment_ids())
1921                .await?;
1922
1923            let old_state_graph =
1924                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1925            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1926            let mapping =
1927                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
1928                    .context("incompatible altering on the streaming job states")?;
1929
1930            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
1931        } else {
1932            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1933            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1934            let old_internal_tables = self
1935                .metadata_manager
1936                .get_table_catalog_by_ids(&old_internal_table_ids)
1937                .await?;
1938            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
1939        }
1940
1941        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1942        // graph that contains all information needed for building the actor graph.
1943        let original_root_fragment = old_fragments
1944            .root_fragment()
1945            .expect("root fragment not found");
1946
1947        let job_type = StreamingJobType::from(stream_job);
1948
1949        // Fetch the downstream fragments of the job being replaced.
1950        let (mut downstream_fragments, mut downstream_actor_location) =
1951            self.metadata_manager.get_downstream_fragments(id).await?;
1952
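        // For auto-refresh-schema sinks, substitute the rewritten sink fragments (and their
        // actor locations) for the corresponding original downstream fragments.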
1953        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
1954            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
1955                .iter()
1956                .map(|sink| sink.original_fragment.fragment_id)
1957                .collect();
1958            for (_, downstream_fragment) in &mut downstream_fragments {
1959                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
1960                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
1961                }) {
1962                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
1963                    for actor in &downstream_fragment.actors {
1964                        downstream_actor_location.remove(&actor.actor_id);
1965                    }
1966                    for (actor_id, status) in &sink.actor_status {
1967                        downstream_actor_location.insert(
1968                            *actor_id,
1969                            status.location.as_ref().unwrap().worker_node_id as WorkerId,
1970                        );
1971                    }
1972                    *downstream_fragment = sink.new_fragment.clone();
1973                }
1974            }
1975            assert!(remaining_fragment.is_empty());
1976        }
1977
1978        // Build the complete graph based on the streaming job type.
1979        let complete_graph = match &job_type {
1980            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
1981                CompleteStreamFragmentGraph::with_downstreams(
1982                    fragment_graph,
1983                    original_root_fragment.fragment_id,
1984                    downstream_fragments,
1985                    downstream_actor_location,
1986                    job_type,
1987                )?
1988            }
1989            StreamingJobType::Table(TableJobType::SharedCdcSource)
1990            | StreamingJobType::MaterializedView => {
1991                // CDC tables or materialized views can have upstream jobs as well.
1992                let (upstream_root_fragments, upstream_actor_location) = self
1993                    .metadata_manager
1994                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
1995                    .await?;
1996
1997                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
1998                    fragment_graph,
1999                    upstream_root_fragments,
2000                    upstream_actor_location,
2001                    original_root_fragment.fragment_id,
2002                    downstream_fragments,
2003                    downstream_actor_location,
2004                    job_type,
2005                )?
2006            }
2007            _ => unreachable!(),
2008        };
2009
2010        let resource_group = self
2011            .metadata_manager
2012            .get_existing_job_resource_group(id as ObjectId)
2013            .await?;
2014
2015        // 2. Build the actor graph.
2016        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2017
2018        // XXX: what is this parallelism?
2019        // Is it "assigned parallelism"?
2020        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2021            .expect("The number of actors in the original table fragment should be greater than 0");
2022
2023        let actor_graph_builder = ActorGraphBuilder::new(
2024            id,
2025            resource_group,
2026            complete_graph,
2027            cluster_info,
2028            parallelism,
2029        )?;
2030
2031        let ActorGraphBuildResult {
2032            graph,
2033            downstream_fragment_relations,
2034            building_locations,
2035            upstream_fragment_downstreams,
2036            mut replace_upstream,
2037            new_no_shuffle,
2038            ..
2039        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2040
2041        // General tables & sources do not have upstream jobs, so the dispatchers should be empty.
2042        if matches!(
2043            job_type,
2044            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2045        ) {
2046            assert!(upstream_fragment_downstreams.is_empty());
2047        }
2048
2049        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2050        // the context that contains all information needed for building the actors on the compute
2051        // nodes.
2052        let stream_job_fragments = StreamJobFragments::new(
2053            (tmp_job_id as u32).into(),
2054            graph,
2055            &building_locations.actor_locations,
2056            stream_ctx,
2057            old_fragments.assigned_parallelism,
2058            old_fragments.max_parallelism,
2059        );
2060
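        // The rewritten auto-refresh-schema sink fragments are handled separately above,
        // so they must not go through the generic upstream-replacement pass.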
2061        if let Some(sinks) = &auto_refresh_schema_sinks {
2062            for sink in sinks {
2063                replace_upstream
2064                    .remove(&sink.new_fragment.fragment_id)
2065                    .expect("should exist");
2066            }
2067        }
2068
2069        // Note: no need to set `vnode_count` as it's already set by the frontend.
2070        // See `get_replace_table_plan`.
2071
2072        let ctx = ReplaceStreamJobContext {
2073            old_fragments,
2074            replace_upstream,
2075            new_no_shuffle,
2076            upstream_fragment_downstreams,
2077            building_locations,
2078            streaming_job: stream_job.clone(),
2079            tmp_id: tmp_job_id as _,
2080            drop_table_connector_ctx,
2081            auto_refresh_schema_sinks,
2082        };
2083
2084        Ok((
2085            ctx,
2086            StreamJobFragmentsToCreate {
2087                inner: stream_job_fragments,
2088                downstreams: downstream_fragment_relations,
2089            },
2090        ))
2091    }
2092
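    /// Renames a catalog object (table, view, index, sink, source, schema, database or
    /// subscription) by delegating to the catalog controller.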
2093    async fn alter_name(
2094        &self,
2095        relation: alter_name_request::Object,
2096        new_name: &str,
2097    ) -> MetaResult<NotificationVersion> {
2098        let (obj_type, id) = match relation {
2099            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2100            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2101            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2102            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2103            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2104            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2105            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2106            alter_name_request::Object::SubscriptionId(id) => {
2107                (ObjectType::Subscription, id as ObjectId)
2108            }
2109        };
2110        self.metadata_manager
2111            .catalog_controller
2112            .alter_name(obj_type, id, new_name)
2113            .await
2114    }
2115
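    /// Swaps the names of two catalog objects of the same type. Schema swap is not yet
    /// supported.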
2116    async fn alter_swap_rename(
2117        &self,
2118        object: alter_swap_rename_request::Object,
2119    ) -> MetaResult<NotificationVersion> {
2120        let (obj_type, src_id, dst_id) = match object {
2121            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2122            alter_swap_rename_request::Object::Table(objs) => {
2123                let (src_id, dst_id) = (
2124                    objs.src_object_id as ObjectId,
2125                    objs.dst_object_id as ObjectId,
2126                );
2127                (ObjectType::Table, src_id, dst_id)
2128            }
2129            alter_swap_rename_request::Object::View(objs) => {
2130                let (src_id, dst_id) = (
2131                    objs.src_object_id as ObjectId,
2132                    objs.dst_object_id as ObjectId,
2133                );
2134                (ObjectType::View, src_id, dst_id)
2135            }
2136            alter_swap_rename_request::Object::Source(objs) => {
2137                let (src_id, dst_id) = (
2138                    objs.src_object_id as ObjectId,
2139                    objs.dst_object_id as ObjectId,
2140                );
2141                (ObjectType::Source, src_id, dst_id)
2142            }
2143            alter_swap_rename_request::Object::Sink(objs) => {
2144                let (src_id, dst_id) = (
2145                    objs.src_object_id as ObjectId,
2146                    objs.dst_object_id as ObjectId,
2147                );
2148                (ObjectType::Sink, src_id, dst_id)
2149            }
2150            alter_swap_rename_request::Object::Subscription(objs) => {
2151                let (src_id, dst_id) = (
2152                    objs.src_object_id as ObjectId,
2153                    objs.dst_object_id as ObjectId,
2154                );
2155                (ObjectType::Subscription, src_id, dst_id)
2156            }
2157        };
2158
2159        self.metadata_manager
2160            .catalog_controller
2161            .alter_swap_rename(obj_type, src_id, dst_id)
2162            .await
2163    }
2164
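    /// Transfers ownership of a catalog object to the given user.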
2165    async fn alter_owner(
2166        &self,
2167        object: Object,
2168        owner_id: UserId,
2169    ) -> MetaResult<NotificationVersion> {
2170        let (obj_type, id) = match object {
2171            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2172            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2173            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2174            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2175            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2176            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2177            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2178            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2179        };
2180        self.metadata_manager
2181            .catalog_controller
2182            .alter_owner(obj_type, id, owner_id as _)
2183            .await
2184    }
2185
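    /// Moves a catalog object into a different schema.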
2186    async fn alter_set_schema(
2187        &self,
2188        object: alter_set_schema_request::Object,
2189        new_schema_id: SchemaId,
2190    ) -> MetaResult<NotificationVersion> {
2191        let (obj_type, id) = match object {
2192            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2193            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2194            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2195            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2196            alter_set_schema_request::Object::FunctionId(id) => {
2197                (ObjectType::Function, id as ObjectId)
2198            }
2199            alter_set_schema_request::Object::ConnectionId(id) => {
2200                (ObjectType::Connection, id as ObjectId)
2201            }
2202            alter_set_schema_request::Object::SubscriptionId(id) => {
2203                (ObjectType::Subscription, id as ObjectId)
2204            }
2205        };
2206        self.metadata_manager
2207            .catalog_controller
2208            .alter_schema(obj_type, id, new_schema_id as _)
2209            .await
2210    }
2211
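    /// Waits until no background creating jobs remain, polling about once per millisecond
    /// and returning a cancelled error after roughly 30 minutes.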
2212    pub async fn wait(&self) -> MetaResult<()> {
2213        let timeout_ms = 30 * 60 * 1000;
2214        for _ in 0..timeout_ms {
2215            if self
2216                .metadata_manager
2217                .catalog_controller
2218                .list_background_creating_jobs(true)
2219                .await?
2220                .is_empty()
2221            {
2222                return Ok(());
2223            }
2224
2225            sleep(Duration::from_millis(1)).await;
2226        }
2227        Err(MetaError::cancelled(format!(
2228            "timeout after {timeout_ms}ms"
2229        )))
2230    }
2231
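    /// Sets a comment on a catalog object via the catalog controller.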
2232    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2233        self.metadata_manager
2234            .catalog_controller
2235            .comment_on(comment)
2236            .await
2237    }
2238}
2239
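/// Reports a telemetry event for the creation of a streaming job object.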
2240fn report_create_object(
2241    catalog_id: u32,
2242    event_name: &str,
2243    obj_type: PbTelemetryDatabaseObject,
2244    connector_name: Option<String>,
2245    attr_info: Option<jsonbb::Value>,
2246) {
2247    report_event(
2248        PbTelemetryEventStage::CreateStreamJob,
2249        event_name,
2250        catalog_id.into(),
2251        connector_name,
2252        Some(obj_type),
2253        attr_info,
2254    );
2255}
2256
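/// Deletes all rows belonging to the given sink from the exactly-once Iceberg sink system table.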
2257async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2258    match Entity::delete_many()
2259        .filter(Column::SinkId.eq(sink_id))
2260        .exec(db)
2261        .await
2262    {
2263        Ok(result) => {
2264            let deleted_count = result.rows_affected;
2265
2266            tracing::info!(
2267                "Deleted {} rows for sink_id = {} from the iceberg exactly-once system table.",
2268                deleted_count,
2269                sink_id
2270            );
2271            Ok(())
2272        }
2273        Err(e) => {
2274            tracing::error!(
2275                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2276                sink_id,
2277                e.as_report()
2278            );
2279            Err(e.into())
2280        }
2281    }
2282}
2283
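/// Builds the [`UpstreamSinkInfo`] used to wire a sink into its target table: the sink's
/// output schema, the hash-distribution key expressed in terms of sink output columns, and
/// the projection from sink columns onto the table's current columns.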
2284pub fn build_upstream_sink_info(
2285    sink: &PbSink,
2286    sink_fragment_id: FragmentId,
2287    target_table: &PbTable,
2288    target_fragment_id: FragmentId,
2289) -> MetaResult<UpstreamSinkInfo> {
2290    let sink_columns = if !sink.original_target_columns.is_empty() {
2291        sink.original_target_columns.clone()
2292    } else {
2293        // The field did not exist in earlier versions, which means no schema changes such as
2294        // `ADD/DROP COLUMN` have been made to the table since the sink was created.
2295        // Therefore the table's current columns are exactly the `original_target_columns`.
2296        // The sink's field will be backfilled on the meta side.
2297        target_table.columns.clone()
2298    };
2299
2300    let sink_output_fields = sink_columns
2301        .iter()
2302        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2303        .collect_vec();
2304    let output_indices = (0..sink_output_fields.len())
2305        .map(|i| i as u32)
2306        .collect_vec();
2307
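    // Map the table's distribution key (given as table column indices) to the corresponding
    // positions in the sink output, matching columns by column id.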
2308    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2309        let sink_idx_by_col_id = sink_columns
2310            .iter()
2311            .enumerate()
2312            .map(|(idx, col)| {
2313                let column_id = col.column_desc.as_ref().unwrap().column_id;
2314                (column_id, idx as u32)
2315            })
2316            .collect::<HashMap<_, _>>();
2317        target_table
2318            .distribution_key
2319            .iter()
2320            .map(|dist_idx| {
2321                let column_id = target_table.columns[*dist_idx as usize]
2322                    .column_desc
2323                    .as_ref()
2324                    .unwrap()
2325                    .column_id;
2326                let sink_idx = sink_idx_by_col_id
2327                    .get(&column_id)
2328                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2329                Ok(*sink_idx)
2330            })
2331            .collect::<anyhow::Result<Vec<_>>>()?
2332    };
2333    let dist_key_indices =
2334        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2335    let downstream_fragment_id = target_fragment_id as _;
2336    let new_downstream_relation = DownstreamFragmentRelation {
2337        downstream_fragment_id,
2338        dispatcher_type: DispatcherType::Hash,
2339        dist_key_indices,
2340        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2341    };
2342    let current_target_columns = target_table.get_columns();
2343    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2344    Ok(UpstreamSinkInfo {
2345        sink_id: sink.id as _,
2346        sink_fragment_id: sink_fragment_id as _,
2347        sink_output_fields: sink_output_fields.clone(),
2348        sink_original_target_columns: sink.get_original_target_columns().clone(),
2349        project_exprs,
2350        new_sink_downstream: new_downstream_relation,
2351    })
2352}
2353
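/// Rewrites the `UpstreamSinkUnion` node inside the table's union fragment so that its
/// initial upstreams reflect the given sink infos.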
2354pub fn refill_upstream_sink_union_in_table(
2355    union_fragment_root: &mut PbStreamNode,
2356    upstream_sink_infos: &[UpstreamSinkInfo],
2357) {
2358    visit_stream_node_cont_mut(union_fragment_root, |node| {
2359        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2360            let init_upstreams = upstream_sink_infos
2361                .iter()
2362                .map(|info| PbUpstreamSinkInfo {
2363                    upstream_fragment_id: info.sink_fragment_id,
2364                    sink_output_schema: info.sink_output_fields.clone(),
2365                    project_exprs: info.project_exprs.clone(),
2366                })
2367                .collect();
2368            upstream_sink_union.init_upstreams = init_upstreams;
2369            false
2370        } else {
2371            true
2372        }
2373    });
2374}