risingwave_meta/rpc/ddl_controller.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::InstrumentAwait;
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{
    AlterDatabaseParam, ColumnCatalog, ColumnId, Field, FragmentTypeFlag,
};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::id::{JobId, TableId};
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont_mut;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::sink::SinkParam;
use risingwave_connector::sink::iceberg::IcebergSink;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FragmentId, FunctionId, IndexId, JobStatus, ObjectId,
    SchemaId, SecretId, SinkId, SourceId, StreamingParallelism, SubscriptionId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, PbTable, Schema, Secret, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    PbDispatchOutputMapping, PbStreamFragmentGraph, PbStreamNode, PbUpstreamSinkInfo,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::controller::utils::build_select_node_list;
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation,
    FragmentId as CatalogFragmentId, StreamContext, StreamJobFragments, StreamJobFragmentsToCreate,
    TableParallelism,
};
use crate::stream::cdc::{
    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    FragmentGraphDownstreamContext, FragmentGraphUpstreamContext, GlobalStreamManagerRef,
    ReplaceStreamJobContext, ReschedulePolicy, SourceChange, SourceManagerRef, StreamFragmentGraph,
    UpstreamSinkInfo, check_sink_fragments_support_refresh_schema, create_source_worker,
    rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}
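
// For illustration only: a minimal sketch of how the request flag maps to a drop mode
// (assuming a boolean `cascade` taken from the DDL request).
//
//     assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
//     assert!(DropMode::from_request_setting(false) == DropMode::Restrict);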

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    fn id(&self) -> JobId {
        match self {
            StreamingJobId::MaterializedView(id) | StreamingJobId::Table(_, id) => id.as_job_id(),
            StreamingJobId::Index(id) => id.as_job_id(),
            StreamingJobId::Sink(id) => id.as_job_id(),
        }
    }
}
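
// For illustration only (a hedged sketch): with `strum::AsRefStr`, `as_ref()` yields the
// variant name, so the `Display` impl above renders e.g. a `StreamingJobId::Sink` whose id
// is 42 as the string "Sink(42)", and a `StreamingJobId::Table(None, ..)` as "Table(<id>)".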

/// Describes the job that needs to be replaced. It is used when replacing a table and when
/// creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId, DropMode),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>, // specific resource group
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(id.as_object_id()),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id().as_object_id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(id.as_object_id()),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(id.as_object_id()),
            DdlCommand::CommentOn(comment) => Right(comment.table_id.into()),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(id.as_object_id()),
            DdlCommand::AlterDatabaseParam(id, _) => Right(id.as_object_id()),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_, _)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // The semaphore is used to limit the number of concurrent streaming job creations.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with limits ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}
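
// For illustration only: a hedged sketch of the permit-resizing behavior above, assuming a
// plain tokio `Semaphore`. Growing the limit takes effect immediately via `add_permits`;
// shrinking relies on `forget_permits`, which can only remove permits that are currently
// available, so the effective limit may shrink lazily as in-flight jobs finish.
//
//     let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(4));
//     let _held = sem.clone().try_acquire_owned().unwrap(); // one job in flight
//     sem.add_permits(2);                    // limit 4 -> 6: effective immediately
//     let forgotten = sem.forget_permits(6); // try to shrink 6 -> 0
//     assert!(forgotten < 6);                // the held permit cannot be forgotten yet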

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
    pub fn next_seq(&self) -> u64 {
        // This is a simple atomic increment operation.
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, the request will be cancelled by tonic. Since we have a lot
    /// of logic for revert, status management, notification and so on, ensuring consistency would
    /// be a huge hassle and pain if we didn't spawn here.
    ///
    /// Although the return type is `Option`, the value is always `Some`, to simplify the handling logic.
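    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch (not taken from the codebase): it assumes an existing
    /// `DdlController` handle `ddl_controller` and a prepared `Database` proto `database`.
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("always `Some`, see above");
    /// ```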
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id, drop_mode) => {
                    ctrl.drop_function(function_id, drop_mode).await
                }
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob { job_id, drop_mode } => {
                    ctrl.drop_streaming_job(job_id, drop_mode).await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("altering parallelism for job {}", job_id);
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: JobId,
        target: ReschedulePolicy,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    pub async fn reschedule_fragments(
        &self,
        fragment_targets: HashMap<FragmentId, Option<StreamingParallelism>>,
    ) -> MetaResult<()> {
        tracing::info!(
            "altering parallelism for fragments {:?}",
            fragment_targets.keys()
        );
        let fragment_targets = fragment_targets
            .into_iter()
            .map(|(fragment_id, parallelism)| (fragment_id as CatalogFragmentId, parallelism))
            .collect();

        self.stream_manager
            .reschedule_fragments(fragment_targets)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Database, database_id, DropMode::Cascade)
            .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id, drop_mode)
            .await
    }

    /// Shared sources are handled in [`Self::create_streaming_job`].
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id, drop_mode)
            .await
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(
        &self,
        function_id: FunctionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Function, function_id, drop_mode)
            .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id, drop_mode).await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id, drop_mode)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id, DropMode::Restrict)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id;
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node.
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = node.options
                    && CdcScanOptions::from_proto(&o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: TableId,
    ) -> MetaResult<bool> {
        let meta_store = self.env.meta_store_ref();
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            if is_parallelized_backfill_enabled(stream_cdc_scan) {
                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
                try_init_parallel_cdc_table_snapshot_splits(
                    table_id,
                    cdc_table_desc,
                    meta_store,
                    &stream_cdc_scan.options,
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
            }

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub async fn validate_table_for_sink(&self, table_id: TableId) -> MetaResult<()> {
        let migrated = self
            .metadata_manager
            .catalog_controller
            .has_table_been_migrated(table_id)
            .await?;
        if !migrated {
            Err(anyhow::anyhow!("Creating sink into table is not allowed for unmigrated table {}. Please migrate it first.", table_id).into())
        } else {
            Ok(())
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
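    ///
    /// # Example
    ///
    /// A minimal, illustrative sketch (not taken from the codebase): it assumes a prepared
    /// `StreamingJob` `stream_job` and its `StreamFragmentGraphProto` `fragment_graph`.
    ///
    /// ```ignore
    /// // For a foreground job, this resolves only after backfill completes.
    /// let version = ddl_controller
    ///     .create_streaming_job(stream_job, fragment_graph, HashSet::new(), None, false)
    ///     .await?;
    /// ```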
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        if let StreamingJob::Sink(sink) = &streaming_job
            && let Some(target_table) = sink.target_table
        {
            self.validate_table_for_sink(target_table).await?;
        }
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id, *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = %job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = %job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = %job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table ids.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        // create fragment and actor catalogs.
        tracing::debug!(id = %streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(ctx, streaming_job, fragment_graph, specific_resource_group)
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                self.validate_cdc_table(table, &stream_job_fragments)
                    .await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink) => {
                if sink.auto_refresh_schema_from_table.is_some() {
                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
                }
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
            .await?;

        // create streaming jobs.
        let version = self
            .stream_manager
            .create_streaming_job(stream_job_fragments, ctx, permit)
            .await?;

        Ok(version)
    }

    /// Note: when dropping a sink into a table, the target table needs to be replaced as well.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        let object_id = object_id.into();
        let (release_ctx, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if object_type == ObjectType::Source {
            self.env
                .notification_manager_ref()
                .notify_local_subscribers(LocalNotification::SourceDropped(object_id));
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
            removed_sink_fragment_by_targets,
            removed_iceberg_table_sinks,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                database_id,
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
                removed_sink_fragment_by_targets
                    .into_iter()
                    .map(|(target, sinks)| {
                        (target as _, sinks.into_iter().map(|id| id as _).collect())
                    })
                    .collect(),
            )
            .await;

        // clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // unregister fragments and actors from source manager.
        // FIXME: also need to unregister source backfill fragments.
        let dropped_source_fragments = removed_source_fragments;
        self.source_manager
            .apply_source_change(SourceChange::DropMv {
                dropped_source_fragments,
            })
            .await;

        // clean up iceberg table sinks
        for sink in removed_iceberg_table_sinks {
            let sink_param = SinkParam::try_from_sink_catalog(sink.into())
                .expect("Iceberg sink should be valid");
            let iceberg_sink =
                IcebergSink::try_from(sink_param).expect("Iceberg sink should be valid");
            if let Ok(iceberg_catalog) = iceberg_sink.config.create_catalog().await {
                let table_identifier = iceberg_sink.config.full_table_name().unwrap();
                tracing::info!(
                    "dropping iceberg table {} for dropped sink",
                    table_identifier
                );

                let _ = iceberg_catalog
                    .drop_table(&table_identifier)
                    .await
                    .inspect_err(|err| {
                        tracing::error!(
                            "failed to drop iceberg table {} during cleanup: {}",
                            table_identifier,
                            err.as_report()
                        );
                    });
            }
        }

        // remove secrets.
        for secret in secret_ids {
            LocalSecretManager::global().remove_secret(secret);
        }
        Ok(version)
    }

    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
    pub async fn replace_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
    ) -> MetaResult<NotificationVersion> {
        match &streaming_job {
            StreamingJob::Table(..)
            | StreamingJob::Source(..)
            | StreamingJob::MaterializedView(..) => {}
            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
            }
        }

        let job_id = streaming_job.id();

        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());

        // Ensure the max parallelism is unchanged before replacing the table.
        let original_max_parallelism = self
            .metadata_manager
            .get_job_max_parallelism(streaming_job.id())
            .await?;
        let fragment_graph = PbStreamFragmentGraph {
            max_parallelism: original_max_parallelism as _,
            ..fragment_graph
        };

        // 1. build fragment graph.
        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // make it immutable
        let streaming_job = streaming_job;

        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
            let auto_refresh_schema_sinks = self
                .metadata_manager
                .catalog_controller
                .get_sink_auto_refresh_schema_from(table.id)
                .await?;
            if !auto_refresh_schema_sinks.is_empty() {
                let original_table_columns = self
                    .metadata_manager
                    .catalog_controller
                    .get_table_columns(table.id)
                    .await?;
                // compare column ids to find newly added columns
                let mut original_table_column_ids: HashSet<_> = original_table_columns
                    .iter()
                    .map(|col| col.column_id())
                    .collect();
                let newly_added_columns = table
                    .columns
                    .iter()
                    .filter(|col| {
                        !original_table_column_ids.remove(&ColumnId::new(
                            col.column_desc.as_ref().unwrap().column_id as _,
                        ))
                    })
                    .map(|col| ColumnCatalog::from(col.clone()))
                    .collect_vec();
                if !original_table_column_ids.is_empty() {
                    return Err(anyhow!("new table columns do not contain all the original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1313                }
1314                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1315                for sink in auto_refresh_schema_sinks {
1316                    let sink_job_fragments = self
1317                        .metadata_manager
1318                        .get_job_fragments_by_id(sink.id.as_job_id())
1319                        .await?;
1320                    if sink_job_fragments.fragments.len() != 1 {
1321                        return Err(anyhow!(
1322                            "auto schema refresh sink must have exactly one fragment, but got {}",
1323                            sink_job_fragments.fragments.len()
1324                        )
1325                        .into());
1326                    }
1327                    let original_sink_fragment =
1328                        sink_job_fragments.fragments.into_values().next().unwrap();
1329                    let (new_sink_fragment, new_schema, new_log_store_table) =
1330                        rewrite_refresh_schema_sink_fragment(
1331                            &original_sink_fragment,
1332                            &sink,
1333                            &newly_added_columns,
1334                            table,
1335                            fragment_graph.table_fragment_id(),
1336                            self.env.id_gen_manager(),
1337                            self.env.actor_id_generator(),
1338                        )?;
1339
1340                    assert_eq!(
1341                        original_sink_fragment.actors.len(),
1342                        new_sink_fragment.actors.len()
1343                    );
1344                    let actor_status = (0..original_sink_fragment.actors.len())
1345                        .map(|i| {
1346                            let worker_node_id = sink_job_fragments.actor_status
1347                                [&original_sink_fragment.actors[i].actor_id]
1348                                .location
1349                                .as_ref()
1350                                .unwrap()
1351                                .worker_node_id;
1352                            (
1353                                new_sink_fragment.actors[i].actor_id,
1354                                PbActorStatus {
1355                                    location: Some(PbActorLocation { worker_node_id }),
1356                                },
1357                            )
1358                        })
1359                        .collect();
1360
1361                    let streaming_job = StreamingJob::Sink(sink);
1362
1363                    let tmp_sink_id = self
1364                        .metadata_manager
1365                        .catalog_controller
1366                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1367                        .await?
1368                        .as_sink_id();
1369                    let StreamingJob::Sink(sink) = streaming_job else {
1370                        unreachable!()
1371                    };
1372
1373                    sinks.push(AutoRefreshSchemaSinkContext {
1374                        tmp_sink_id,
1375                        original_sink: sink,
1376                        original_fragment: original_sink_fragment,
1377                        new_schema,
1378                        newly_add_fields: newly_added_columns
1379                            .iter()
1380                            .map(|col| Field::from(&col.column_desc))
1381                            .collect(),
1382                        new_fragment: new_sink_fragment,
1383                        new_log_store_table,
1384                        actor_status,
1385                    });
1386                }
1387                Some(sinks)
1388            } else {
1389                None
1390            }
1391        } else {
1392            None
1393        };
1394
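        // Register a temporary job catalog for the replacement. It is finalized into the original
        // job on success, or aborted together with any temporary sink catalogs on failure.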
1395        let tmp_id = self
1396            .metadata_manager
1397            .catalog_controller
1398            .create_job_catalog_for_replace(
1399                &streaming_job,
1400                Some(&ctx),
1401                fragment_graph.specified_parallelism().as_ref(),
1402                Some(fragment_graph.max_parallelism()),
1403            )
1404            .await?;
1405
1406        let tmp_sink_ids = auto_refresh_schema_sinks.as_ref().map(|sinks| {
1407            sinks
1408                .iter()
1409                .map(|sink| sink.tmp_sink_id.as_object_id())
1410                .collect_vec()
1411        });
1412
1413        tracing::debug!(id = %job_id, "building replace streaming job");
1414        let mut updated_sink_catalogs = vec![];
1415
1416        let mut drop_table_connector_ctx = None;
1417        let result: MetaResult<_> = try {
1418            let (mut ctx, mut stream_job_fragments) = self
1419                .build_replace_job(
1420                    ctx,
1421                    &streaming_job,
1422                    fragment_graph,
1423                    tmp_id,
1424                    auto_refresh_schema_sinks,
1425                )
1426                .await?;
1427            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1428            let auto_refresh_schema_sink_finish_ctx =
1429                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1430                    sinks
1431                        .iter()
1432                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1433                            tmp_sink_id: sink.tmp_sink_id,
1434                            original_sink_id: sink.original_sink.id,
1435                            columns: sink.new_schema.clone(),
1436                            new_log_store_table: sink
1437                                .new_log_store_table
1438                                .as_ref()
1439                                .map(|table| (table.id, table.columns.clone())),
1440                        })
1441                        .collect()
1442                });
1443
1444            // Handle a table that has incoming sinks.
1445            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1446                let union_fragment = stream_job_fragments.inner.union_fragment_for_table();
1447                let upstream_infos = self
1448                    .metadata_manager
1449                    .catalog_controller
1450                    .get_all_upstream_sink_infos(table, union_fragment.fragment_id as _)
1451                    .await?;
1452                refill_upstream_sink_union_in_table(&mut union_fragment.nodes, &upstream_infos);
1453
1454                for upstream_info in &upstream_infos {
1455                    let upstream_fragment_id = upstream_info.sink_fragment_id;
1456                    ctx.upstream_fragment_downstreams
1457                        .entry(upstream_fragment_id)
1458                        .or_default()
1459                        .push(upstream_info.new_sink_downstream.clone());
1460                    if upstream_info.sink_original_target_columns.is_empty() {
1461                        updated_sink_catalogs.push(upstream_info.sink_id);
1462                    }
1463                }
1464            }
1465
1466            let replace_upstream = ctx.replace_upstream.clone();
1467
1468            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1469                let empty_downstreams = FragmentDownstreamRelation::default();
1470                for sink in sinks {
1471                    self.metadata_manager
1472                        .catalog_controller
1473                        .prepare_streaming_job(
1474                            sink.tmp_sink_id.as_job_id(),
1475                            || [&sink.new_fragment].into_iter(),
1476                            &empty_downstreams,
1477                            true,
1478                            None,
1479                        )
1480                        .await?;
1481                }
1482            }
1483
1484            self.metadata_manager
1485                .catalog_controller
1486                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1487                .await?;
1488
1489            self.stream_manager
1490                .replace_stream_job(stream_job_fragments, ctx)
1491                .await?;
1492            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1493        };
1494
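        // Finalize the replacement on success; otherwise abort it and clean up the temporary catalogs.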
1495        match result {
1496            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1497                let version = self
1498                    .metadata_manager
1499                    .catalog_controller
1500                    .finish_replace_streaming_job(
1501                        tmp_id,
1502                        streaming_job,
1503                        replace_upstream,
1504                        SinkIntoTableContext {
1505                            updated_sink_catalogs,
1506                        },
1507                        drop_table_connector_ctx.as_ref(),
1508                        auto_refresh_schema_sink_finish_ctx,
1509                    )
1510                    .await?;
1511                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1512                    self.source_manager
1513                        .apply_source_change(SourceChange::DropSource {
1514                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1515                        })
1516                        .await;
1517                }
1518                Ok(version)
1519            }
1520            Err(err) => {
1521                tracing::error!(id = %job_id, error = ?err.as_report(), "failed to replace job");
1522                let _ = self.metadata_manager
1523                    .catalog_controller
1524                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1525                    .await.inspect_err(|err| {
1526                    tracing::error!(id = %job_id, error = ?err.as_report(), "failed to abort replacing job");
1527                });
1528                Err(err)
1529            }
1530        }
1531    }
1532
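    /// Drops a streaming job. A job still in `Creating` status is cancelled through the stream
    /// manager, while a `Created` job is dropped as a regular catalog object.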
1533    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1534    )]
1535    async fn drop_streaming_job(
1536        &self,
1537        job_id: StreamingJobId,
1538        drop_mode: DropMode,
1539    ) -> MetaResult<NotificationVersion> {
1540        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1541
1542        let (object_id, object_type) = match job_id {
1543            StreamingJobId::MaterializedView(id) => (id.as_object_id(), ObjectType::Table),
1544            StreamingJobId::Sink(id) => (id.as_object_id(), ObjectType::Sink),
1545            StreamingJobId::Table(_, id) => (id.as_object_id(), ObjectType::Table),
1546            StreamingJobId::Index(idx) => (idx.as_object_id(), ObjectType::Index),
1547        };
1548
1549        let job_status = self
1550            .metadata_manager
1551            .catalog_controller
1552            .get_streaming_job_status(job_id.id())
1553            .await?;
1554        let version = match job_status {
1555            JobStatus::Initial => {
1556                unreachable!(
1557                    "A job in Initial status has not been notified to the frontend and therefore should not arrive here"
1558                );
1559            }
1560            JobStatus::Creating => {
1561                self.stream_manager
1562                    .cancel_streaming_jobs(vec![job_id.id()])
1563                    .await?;
1564                IGNORED_NOTIFICATION_VERSION
1565            }
1566            JobStatus::Created => {
1567                let version = self.drop_object(object_type, object_id, drop_mode).await?;
1568                #[cfg(not(madsim))]
1569                if let StreamingJobId::Sink(sink_id) = job_id {
1570                    // Delete rows for this sink from the exactly-once Iceberg sink system table.
1571                    // todo(wcy-fdu): optimize the logic to be Iceberg-specific.
1572                    let db = self.env.meta_store_ref().conn.clone();
1573                    clean_all_rows_by_sink_id(&db, sink_id).await?;
1574                }
1575                version
1576            }
1577        };
1578
1579        Ok(version)
1580    }
1581
1582    /// Resolve the parallelism of the stream job based on the given information.
1583    ///
1584    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
1585    fn resolve_stream_parallelism(
1586        &self,
1587        specified: Option<NonZeroUsize>,
1588        max: NonZeroUsize,
1589        cluster_info: &StreamingClusterInfo,
1590        resource_group: String,
1591    ) -> MetaResult<NonZeroUsize> {
1592        let available = cluster_info.parallelism(&resource_group);
1593        let Some(available) = NonZeroUsize::new(available) else {
1594            bail_unavailable!(
1595                "no available slots to schedule in resource group \"{}\", \
1596                 have you allocated any compute nodes within this resource group?",
1597                resource_group
1598            );
1599        };
1600
1601        if let Some(specified) = specified {
1602            if specified > max {
1603                bail_invalid_parameter!(
1604                    "specified parallelism {} should not exceed max parallelism {}",
1605                    specified,
1606                    max,
1607                );
1608            }
1609            if specified > available {
1610                bail_unavailable!(
1611                    "insufficient parallelism to schedule in resource group \"{}\", \
1612                     required: {}, available: {}",
1613                    resource_group,
1614                    specified,
1615                    available,
1616                );
1617            }
1618            Ok(specified)
1619        } else {
1620            // Fall back to the configured default parallelism when the user does not specify one.
1621            let default_parallelism = match self.env.opts.default_parallelism {
1622                DefaultParallelism::Full => available,
1623                DefaultParallelism::Default(num) => {
1624                    if num > available {
1625                        bail_unavailable!(
1626                            "insufficient parallelism to schedule in resource group \"{}\", \
1627                            required: {}, available: {}",
1628                            resource_group,
1629                            num,
1630                            available,
1631                        );
1632                    }
1633                    num
1634                }
1635            };
1636
1637            if default_parallelism > max {
1638                tracing::warn!(
1639                    max_parallelism = max.get(),
1640                    resource_group,
1641                    "default parallelism exceeds the max parallelism, using max parallelism instead",
1642                );
1643            }
1644            Ok(default_parallelism.min(max))
1645        }
1646    }
1647
1648    /// Builds the actor graph:
1649    /// - Add the upstream fragments to the fragment graph
1650    /// - Schedule the fragments based on their distribution
1651    /// - Expand each fragment into one or several actors
1652    /// - Construct the fragment level backfill order control.
1653    #[await_tree::instrument]
1654    pub(crate) async fn build_stream_job(
1655        &self,
1656        stream_ctx: StreamContext,
1657        mut stream_job: StreamingJob,
1658        fragment_graph: StreamFragmentGraph,
1659        specific_resource_group: Option<String>,
1660    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1661        let id = stream_job.id();
1662        let specified_parallelism = fragment_graph.specified_parallelism();
1663        let expr_context = stream_ctx.to_expr_context();
1664        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1665
1666        // 1. Build the fragment-level backfill ordering.
1667        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1668
1669        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1670        // contains all information needed for building the actor graph.
1671
1672        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1673            fragment_graph.collect_snapshot_backfill_info()?;
1674        assert!(
1675            snapshot_backfill_info
1676                .iter()
1677                .chain([&cross_db_snapshot_backfill_info])
1678                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1679                .all(|backfill_epoch| backfill_epoch.is_none()),
1680            "should not set backfill epoch when initially build the job: {:?} {:?}",
1681            snapshot_backfill_info,
1682            cross_db_snapshot_backfill_info
1683        );
1684
1685        let locality_fragment_state_table_mapping =
1686            fragment_graph.find_locality_provider_fragment_state_table_mapping();
1687
1688        // check if log store exists for all cross-db upstreams
1689        self.metadata_manager
1690            .catalog_controller
1691            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1692            .await?;
1693
1694        let upstream_table_ids = fragment_graph
1695            .dependent_table_ids()
1696            .iter()
1697            .filter(|id| {
1698                !cross_db_snapshot_backfill_info
1699                    .upstream_mv_table_id_to_backfill_epoch
1700                    .contains_key(id)
1701            })
1702            .cloned()
1703            .collect();
1704
1705        let (upstream_root_fragments, existing_actor_location) = self
1706            .metadata_manager
1707            .get_upstream_root_fragments(&upstream_table_ids)
1708            .await?;
1709
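        // Snapshot backfill is only supported for materialized views, sinks and indexes.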
1710        if snapshot_backfill_info.is_some() {
1711            match stream_job {
1712                StreamingJob::MaterializedView(_)
1713                | StreamingJob::Sink(_)
1714                | StreamingJob::Index(_, _) => {}
1715                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1716                    return Err(
1717                        anyhow!("snapshot backfill is not supported for tables and sources").into(),
1718                    );
1719                }
1720            }
1721        }
1722
1723        let upstream_actors = upstream_root_fragments
1724            .values()
1725            .map(|(fragment, _)| {
1726                (
1727                    fragment.fragment_id,
1728                    fragment.actors.keys().copied().collect(),
1729                )
1730            })
1731            .collect();
1732
1733        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1734            fragment_graph,
1735            FragmentGraphUpstreamContext {
1736                upstream_root_fragments,
1737                upstream_actor_location: existing_actor_location,
1738            },
1739            (&stream_job).into(),
1740        )?;
1741        let resource_group = match specific_resource_group {
1742            None => {
1743                self.metadata_manager
1744                    .get_database_resource_group(stream_job.database_id())
1745                    .await?
1746            }
1747            Some(resource_group) => resource_group,
1748        };
1749
1750        // 3. Build the actor graph.
1751        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1752
1753        let parallelism = self.resolve_stream_parallelism(
1754            specified_parallelism,
1755            max_parallelism,
1756            &cluster_info,
1757            resource_group.clone(),
1758        )?;
1759
1760        let parallelism = self
1761            .env
1762            .system_params_reader()
1763            .await
1764            .adaptive_parallelism_strategy()
1765            .compute_target_parallelism(parallelism.get());
1766
1767        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1768        let actor_graph_builder = ActorGraphBuilder::new(
1769            id,
1770            resource_group,
1771            complete_graph,
1772            cluster_info,
1773            parallelism,
1774        )?;
1775
1776        let ActorGraphBuildResult {
1777            graph,
1778            downstream_fragment_relations,
1779            building_locations,
1780            upstream_fragment_downstreams,
1781            new_no_shuffle,
1782            replace_upstream,
1783            ..
1784        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1785        assert!(replace_upstream.is_empty());
1786
1787        // 4. Build the table fragments structure that will be persisted in the stream manager,
1788        // and the context that contains all information needed for building the
1789        // actors on the compute nodes.
1790
1791        // If the frontend does not specify a parallelism and `default_parallelism` is set to full, use ADAPTIVE parallelism.
1792        // Otherwise, it falls back to FIXED with the parallelism resolved above.
1793        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1794            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1795            _ => TableParallelism::Fixed(parallelism.get()),
1796        };
1797
1798        let stream_job_fragments = StreamJobFragments::new(
1799            id,
1800            graph,
1801            &building_locations.actor_locations,
1802            stream_ctx.clone(),
1803            table_parallelism,
1804            max_parallelism.get(),
1805        );
1806
1807        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1808            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1809        }
1810
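        // For a sink that writes into a table, build the upstream sink info that describes how the
        // new sink fragment connects to the target table's union fragment.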
1811        let new_upstream_sink = if let StreamingJob::Sink(sink) = &stream_job
1812            && let Ok(table_id) = sink.get_target_table()
1813        {
1814            let tables = self
1815                .metadata_manager
1816                .get_table_catalog_by_ids(&[*table_id])
1817                .await?;
1818            let target_table = tables
1819                .first()
1820                .ok_or_else(|| MetaError::catalog_id_not_found("table", *table_id))?;
1821            let sink_fragment = stream_job_fragments
1822                .sink_fragment()
1823                .ok_or_else(|| anyhow::anyhow!("sink fragment not found for sink {}", sink.id))?;
1824            let mview_fragment_id = self
1825                .metadata_manager
1826                .catalog_controller
1827                .get_mview_fragment_by_id(table_id.as_job_id())
1828                .await?;
1829            let upstream_sink_info = build_upstream_sink_info(
1830                sink,
1831                sink_fragment.fragment_id as _,
1832                target_table,
1833                mview_fragment_id,
1834            )?;
1835            Some(upstream_sink_info)
1836        } else {
1837            None
1838        };
1839
1840        let ctx = CreateStreamingJobContext {
1841            upstream_fragment_downstreams,
1842            new_no_shuffle,
1843            upstream_actors,
1844            building_locations,
1845            definition: stream_job.definition(),
1846            create_type: stream_job.create_type(),
1847            job_type: (&stream_job).into(),
1848            streaming_job: stream_job,
1849            new_upstream_sink,
1850            option: CreateStreamingJobOption {},
1851            snapshot_backfill_info,
1852            cross_db_snapshot_backfill_info,
1853            fragment_backfill_ordering,
1854            locality_fragment_state_table_mapping,
1855        };
1856
1857        Ok((
1858            ctx,
1859            StreamJobFragmentsToCreate {
1860                inner: stream_job_fragments,
1861                downstreams: downstream_fragment_relations,
1862            },
1863        ))
1864    }
1865
1866    /// `build_replace_job` builds a job replacement and returns the context and new job
1867    /// fragments.
1868    ///
1869    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1870    /// replacement is finished.
1871    pub(crate) async fn build_replace_job(
1872        &self,
1873        stream_ctx: StreamContext,
1874        stream_job: &StreamingJob,
1875        mut fragment_graph: StreamFragmentGraph,
1876        tmp_job_id: JobId,
1877        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
1878    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1879        match &stream_job {
1880            StreamingJob::Table(..)
1881            | StreamingJob::Source(..)
1882            | StreamingJob::MaterializedView(..) => {}
1883            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1884                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1885            }
1886        }
1887
1888        let id = stream_job.id();
1889        let expr_context = stream_ctx.to_expr_context();
1890
1891        // Check whether this replacement drops the table's associated source connector.
1892        let mut drop_table_associated_source_id = None;
1893        if let StreamingJob::Table(None, _, _) = &stream_job {
1894            drop_table_associated_source_id = self
1895                .metadata_manager
1896                .get_table_associated_source_id(id.as_mv_table_id())
1897                .await?;
1898        }
1899
1900        let old_fragments = self.metadata_manager.get_job_fragments_by_id(id).await?;
1901        let old_internal_table_ids = old_fragments.internal_table_ids();
1902
1903        // handle drop table's associated source
1904        let mut drop_table_connector_ctx = None;
1905        if let Some(to_remove_source_id) = drop_table_associated_source_id {
1906            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1907            debug_assert!(old_internal_table_ids.len() == 1);
1908
1909            drop_table_connector_ctx = Some(DropTableConnectorContext {
1910                // we do not remove the original table catalog as it's still needed for the streaming job
1911                // just need to remove the ref to the state table
1912                to_change_streaming_job_id: id,
1913                to_remove_state_table_id: old_internal_table_ids[0], // asserted before
1914                to_remove_source_id,
1915            });
1916        } else if stream_job.is_materialized_view() {
1917            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
1918            // but more robust.
1919            let old_fragments_upstreams = self
1920                .metadata_manager
1921                .catalog_controller
1922                .upstream_fragments(old_fragments.fragment_ids())
1923                .await?;
1924
1925            let old_state_graph =
1926                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
1927            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
1928            let mapping =
1929                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
1930                    .context("incompatible altering on the streaming job states")?;
1931
1932            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
1933        } else {
1934            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
1935            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
1936            let old_internal_tables = self
1937                .metadata_manager
1938                .get_table_catalog_by_ids(&old_internal_table_ids)
1939                .await?;
1940            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
1941        }
1942
1943        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1944        // graph that contains all information needed for building the actor graph.
1945        let original_root_fragment = old_fragments
1946            .root_fragment()
1947            .expect("root fragment not found");
1948
1949        let job_type = StreamingJobType::from(stream_job);
1950
1951        // Extract the downstream fragments from the fragment graph.
1952        let (mut downstream_fragments, mut downstream_actor_location) =
1953            self.metadata_manager.get_downstream_fragments(id).await?;
1954
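        // Replace the downstream fragments of auto-refresh-schema sinks with their rewritten
        // counterparts, reusing the worker locations of the original sink actors.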
1955        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
1956            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
1957                .iter()
1958                .map(|sink| sink.original_fragment.fragment_id)
1959                .collect();
1960            for (_, downstream_fragment, _) in &mut downstream_fragments {
1961                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
1962                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
1963                }) {
1964                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
1965                    for actor_id in downstream_fragment.actors.keys() {
1966                        downstream_actor_location.remove(actor_id);
1967                    }
1968                    for (actor_id, status) in &sink.actor_status {
1969                        downstream_actor_location
1970                            .insert(*actor_id, status.location.as_ref().unwrap().worker_node_id);
1971                    }
1972
1973                    *downstream_fragment = (&sink.new_fragment_info(), stream_job.id()).into();
1974                }
1975            }
1976            assert!(remaining_fragment.is_empty());
1977        }
1978
1979        // build complete graph based on the table job type
1980        let complete_graph = match &job_type {
1981            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
1982                CompleteStreamFragmentGraph::with_downstreams(
1983                    fragment_graph,
1984                    FragmentGraphDownstreamContext {
1985                        original_root_fragment_id: original_root_fragment.fragment_id,
1986                        downstream_fragments,
1987                        downstream_actor_location,
1988                    },
1989                    job_type,
1990                )?
1991            }
1992            StreamingJobType::Table(TableJobType::SharedCdcSource)
1993            | StreamingJobType::MaterializedView => {
1994                // CDC tables or materialized views can have upstream jobs as well.
1995                let (upstream_root_fragments, upstream_actor_location) = self
1996                    .metadata_manager
1997                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
1998                    .await?;
1999
2000                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2001                    fragment_graph,
2002                    FragmentGraphUpstreamContext {
2003                        upstream_root_fragments,
2004                        upstream_actor_location,
2005                    },
2006                    FragmentGraphDownstreamContext {
2007                        original_root_fragment_id: original_root_fragment.fragment_id,
2008                        downstream_fragments,
2009                        downstream_actor_location,
2010                    },
2011                    job_type,
2012                )?
2013            }
2014            _ => unreachable!(),
2015        };
2016
2017        let resource_group = self
2018            .metadata_manager
2019            .get_existing_job_resource_group(id)
2020            .await?;
2021
2022        // 2. Build the actor graph.
2023        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2024
2025        // XXX: what is this parallelism?
2026        // Is it "assigned parallelism"?
2027        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2028            .expect("The number of actors in the original table fragment should be greater than 0");
2029
2030        let actor_graph_builder = ActorGraphBuilder::new(
2031            id,
2032            resource_group,
2033            complete_graph,
2034            cluster_info,
2035            parallelism,
2036        )?;
2037
2038        let ActorGraphBuildResult {
2039            graph,
2040            downstream_fragment_relations,
2041            building_locations,
2042            upstream_fragment_downstreams,
2043            mut replace_upstream,
2044            new_no_shuffle,
2045            ..
2046        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2047
2048        // A general table or source has no upstream job, so the dispatchers should be empty.
2049        if matches!(
2050            job_type,
2051            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2052        ) {
2053            assert!(upstream_fragment_downstreams.is_empty());
2054        }
2055
2056        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2057        // the context that contains all information needed for building the actors on the compute
2058        // nodes.
2059        let stream_job_fragments = StreamJobFragments::new(
2060            tmp_job_id,
2061            graph,
2062            &building_locations.actor_locations,
2063            stream_ctx,
2064            old_fragments.assigned_parallelism,
2065            old_fragments.max_parallelism,
2066        );
2067
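        // The rewritten sink fragments are replaced as whole fragments, so remove their entries
        // from `replace_upstream` rather than updating their upstream merges.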
2068        if let Some(sinks) = &auto_refresh_schema_sinks {
2069            for sink in sinks {
2070                replace_upstream
2071                    .remove(&sink.new_fragment.fragment_id)
2072                    .expect("should exist");
2073            }
2074        }
2075
2076        // Note: no need to set `vnode_count` as it's already set by the frontend.
2077        // See `get_replace_table_plan`.
2078
2079        let ctx = ReplaceStreamJobContext {
2080            old_fragments,
2081            replace_upstream,
2082            new_no_shuffle,
2083            upstream_fragment_downstreams,
2084            building_locations,
2085            streaming_job: stream_job.clone(),
2086            tmp_id: tmp_job_id,
2087            drop_table_connector_ctx,
2088            auto_refresh_schema_sinks,
2089        };
2090
2091        Ok((
2092            ctx,
2093            StreamJobFragmentsToCreate {
2094                inner: stream_job_fragments,
2095                downstreams: downstream_fragment_relations,
2096            },
2097        ))
2098    }
2099
2100    async fn alter_name(
2101        &self,
2102        relation: alter_name_request::Object,
2103        new_name: &str,
2104    ) -> MetaResult<NotificationVersion> {
2105        let (obj_type, id) = match relation {
2106            alter_name_request::Object::TableId(id) => (ObjectType::Table, id),
2107            alter_name_request::Object::ViewId(id) => (ObjectType::View, id),
2108            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id),
2109            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id),
2110            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id),
2111            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id),
2112            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id),
2113            alter_name_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2114        };
2115        self.metadata_manager
2116            .catalog_controller
2117            .alter_name(obj_type, id, new_name)
2118            .await
2119    }
2120
2121    async fn alter_swap_rename(
2122        &self,
2123        object: alter_swap_rename_request::Object,
2124    ) -> MetaResult<NotificationVersion> {
2125        let (obj_type, src_id, dst_id) = match object {
2126            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2127            alter_swap_rename_request::Object::Table(objs) => {
2128                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2129                (ObjectType::Table, src_id, dst_id)
2130            }
2131            alter_swap_rename_request::Object::View(objs) => {
2132                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2133                (ObjectType::View, src_id, dst_id)
2134            }
2135            alter_swap_rename_request::Object::Source(objs) => {
2136                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2137                (ObjectType::Source, src_id, dst_id)
2138            }
2139            alter_swap_rename_request::Object::Sink(objs) => {
2140                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2141                (ObjectType::Sink, src_id, dst_id)
2142            }
2143            alter_swap_rename_request::Object::Subscription(objs) => {
2144                let (src_id, dst_id) = (objs.src_object_id, objs.dst_object_id);
2145                (ObjectType::Subscription, src_id, dst_id)
2146            }
2147        };
2148
2149        self.metadata_manager
2150            .catalog_controller
2151            .alter_swap_rename(obj_type, src_id, dst_id)
2152            .await
2153    }
2154
2155    async fn alter_owner(
2156        &self,
2157        object: Object,
2158        owner_id: UserId,
2159    ) -> MetaResult<NotificationVersion> {
2160        let (obj_type, id) = match object {
2161            Object::TableId(id) => (ObjectType::Table, id),
2162            Object::ViewId(id) => (ObjectType::View, id),
2163            Object::SourceId(id) => (ObjectType::Source, id),
2164            Object::SinkId(id) => (ObjectType::Sink, id),
2165            Object::SchemaId(id) => (ObjectType::Schema, id),
2166            Object::DatabaseId(id) => (ObjectType::Database, id),
2167            Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2168            Object::ConnectionId(id) => (ObjectType::Connection, id),
2169        };
2170        self.metadata_manager
2171            .catalog_controller
2172            .alter_owner(obj_type, id.into(), owner_id as _)
2173            .await
2174    }
2175
2176    async fn alter_set_schema(
2177        &self,
2178        object: alter_set_schema_request::Object,
2179        new_schema_id: SchemaId,
2180    ) -> MetaResult<NotificationVersion> {
2181        let (obj_type, id) = match object {
2182            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id),
2183            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id),
2184            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id),
2185            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id),
2186            alter_set_schema_request::Object::FunctionId(id) => (ObjectType::Function, id),
2187            alter_set_schema_request::Object::ConnectionId(id) => (ObjectType::Connection, id),
2188            alter_set_schema_request::Object::SubscriptionId(id) => (ObjectType::Subscription, id),
2189        };
2190        self.metadata_manager
2191            .catalog_controller
2192            .alter_schema(obj_type, id.into(), new_schema_id as _)
2193            .await
2194    }
2195
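    /// Waits until there are no background creating streaming jobs left, polling roughly every
    /// millisecond with a 30-minute timeout.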
2196    pub async fn wait(&self) -> MetaResult<()> {
2197        let timeout_ms = 30 * 60 * 1000;
2198        for _ in 0..timeout_ms {
2199            if self
2200                .metadata_manager
2201                .catalog_controller
2202                .list_background_creating_jobs(true, None)
2203                .await?
2204                .is_empty()
2205            {
2206                return Ok(());
2207            }
2208
2209            sleep(Duration::from_millis(1)).await;
2210        }
2211        Err(MetaError::cancelled(format!(
2212            "timeout after {timeout_ms}ms"
2213        )))
2214    }
2215
2216    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2217        self.metadata_manager
2218            .catalog_controller
2219            .comment_on(comment)
2220            .await
2221    }
2222}
2223
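/// Reports a telemetry event for the creation of a streaming object.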
2224fn report_create_object(
2225    job_id: JobId,
2226    event_name: &str,
2227    obj_type: PbTelemetryDatabaseObject,
2228    connector_name: Option<String>,
2229    attr_info: Option<jsonbb::Value>,
2230) {
2231    report_event(
2232        PbTelemetryEventStage::CreateStreamJob,
2233        event_name,
2234        job_id.as_raw_id() as _,
2235        connector_name,
2236        Some(obj_type),
2237        attr_info,
2238    );
2239}
2240
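/// Deletes all rows belonging to the given sink from the exactly-once Iceberg sink system table.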
2241async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: SinkId) -> MetaResult<()> {
2242    match Entity::delete_many()
2243        .filter(Column::SinkId.eq(sink_id))
2244        .exec(db)
2245        .await
2246    {
2247        Ok(result) => {
2248            let deleted_count = result.rows_affected;
2249
2250            tracing::info!(
2251                "Deleted {} rows for sink_id = {} from the iceberg exactly-once system table.",
2252                deleted_count,
2253                sink_id
2254            );
2255            Ok(())
2256        }
2257        Err(e) => {
2258            tracing::error!(
2259                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2260                sink_id,
2261                e.as_report()
2262            );
2263            Err(e.into())
2264        }
2265    }
2266}
2267
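/// Builds the [`UpstreamSinkInfo`] needed to connect a sink into the union fragment of its target
/// table: the sink's output schema, the projection aligning sink columns to the current table
/// columns, and a hash-dispatched downstream relation keyed by the target table's distribution key.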
2268pub fn build_upstream_sink_info(
2269    sink: &PbSink,
2270    sink_fragment_id: FragmentId,
2271    target_table: &PbTable,
2272    target_fragment_id: FragmentId,
2273) -> MetaResult<UpstreamSinkInfo> {
2274    let sink_columns = if !sink.original_target_columns.is_empty() {
2275        sink.original_target_columns.clone()
2276    } else {
2277        // An empty value means the field did not exist in earlier versions, which implies that no
2278        // schema changes such as `ADD/DROP COLUMN` have been made to the table. Therefore, the
2279        // current columns of the table are exactly the `original_target_columns`.
2280        // This field of the sink catalog will be filled in by the meta service.
2281        target_table.columns.clone()
2282    };
2283
2284    let sink_output_fields = sink_columns
2285        .iter()
2286        .map(|col| Field::from(col.column_desc.as_ref().unwrap()).to_prost())
2287        .collect_vec();
2288    let output_indices = (0..sink_output_fields.len())
2289        .map(|i| i as u32)
2290        .collect_vec();
2291
2292    let dist_key_indices: anyhow::Result<Vec<u32>> = try {
2293        let sink_idx_by_col_id = sink_columns
2294            .iter()
2295            .enumerate()
2296            .map(|(idx, col)| {
2297                let column_id = col.column_desc.as_ref().unwrap().column_id;
2298                (column_id, idx as u32)
2299            })
2300            .collect::<HashMap<_, _>>();
2301        target_table
2302            .distribution_key
2303            .iter()
2304            .map(|dist_idx| {
2305                let column_id = target_table.columns[*dist_idx as usize]
2306                    .column_desc
2307                    .as_ref()
2308                    .unwrap()
2309                    .column_id;
2310                let sink_idx = sink_idx_by_col_id
2311                    .get(&column_id)
2312                    .ok_or_else(|| anyhow::anyhow!("column id {} not found in sink", column_id))?;
2313                Ok(*sink_idx)
2314            })
2315            .collect::<anyhow::Result<Vec<_>>>()?
2316    };
2317    let dist_key_indices =
2318        dist_key_indices.map_err(|e| e.context("failed to get distribution key indices"))?;
2319    let downstream_fragment_id = target_fragment_id as _;
2320    let new_downstream_relation = DownstreamFragmentRelation {
2321        downstream_fragment_id,
2322        dispatcher_type: DispatcherType::Hash,
2323        dist_key_indices,
2324        output_mapping: PbDispatchOutputMapping::simple(output_indices),
2325    };
2326    let current_target_columns = target_table.get_columns();
2327    let project_exprs = build_select_node_list(&sink_columns, current_target_columns)?;
2328    Ok(UpstreamSinkInfo {
2329        sink_id: sink.id,
2330        sink_fragment_id: sink_fragment_id as _,
2331        sink_output_fields,
2332        sink_original_target_columns: sink.get_original_target_columns().clone(),
2333        project_exprs,
2334        new_sink_downstream: new_downstream_relation,
2335    })
2336}
2337
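/// Rewrites the `UpstreamSinkUnion` node inside the table's union fragment so that its initial
/// upstreams reflect the given upstream sink infos.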
2338pub fn refill_upstream_sink_union_in_table(
2339    union_fragment_root: &mut PbStreamNode,
2340    upstream_sink_infos: &[UpstreamSinkInfo],
2341) {
2342    visit_stream_node_cont_mut(union_fragment_root, |node| {
2343        if let Some(NodeBody::UpstreamSinkUnion(upstream_sink_union)) = &mut node.node_body {
2344            let init_upstreams = upstream_sink_infos
2345                .iter()
2346                .map(|info| PbUpstreamSinkInfo {
2347                    upstream_fragment_id: info.sink_fragment_id,
2348                    sink_output_schema: info.sink_output_fields.clone(),
2349                    project_exprs: info.project_exprs.clone(),
2350                })
2351                .collect();
2352            upstream_sink_union.init_upstreams = init_upstreams;
2353            false
2354        } else {
2355            true
2356        }
2357    });
2358}