risingwave_meta/rpc/ddl_controller.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use anyhow::{Context, anyhow};
use await_tree::{InstrumentAwait, span};
use either::Either;
use itertools::Itertools;
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog, ColumnId, FragmentTypeFlag};
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_body, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::cdc::CdcScanOptions;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId, WorkerId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::table_fragments::actor_status::PbActorState;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    MergeNode, PbDispatchOutputMapping, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use strum::Display;
use thiserror_ext::AsReport;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::{FinishAutoRefreshSchemaSinkContext, SinkIntoTableContext};
use crate::error::{MetaErrorInner, bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, FragmentDownstreamRelation, StreamContext,
    StreamJobFragments, StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::cdc::{
    is_parallelized_backfill_enabled, try_init_parallel_cdc_table_snapshot_splits,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, AutoRefreshSchemaSinkContext,
    CompleteStreamFragmentGraph, CreateStreamingJobContext, CreateStreamingJobOption,
    GlobalStreamManagerRef, JobRescheduleTarget, ReplaceStreamJobContext, SourceChange,
    SourceManagerRef, StreamFragmentGraph, check_sink_fragments_support_refresh_schema,
    create_source_worker, rewrite_refresh_schema_sink_fragment, state_match, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

#[derive(strum::AsRefStr)]
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl std::fmt::Display for StreamingJobId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_ref())?;
        write!(f, "({})", self.id())
    }
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

/// Describes the job that needs to be replaced. Used when replacing a table and when
/// creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
}

#[derive(Display)]
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View, HashSet<ObjectId>),
    DropView(ViewId, DropMode),
    CreateStreamingJob {
        stream_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>, // specific resource group
        if_not_exists: bool,
    },
    DropStreamingJob {
        job_id: StreamingJobId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    },
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId, DropMode),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
    AlterDatabaseParam(DatabaseId, AlterDatabaseParam),
}

impl DdlCommand {
    /// Returns the name or ID of the object that this command operates on, for observability and debugging.
    fn object(&self) -> Either<String, ObjectId> {
        use Either::*;
        match self {
            DdlCommand::CreateDatabase(database) => Left(database.name.clone()),
            DdlCommand::DropDatabase(id) => Right(*id),
            DdlCommand::CreateSchema(schema) => Left(schema.name.clone()),
            DdlCommand::DropSchema(id, _) => Right(*id),
            DdlCommand::CreateNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::DropSource(id, _) => Right(*id),
            DdlCommand::CreateFunction(function) => Left(function.name.clone()),
            DdlCommand::DropFunction(id) => Right(*id),
            DdlCommand::CreateView(view, _) => Left(view.name.clone()),
            DdlCommand::DropView(id, _) => Right(*id),
            DdlCommand::CreateStreamingJob { stream_job, .. } => Left(stream_job.name()),
            DdlCommand::DropStreamingJob { job_id, .. } => Right(job_id.id()),
            DdlCommand::AlterName(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSwapRename(object) => Left(format!("{object:?}")),
            DdlCommand::ReplaceStreamJob(info) => Left(info.streaming_job.name()),
            DdlCommand::AlterNonSharedSource(source) => Left(source.name.clone()),
            DdlCommand::AlterObjectOwner(object, _) => Left(format!("{object:?}")),
            DdlCommand::AlterSetSchema(object, _) => Left(format!("{object:?}")),
            DdlCommand::CreateConnection(connection) => Left(connection.name.clone()),
            DdlCommand::DropConnection(id, _) => Right(*id),
            DdlCommand::CreateSecret(secret) => Left(secret.name.clone()),
            DdlCommand::AlterSecret(secret) => Left(secret.name.clone()),
            DdlCommand::DropSecret(id) => Right(*id),
            DdlCommand::CommentOn(comment) => Right(comment.table_id as _),
            DdlCommand::CreateSubscription(subscription) => Left(subscription.name.clone()),
            DdlCommand::DropSubscription(id, _) => Right(*id),
            DdlCommand::AlterDatabaseParam(id, _) => Right(*id),
        }
    }

    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob { .. }
            | DdlCommand::DropConnection(_, _)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_, _)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_)
            | DdlCommand::AlterDatabaseParam(_, _) => true,
            DdlCommand::CreateStreamingJob { .. }
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // The semaphore limits the number of concurrent streaming job creations.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,

    /// Sequence number for DDL commands, used for observability and debugging.
    seq: Arc<AtomicU64>,
}

#[derive(Clone)]
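/// Permits that bound how many streaming jobs may be created concurrently. The permit count
/// follows the `max_concurrent_creating_streaming_jobs` system parameter and is resized at
/// runtime when that parameter changes (see [`CreatingStreamingJobPermit::new`]).
///
/// A minimal usage sketch, mirroring how `create_streaming_job` below acquires a permit
/// (illustrative only, not a compiled doctest):
///
/// ```ignore
/// let _permit = creating_streaming_job_permits
///     .semaphore
///     .clone()
///     .acquire_owned()
///     .await
///     .unwrap();
/// // ... build and create the streaming job while holding the permit ...
/// ```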
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx);
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        let to_release = permits - new_permits;
                        let reduced = semaphore_clone.forget_permits(to_release);
                        // TODO: implement a dynamic semaphore with limits ourselves.
                        if reduced != to_release {
                            tracing::warn!(
                                "not enough permits to release, expected {}, but reduced {}",
                                to_release,
                                reduced
                            );
                        }
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
            seq: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Obtains the next sequence number for DDL commands, for observability and debugging purposes.
    pub fn next_seq(&self) -> u64 {
        // This is a simple atomic increment operation.
        self.seq.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, tonic cancels the request future. Since there is a lot of
    /// logic for reverting, status management, notification, and so on, ensuring consistency
    /// would be a huge hassle if we did not spawn here.
    ///
    /// Although the return type is `Option`, it is always `Some`, to simplify the handling logic.
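    ///
    /// A minimal invocation sketch (illustrative only, not a compiled doctest):
    ///
    /// ```ignore
    /// // `ctrl` is a `DdlController` and `database` is a `risingwave_pb::catalog::Database`.
    /// let wait_version = ctrl
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("`run_command` always returns `Some`");
    /// ```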
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }

        let await_tree_key = format!("DDL Command {}", self.next_seq());
        let await_tree_span = await_tree::span!("{command}({})", command.object());

        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view, dependencies) => {
                    ctrl.create_view(view, dependencies).await
                }
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob {
                    stream_job,
                    fragment_graph,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                    if_not_exists,
                } => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                        if_not_exists,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob {
                    job_id,
                    drop_mode,
                    target_replace_info,
                } => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                }) => ctrl.replace_job(streaming_job, fragment_graph).await,
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id, drop_mode) => {
                    ctrl.drop_connection(connection_id, drop_mode).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
                DdlCommand::AlterDatabaseParam(database_id, param) => {
                    ctrl.alter_database_param(database_id, param).await
                }
            }
        }
        .in_current_span();
        let fut = (self.env.await_tree_reg())
            .register(await_tree_key, await_tree_span)
            .instrument(Box::pin(fut));
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .create_database(database)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to create the database asynchronously.
        self.barrier_manager
            .update_database_barrier(
                updated_db.database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    pub async fn reschedule_cdc_table_backfill(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
    ) -> MetaResult<()> {
        tracing::info!("alter CDC table backfill parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            return Err(anyhow::anyhow!("CDC table backfill reschedule is unavailable because the system is in recovery state").into());
        }
        self.stream_manager
            .reschedule_cdc_table_backfill(job_id, target)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(
        &self,
        view: View,
        dependencies: HashSet<ObjectId>,
    ) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view, dependencies)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Connection, connection_id as _, drop_mode, None)
            .await
    }

    async fn alter_database_param(
        &self,
        database_id: DatabaseId,
        param: AlterDatabaseParam,
    ) -> MetaResult<NotificationVersion> {
        let (version, updated_db) = self
            .metadata_manager
            .catalog_controller
            .alter_database_param(database_id, param)
            .await?;
        // If persisted successfully, notify `GlobalBarrierManager` to update the param asynchronously.
        self.barrier_manager
            .update_database_barrier(
                database_id,
                updated_db.barrier_interval_ms.map(|v| v as u32),
                updated_db.checkpoint_frequency.map(|v| v as u64),
            )
            .await?;
        Ok(version)
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        if let Err(err) = self.stream_manager.create_subscription(&subscription).await {
            tracing::debug!(error = %err.as_report(), "failed to create subscription");
            let _ = self
                .metadata_manager
                .catalog_controller
                .try_abort_creating_subscription(subscription.id as _)
                .await
                .inspect_err(|e| {
                    tracing::error!(
                        error = %e.as_report(),
                        "failed to abort create subscription after failure"
                    );
                });
            return Err(err);
        }

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    #[await_tree::instrument]
    pub(crate) async fn validate_cdc_table(
        &self,
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| {
                f.fragment_type_mask.contains(FragmentTypeFlag::StreamScan)
                    || f.fragment_type_mask
                        .contains(FragmentTypeFlag::StreamCdcScan)
            })
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;
        fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
            if let Some(NodeBody::StreamCdcScan(node)) = node_body {
                if let Some(o) = &node.options
                    && CdcScanOptions::from_proto(o).is_parallelized_backfill()
                {
                    // Use parallel CDC backfill.
                } else {
                    assert_eq!(
                        stream_scan_fragment.actors.len(),
                        1,
                        "Stream scan fragment should have only one actor"
                    );
                }
            }
        }
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                assert_parallelism(stream_scan_fragment, &stream_scan_fragment.nodes.node_body);
                if self
                    .validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    assert_parallelism(stream_scan_fragment, &input.node_body);
                    if self
                        .validate_cdc_table_inner(&input.node_body, table.id)
                        .await?
                    {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        &self,
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        let meta_store = self.env.meta_store_ref();
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            if is_parallelized_backfill_enabled(stream_cdc_scan) {
                // Create parallel splits for a CDC table. The resulting split assignments are persisted and immutable.
                try_init_parallel_cdc_table_snapshot_splits(
                    table_id,
                    cdc_table_desc,
                    meta_store,
                    &stream_cdc_scan.options,
                    self.env.opts.cdc_table_split_init_insert_batch_size,
                    self.env.opts.cdc_table_split_init_sleep_interval_splits,
                    self.env.opts.cdc_table_split_init_sleep_duration_millis,
                )
                .await?;
            }

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    // Here we modify the union node of the downstream table using the `TableFragments` of the upstream sink that is being created.
    // The merges in the union have already been set up in the frontend; this function fills them with the concrete upstream actors.
    // Meanwhile, the dispatchers corresponding to the upstreams of the merges are also added to the replace-table context here.
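    //
    // Roughly, the rewired plan for each incoming sink looks like this (illustrative sketch):
    //
    //   sink fragment --(Hash dispatcher)--> Merge -> Project -> Union (of the downstream table)
    //
    // i.e. every incoming sink gets its own Merge/Project input feeding the table's union fragment.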
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _, None)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // check if the union fragment is fully assigned.
        for fragment in stream_job_fragments.fragments.values() {
            {
                {
                    visit_stream_node_body(&fragment.nodes, |node| {
                        if let NodeBody::Merge(merge_node) = node {
                            let upstream_fragment_id = merge_node.upstream_fragment_id;
                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                                .upstream_fragment_downstreams
                                .get(&upstream_fragment_id)
                            {
                                let mut upstream_fragment_downstreams =
                                    external_upstream_fragment_downstreams.iter().filter(
                                        |downstream| {
                                            downstream.downstream_fragment_id
                                                == fragment.fragment_id
                                        },
                                    );
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            } else {
                                let mut upstream_fragment_downstreams = stream_job_fragments
                                    .downstreams
                                    .get(&upstream_fragment_id)
                                    .into_iter()
                                    .flatten()
                                    .filter(|downstream| {
                                        downstream.downstream_fragment_id == fragment.fragment_id
                                    });
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            }
                        }
                    });
                }
            }
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        {
            sink_fragment_downstreams.push(DownstreamFragmentRelation {
                downstream_fragment_id: union_fragment.fragment_id,
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: dist_key_indices.clone(),
                output_mapping: PbDispatchOutputMapping::simple(output_indices),
            });
        }

        let upstream_fragment_id = sink_fragment.fragment_id;

        let mut max_operator_id = 0;

        visit_stream_node(&union_fragment.nodes, |node| {
            max_operator_id = max_operator_id.max(node.operator_id);
        });

        {
            {
                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
                        for input_project_node in &mut node.input {
                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
                                    input_project_node.input.iter_mut().exactly_one().unwrap();

                                // we need to align nodes here
                                if input_project_node.identity.as_str()
                                    != unique_identity
                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                                {
                                    continue;
                                }

                                if let Some(NodeBody::Merge(merge_node)) =
                                    &mut merge_stream_node.node_body
                                {
                                    {
                                        merge_stream_node.identity =
                                            format!("MergeExecutor(from sink {})", sink_id);
                                        merge_stream_node.operator_id = max_operator_id + 1;

                                        input_project_node.identity =
                                            format!("ProjectExecutor(from sink {})", sink_id);
                                        input_project_node.operator_id = max_operator_id + 2;
                                    }

                                    **merge_node = {
                                        MergeNode {
                                            upstream_fragment_id,
                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                            ..Default::default()
                                        }
                                    };

                                    merge_stream_node.fields = sink_fields.to_vec();

                                    return false;
                                }
                            }
                        }
                    }
                    true
                });
            }
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
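    ///
    /// An illustrative sketch of how this is usually reached through [`DdlCommand`] (not a
    /// compiled doctest; `stream_job` and `fragment_graph` are assumed to come from the frontend):
    ///
    /// ```ignore
    /// let command = DdlCommand::CreateStreamingJob {
    ///     stream_job,
    ///     fragment_graph,
    ///     affected_table_replace_info: None,
    ///     dependencies: HashSet::new(),
    ///     specific_resource_group: None,
    ///     if_not_exists: false,
    /// };
    /// let version = ddl_controller.run_command(command).await?;
    /// ```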
    #[await_tree::instrument(boxed, "create_streaming_job({streaming_job})")]
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
        if_not_exists: bool,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        let check_ret = self
            .metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await;
        if let Err(meta_err) = check_ret {
            if !if_not_exists {
                return Err(meta_err);
            }
            return if let MetaErrorInner::Duplicated(_, _, Some(job_id)) = meta_err.inner() {
                if streaming_job.create_type() == CreateType::Foreground {
                    let database_id = streaming_job.database_id();
                    self.metadata_manager
                        .wait_streaming_job_finished(database_id.into(), *job_id)
                        .await
                } else {
                    Ok(IGNORED_NOTIFICATION_VERSION)
                }
            } else {
                Err(meta_err)
            };
        }
        let job_id = streaming_job.id();
        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        // TODO: acquire permits for recovered background DDLs.
        let permit = self
            .creating_streaming_job_permits
            .semaphore
            .clone()
            .acquire_owned()
            .instrument_await("acquire_creating_streaming_job_permit")
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
                permit,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    #[await_tree::instrument(boxed)]
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
        permit: OwnedSemaphorePermit,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                // Ensure the max parallelism is unchanged before replacing the table.
1251                let original_max_parallelism = self
1252                    .metadata_manager
1253                    .get_job_max_parallelism(streaming_job.id().into())
1254                    .await?;
1255                let fragment_graph = PbStreamFragmentGraph {
1256                    max_parallelism: original_max_parallelism as _,
1257                    ..fragment_graph
1258                };
1259
1260                let fragment_graph =
1261                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1262                streaming_job.set_info_from_graph(&fragment_graph);
1263                let streaming_job = streaming_job;
1264
1265                Some((streaming_job, fragment_graph))
1266            }
1267            None => None,
1268        };
1269
1270        // create fragment and actor catalogs.
1271        tracing::debug!(id = streaming_job.id(), "building streaming job");
1272        let (ctx, stream_job_fragments) = self
1273            .build_stream_job(
1274                ctx,
1275                streaming_job,
1276                fragment_graph,
1277                affected_table_replace_info,
1278                specific_resource_group,
1279            )
1280            .await?;
1281
1282        let streaming_job = &ctx.streaming_job;
1283
1284        match streaming_job {
1285            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
1286                self.validate_cdc_table(table, &stream_job_fragments)
1287                    .await?;
1288            }
1289            StreamingJob::Table(Some(source), ..) => {
1290                // Register the source on the connector node.
1291                self.source_manager.register_source(source).await?;
1292                let connector_name = source
1293                    .get_with_properties()
1294                    .get(UPSTREAM_SOURCE_KEY)
1295                    .cloned();
1296                let attr = source.info.as_ref().map(|source_info| {
1297                    jsonbb::json!({
1298                            "format": source_info.format().as_str_name(),
1299                            "encode": source_info.row_encode().as_str_name(),
1300                    })
1301                });
1302                report_create_object(
1303                    streaming_job.id(),
1304                    "source",
1305                    PbTelemetryDatabaseObject::Source,
1306                    connector_name,
1307                    attr,
1308                );
1309            }
1310            StreamingJob::Sink(sink, _) => {
1311                if sink.auto_refresh_schema_from_table.is_some() {
1312                    check_sink_fragments_support_refresh_schema(&stream_job_fragments.fragments)?
1313                }
1314                // Validate the sink on the connector node.
1315                validate_sink(sink).await?;
1316                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
1317                let attr = sink.format_desc.as_ref().map(|sink_info| {
1318                    jsonbb::json!({
1319                        "format": sink_info.format().as_str_name(),
1320                        "encode": sink_info.encode().as_str_name(),
1321                    })
1322                });
1323                report_create_object(
1324                    streaming_job.id(),
1325                    "sink",
1326                    PbTelemetryDatabaseObject::Sink,
1327                    connector_name,
1328                    attr,
1329                );
1330            }
1331            StreamingJob::Source(source) => {
1332                // Register the source on the connector node.
1333                self.source_manager.register_source(source).await?;
1334                let connector_name = source
1335                    .get_with_properties()
1336                    .get(UPSTREAM_SOURCE_KEY)
1337                    .cloned();
1338                let attr = source.info.as_ref().map(|source_info| {
1339                    jsonbb::json!({
1340                        "format": source_info.format().as_str_name(),
1341                        "encode": source_info.row_encode().as_str_name(),
1342                    })
1343                });
1344                report_create_object(
1345                    streaming_job.id(),
1346                    "source",
1347                    PbTelemetryDatabaseObject::Source,
1348                    connector_name,
1349                    attr,
1350                );
1351            }
1352            _ => {}
1353        }
1354
1355        self.metadata_manager
1356            .catalog_controller
1357            .prepare_stream_job_fragments(&stream_job_fragments, streaming_job, false)
1358            .await?;
1359
1360        // Create the streaming job, either in the foreground or in the background.
1361        let stream_job_id = streaming_job.id();
1362        match streaming_job.create_type() {
1363            CreateType::Unspecified | CreateType::Foreground => {
1364                let version = self
1365                    .stream_manager
1366                    .create_streaming_job(stream_job_fragments, ctx, None)
1367                    .await?;
1368                Ok(version)
1369            }
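            // Background jobs are driven by a spawned worker task: wait (via the oneshot
            // channel) only until the creation has actually started, then return
            // `IGNORED_NOTIFICATION_VERSION` instead of the real catalog version.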
1370            CreateType::Background => {
1371                let await_tree_key = format!("Background DDL Worker ({})", stream_job_id);
1372                let await_tree_span =
1373                    span!("{:?}({})", streaming_job.job_type(), streaming_job.name());
1374
1375                let ctrl = self.clone();
1376                let (tx, rx) = oneshot::channel();
1377                let fut = async move {
1378                    let _ = ctrl
1379                        .stream_manager
1380                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
1381                        .await
1382                        .inspect_err(|err| {
1383                            tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
1384                        });
1385                    // drop the permit to release the semaphore
1386                    drop(permit);
1387                };
1388
1389                let fut = (self.env.await_tree_reg())
1390                    .register(await_tree_key, await_tree_span)
1391                    .instrument(fut);
1392                tokio::spawn(fut);
1393
1394                rx.instrument_await("wait_background_streaming_job_creation_started")
1395                    .await
1396                    .map_err(|_| {
1397                        anyhow!(
1398                            "failed to receive create streaming job result of job: {}",
1399                            stream_job_id
1400                        )
1401                    })??;
1402                Ok(IGNORED_NOTIFICATION_VERSION)
1403            }
1404        }
1405    }
1406
1407    /// `target_replace_info`: when dropping a sink that sinks into a table, the target table needs to be replaced to detach the dropped sink.
1408    pub async fn drop_object(
1409        &self,
1410        object_type: ObjectType,
1411        object_id: ObjectId,
1412        drop_mode: DropMode,
1413        target_replace_info: Option<ReplaceStreamJobInfo>,
1414    ) -> MetaResult<NotificationVersion> {
1415        let (release_ctx, mut version) = self
1416            .metadata_manager
1417            .catalog_controller
1418            .drop_object(object_type, object_id, drop_mode)
1419            .await?;
1420
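        // Dropping a sink created with `CREATE SINK ... INTO TABLE` requires replacing the
        // target table so that the dropped sink is detached from it.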
1421        if let Some(replace_table_info) = target_replace_info {
1422            let stream_ctx =
1423                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());
1424
1425            let ReplaceStreamJobInfo {
1426                mut streaming_job,
1427                fragment_graph,
1428                ..
1429            } = replace_table_info;
1430
1431            let sink_id = if let ObjectType::Sink = object_type {
1432                object_id as _
1433            } else {
1434                panic!("additional replace table event only occurs when dropping sink into table")
1435            };
1436
1437            // Ensure the max parallelism remains unchanged before replacing the table.
1438            let original_max_parallelism = self
1439                .metadata_manager
1440                .get_job_max_parallelism(streaming_job.id().into())
1441                .await?;
1442            let fragment_graph = PbStreamFragmentGraph {
1443                max_parallelism: original_max_parallelism as _,
1444                ..fragment_graph
1445            };
1446
1447            let fragment_graph =
1448                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1449            streaming_job.set_info_from_graph(&fragment_graph);
1450            let streaming_job = streaming_job;
1451
1452            streaming_job.table().expect("should be table job");
1453
1454            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
1455            let tmp_id = self
1456                .metadata_manager
1457                .catalog_controller
1458                .create_job_catalog_for_replace(
1459                    &streaming_job,
1460                    Some(&stream_ctx),
1461                    fragment_graph.specified_parallelism().as_ref(),
1462                    Some(fragment_graph.max_parallelism()),
1463                )
1464                .await? as u32;
1465
1466            let (ctx, stream_job_fragments) = self
1467                .inject_replace_table_job_for_table_sink(
1468                    tmp_id,
1469                    &self.metadata_manager,
1470                    stream_ctx,
1471                    None,
1472                    None,
1473                    Some(sink_id),
1474                    &streaming_job,
1475                    fragment_graph,
1476                )
1477                .await?;
1478
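            // Persist the replacement fragments and swap in the new table plan. On failure,
            // abort the temporary replacement job so the catalog is left untouched.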
1479            let result: MetaResult<_> = try {
1480                let replace_upstream = ctx.replace_upstream.clone();
1481
1482                self.metadata_manager
1483                    .catalog_controller
1484                    .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1485                    .await?;
1486
1487                self.stream_manager
1488                    .replace_stream_job(stream_job_fragments, ctx)
1489                    .await?;
1490
1491                replace_upstream
1492            };
1493
1494            version = match result {
1495                Ok(replace_upstream) => {
1496                    let version = self
1497                        .metadata_manager
1498                        .catalog_controller
1499                        .finish_replace_streaming_job(
1500                            tmp_id as _,
1501                            streaming_job,
1502                            replace_upstream,
1503                            SinkIntoTableContext {
1504                                creating_sink_id: None,
1505                                dropping_sink_id: Some(sink_id),
1506                                updated_sink_catalogs: vec![],
1507                            },
1508                            None, // no source is dropped when dropping sink into table
1509                            None, // no auto refresh schema sink when dropping sink into table
1510                        )
1511                        .await?;
1512                    Ok(version)
1513                }
1514                Err(err) => {
1515                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
1516                    let _ = self.metadata_manager
1517                        .catalog_controller
1518                        .try_abort_replacing_streaming_job(tmp_id as _, None)
1519                        .await
1520                        .inspect_err(|err| {
1521                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
1522                        });
1523                    Err(err)
1524                }
1525            }?;
1526        }
1527
1528        let ReleaseContext {
1529            database_id,
1530            removed_streaming_job_ids,
1531            removed_state_table_ids,
1532            removed_source_ids,
1533            removed_secret_ids: secret_ids,
1534            removed_source_fragments,
1535            removed_actors,
1536            removed_fragments,
1537        } = release_ctx;
1538
1539        let _guard = self.source_manager.pause_tick().await;
1540        self.stream_manager
1541            .drop_streaming_jobs(
1542                risingwave_common::catalog::DatabaseId::new(database_id as _),
1543                removed_actors.iter().map(|id| *id as _).collect(),
1544                removed_streaming_job_ids,
1545                removed_state_table_ids,
1546                removed_fragments.iter().map(|id| *id as _).collect(),
1547            )
1548            .await;
1549
1550        // Clean up sources after dropping the streaming jobs.
1551        // Otherwise, e.g., Kafka consumer groups might be recreated right after being deleted.
1552        self.source_manager
1553            .apply_source_change(SourceChange::DropSource {
1554                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
1555            })
1556            .await;
1557
1558        // Unregister fragments and actors from the source manager.
1559        // FIXME: source backfill fragments also need to be unregistered here.
1560        let dropped_source_fragments = removed_source_fragments
1561            .into_iter()
1562            .map(|(source_id, fragments)| {
1563                (
1564                    source_id,
1565                    fragments.into_iter().map(|id| id as u32).collect(),
1566                )
1567            })
1568            .collect();
1569        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1570        self.source_manager
1571            .apply_source_change(SourceChange::DropMv {
1572                dropped_source_fragments,
1573                dropped_actors,
1574            })
1575            .await;
1576
1577        // remove secrets.
1578        for secret in secret_ids {
1579            LocalSecretManager::global().remove_secret(secret as _);
1580        }
1581        Ok(version)
1582    }
1583
1584    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`, and for altering a materialized view (alter-mv).
1585    #[await_tree::instrument(boxed, "replace_streaming_job({streaming_job})")]
1586    pub async fn replace_job(
1587        &self,
1588        mut streaming_job: StreamingJob,
1589        fragment_graph: StreamFragmentGraphProto,
1590    ) -> MetaResult<NotificationVersion> {
1591        match &streaming_job {
1592            StreamingJob::Table(..)
1593            | StreamingJob::Source(..)
1594            | StreamingJob::MaterializedView(..) => {}
1595            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
1596                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1597            }
1598        }
1599
1600        let job_id = streaming_job.id();
1601
1602        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1603        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1604
1605        // Ensure the max parallelism remains unchanged before replacing the table.
1606        let original_max_parallelism = self
1607            .metadata_manager
1608            .get_job_max_parallelism(streaming_job.id().into())
1609            .await?;
1610        let fragment_graph = PbStreamFragmentGraph {
1611            max_parallelism: original_max_parallelism as _,
1612            ..fragment_graph
1613        };
1614
1615        // 1. build fragment graph.
1616        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1617        streaming_job.set_info_from_graph(&fragment_graph);
1618
1619        // make it immutable
1620        let streaming_job = streaming_job;
1621
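        // If the table has sinks created with auto schema refresh, rewrite each such sink's
        // fragment to carry the newly added columns, so that the sinks are replaced together
        // with the table.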
1622        let auto_refresh_schema_sinks = if let StreamingJob::Table(_, table, _) = &streaming_job {
1623            let auto_refresh_schema_sinks = self
1624                .metadata_manager
1625                .catalog_controller
1626                .get_sink_auto_refresh_schema_from(table.id as _)
1627                .await?;
1628            if !auto_refresh_schema_sinks.is_empty() {
1629                let original_table_columns = self
1630                    .metadata_manager
1631                    .catalog_controller
1632                    .get_table_columns(table.id as _)
1633                    .await?;
1634                // Compare column IDs to find the newly added columns.
1635                let mut original_table_column_ids: HashSet<_> = original_table_columns
1636                    .iter()
1637                    .map(|col| col.column_id())
1638                    .collect();
1639                let newly_added_columns = table
1640                    .columns
1641                    .iter()
1642                    .filter(|col| {
1643                        !original_table_column_ids.remove(&ColumnId::new(
1644                            col.column_desc.as_ref().unwrap().column_id as _,
1645                        ))
1646                    })
1647                    .map(|col| ColumnCatalog::from(col.clone()))
1648                    .collect_vec();
1649                if !original_table_column_ids.is_empty() {
1650                    return Err(anyhow!("new table columns do not contain all original columns. new: {:?}, original: {:?}, not included: {:?}", table.columns, original_table_columns, original_table_column_ids).into());
1651                }
1652                let mut sinks = Vec::with_capacity(auto_refresh_schema_sinks.len());
1653                for sink in auto_refresh_schema_sinks {
1654                    let sink_job_fragments = self
1655                        .metadata_manager
1656                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink.id))
1657                        .await?;
1658                    if sink_job_fragments.fragments.len() != 1 {
1659                        return Err(anyhow!(
1660                            "auto schema refresh sink must have exactly one fragment, but got {}",
1661                            sink_job_fragments.fragments.len()
1662                        )
1663                        .into());
1664                    }
1665                    let original_sink_fragment =
1666                        sink_job_fragments.fragments.into_values().next().unwrap();
1667                    let (new_sink_fragment, new_sink_columns, new_log_store_table) =
1668                        rewrite_refresh_schema_sink_fragment(
1669                            &original_sink_fragment,
1670                            &sink,
1671                            &newly_added_columns,
1672                            table,
1673                            fragment_graph.table_fragment_id(),
1674                            self.env.id_gen_manager(),
1675                        )?;
1676
1677                    assert_eq!(
1678                        original_sink_fragment.actors.len(),
1679                        new_sink_fragment.actors.len()
1680                    );
1681                    let actor_status = (0..original_sink_fragment.actors.len())
1682                        .map(|i| {
1683                            let worker_node_id = sink_job_fragments.actor_status
1684                                [&original_sink_fragment.actors[i].actor_id]
1685                                .location
1686                                .as_ref()
1687                                .unwrap()
1688                                .worker_node_id;
1689                            (
1690                                new_sink_fragment.actors[i].actor_id,
1691                                PbActorStatus {
1692                                    location: Some(PbActorLocation { worker_node_id }),
1693                                    state: PbActorState::Inactive as _,
1694                                },
1695                            )
1696                        })
1697                        .collect();
1698
1699                    let streaming_job = StreamingJob::Sink(sink, None);
1700
1701                    let tmp_sink_id = self
1702                        .metadata_manager
1703                        .catalog_controller
1704                        .create_job_catalog_for_replace(&streaming_job, None, None, None)
1705                        .await?;
1706                    let StreamingJob::Sink(sink, _) = streaming_job else {
1707                        unreachable!()
1708                    };
1709
1710                    sinks.push(AutoRefreshSchemaSinkContext {
1711                        tmp_sink_id,
1712                        original_sink: sink,
1713                        original_fragment: original_sink_fragment,
1714                        new_columns: new_sink_columns,
1715                        new_fragment: new_sink_fragment,
1716                        new_log_store_table,
1717                        actor_status,
1718                    });
1719                }
1720                Some(sinks)
1721            } else {
1722                None
1723            }
1724        } else {
1725            None
1726        };
1727
1728        let tmp_id = self
1729            .metadata_manager
1730            .catalog_controller
1731            .create_job_catalog_for_replace(
1732                &streaming_job,
1733                Some(&ctx),
1734                fragment_graph.specified_parallelism().as_ref(),
1735                Some(fragment_graph.max_parallelism()),
1736            )
1737            .await?;
1738
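        // Remember the temporary sink job ids so they can be aborted together with the table
        // replacement if anything below fails.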
1739        let tmp_sink_ids = auto_refresh_schema_sinks
1740            .as_ref()
1741            .map(|sinks| sinks.iter().map(|sink| sink.tmp_sink_id).collect_vec());
1742
1743        tracing::debug!(id = job_id, "building replace streaming job");
1744        let mut updated_sink_catalogs = vec![];
1745
1746        let mut drop_table_connector_ctx = None;
1747        let result: MetaResult<_> = try {
1748            let (mut ctx, mut stream_job_fragments) = self
1749                .build_replace_job(
1750                    ctx,
1751                    &streaming_job,
1752                    fragment_graph,
1753                    tmp_id as _,
1754                    auto_refresh_schema_sinks,
1755                )
1756                .await?;
1757            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1758            let auto_refresh_schema_sink_finish_ctx =
1759                ctx.auto_refresh_schema_sinks.as_ref().map(|sinks| {
1760                    sinks
1761                        .iter()
1762                        .map(|sink| FinishAutoRefreshSchemaSinkContext {
1763                            tmp_sink_id: sink.tmp_sink_id,
1764                            original_sink_id: sink.original_sink.id as _,
1765                            columns: sink.new_columns.clone(),
1766                            new_log_store_table: sink
1767                                .new_log_store_table
1768                                .as_ref()
1769                                .map(|table| (table.id as _, table.columns.clone())),
1770                        })
1771                        .collect()
1772                });
1773
1774            // Handle sinks that sink into the table.
1775            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1776                let catalogs = self
1777                    .metadata_manager
1778                    .get_sink_catalog_by_ids(&table.incoming_sinks)
1779                    .await?;
1780
1781                for sink in catalogs {
1782                    let sink_id = &sink.id;
1783
1784                    let sink_table_fragments = self
1785                        .metadata_manager
1786                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1787                            *sink_id,
1788                        ))
1789                        .await?;
1790
1791                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1792
1793                    Self::inject_replace_table_plan_for_sink(
1794                        *sink_id,
1795                        &sink_fragment,
1796                        table,
1797                        &mut ctx,
1798                        stream_job_fragments.inner.union_fragment_for_table(),
1799                        Some(&sink.unique_identity()),
1800                    );
1801
1802                    if sink.original_target_columns.is_empty() {
1803                        updated_sink_catalogs.push(sink.id as _);
1804                    }
1805                }
1806            }
1807
1808            let replace_upstream = ctx.replace_upstream.clone();
1809
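            // Persist the rewritten auto-refresh-schema sink fragments under their temporary
            // job ids, reusing the original actors' worker locations.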
1810            if let Some(sinks) = &ctx.auto_refresh_schema_sinks {
1811                let empty_actor_splits = HashMap::new();
1812                let empty_downstreams = FragmentDownstreamRelation::default();
1813                for sink in sinks {
1814                    self.metadata_manager
1815                        .catalog_controller
1816                        .prepare_streaming_job(
1817                            sink.tmp_sink_id,
1818                            || [&sink.new_fragment].into_iter(),
1819                            &sink.actor_status,
1820                            &empty_actor_splits,
1821                            &empty_downstreams,
1822                            false,
1823                            sink.original_sink.definition.clone(),
1824                            true,
1825                            Some(&sink.original_sink),
1826                            None,
1827                        )
1828                        .await?;
1829                }
1830            }
1831
1832            self.metadata_manager
1833                .catalog_controller
1834                .prepare_stream_job_fragments(&stream_job_fragments, &streaming_job, true)
1835                .await?;
1836
1837            self.stream_manager
1838                .replace_stream_job(stream_job_fragments, ctx)
1839                .await?;
1840            (replace_upstream, auto_refresh_schema_sink_finish_ctx)
1841        };
1842
1843        match result {
1844            Ok((replace_upstream, auto_refresh_schema_sink_finish_ctx)) => {
1845                let version = self
1846                    .metadata_manager
1847                    .catalog_controller
1848                    .finish_replace_streaming_job(
1849                        tmp_id,
1850                        streaming_job,
1851                        replace_upstream,
1852                        SinkIntoTableContext {
1853                            creating_sink_id: None,
1854                            dropping_sink_id: None,
1855                            updated_sink_catalogs,
1856                        },
1857                        drop_table_connector_ctx.as_ref(),
1858                        auto_refresh_schema_sink_finish_ctx,
1859                    )
1860                    .await?;
1861                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1862                    self.source_manager
1863                        .apply_source_change(SourceChange::DropSource {
1864                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1865                        })
1866                        .await;
1867                }
1868                Ok(version)
1869            }
1870            Err(err) => {
1871                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1872                let _ = self.metadata_manager
1873                    .catalog_controller
1874                    .try_abort_replacing_streaming_job(tmp_id, tmp_sink_ids)
1875                    .await.inspect_err(|err| {
1876                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1877                });
1878                Err(err)
1879            }
1880        }
1881    }
1882
1883    #[await_tree::instrument(boxed, "drop_streaming_job{}({job_id})", if let DropMode::Cascade = drop_mode { "_cascade" } else { "" }
1884    )]
1885    async fn drop_streaming_job(
1886        &self,
1887        job_id: StreamingJobId,
1888        drop_mode: DropMode,
1889        target_replace_info: Option<ReplaceStreamJobInfo>,
1890    ) -> MetaResult<NotificationVersion> {
1891        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1892
1893        let (object_id, object_type) = match job_id {
1894            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1895            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1896            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1897            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1898        };
1899
1900        let version = self
1901            .drop_object(object_type, object_id, drop_mode, target_replace_info)
1902            .await?;
1903        #[cfg(not(madsim))]
1904        if let StreamingJobId::Sink(sink_id) = job_id {
1905            // Delete rows from the system table used by the exactly-once Iceberg sink.
1906            // TODO(wcy-fdu): optimize the logic to be specific to Iceberg sinks.
1907            let db = self.env.meta_store_ref().conn.clone();
1908            clean_all_rows_by_sink_id(&db, sink_id).await?;
1909        }
1910        Ok(version)
1911    }
1912
1913    /// Resolve the parallelism of the stream job based on the given information.
1914    ///
1915    /// Returns error if user specifies a parallelism that cannot be satisfied.
1916    fn resolve_stream_parallelism(
1917        &self,
1918        specified: Option<NonZeroUsize>,
1919        max: NonZeroUsize,
1920        cluster_info: &StreamingClusterInfo,
1921        resource_group: String,
1922    ) -> MetaResult<NonZeroUsize> {
1923        let available = cluster_info.parallelism(&resource_group);
1924        let Some(available) = NonZeroUsize::new(available) else {
1925            bail_unavailable!(
1926                "no available slots to schedule in resource group \"{}\", \
1927                 have you allocated any compute nodes within this resource group?",
1928                resource_group
1929            );
1930        };
1931
1932        if let Some(specified) = specified {
1933            if specified > max {
1934                bail_invalid_parameter!(
1935                    "specified parallelism {} should not exceed max parallelism {}",
1936                    specified,
1937                    max,
1938                );
1939            }
1940            if specified > available {
1941                bail_unavailable!(
1942                    "insufficient parallelism to schedule in resource group \"{}\", \
1943                     required: {}, available: {}",
1944                    resource_group,
1945                    specified,
1946                    available,
1947                );
1948            }
1949            Ok(specified)
1950        } else {
1951            // Fall back to the default parallelism from the meta configuration if none is specified.
1952            let default_parallelism = match self.env.opts.default_parallelism {
1953                DefaultParallelism::Full => available,
1954                DefaultParallelism::Default(num) => {
1955                    if num > available {
1956                        bail_unavailable!(
1957                            "insufficient parallelism to schedule in resource group \"{}\", \
1958                            required: {}, available: {}",
1959                            resource_group,
1960                            num,
1961                            available,
1962                        );
1963                    }
1964                    num
1965                }
1966            };
1967
1968            if default_parallelism > max {
1969                tracing::warn!(
1970                    max_parallelism = max.get(),
1971                    resource_group,
1972                    "default parallelism exceeds the job's max parallelism, using max parallelism instead",
1973                );
1974            }
1975            Ok(default_parallelism.min(max))
1976        }
1977    }
1978
1979    /// Builds the actor graph:
1980    /// - Add the upstream fragments to the fragment graph
1981    /// - Schedule the fragments based on their distribution
1982    /// - Expand each fragment into one or several actors
1983    /// - Construct the fragment level backfill order control.
1984    #[await_tree::instrument]
1985    pub(crate) async fn build_stream_job(
1986        &self,
1987        stream_ctx: StreamContext,
1988        mut stream_job: StreamingJob,
1989        fragment_graph: StreamFragmentGraph,
1990        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1991        specific_resource_group: Option<String>,
1992    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1993        let id = stream_job.id();
1994        let specified_parallelism = fragment_graph.specified_parallelism();
1995        let expr_context = stream_ctx.to_expr_context();
1996        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1997
1998        // 1. Build the fragment-level backfill ordering.
1999        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
2000
2001        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
2002        // contains all information needed for building the actor graph.
2003
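        // Collect which upstream MVs are backfilled via snapshot backfill; cross-database
        // upstreams are tracked separately.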
2004        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
2005            fragment_graph.collect_snapshot_backfill_info()?;
2006        assert!(
2007            snapshot_backfill_info
2008                .iter()
2009                .chain([&cross_db_snapshot_backfill_info])
2010                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
2011                .all(|backfill_epoch| backfill_epoch.is_none()),
2012            "should not set backfill epoch when initially build the job: {:?} {:?}",
2013            snapshot_backfill_info,
2014            cross_db_snapshot_backfill_info
2015        );
2016
2017        // Check that a log store exists for all cross-database upstreams.
2018        self.metadata_manager
2019            .catalog_controller
2020            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
2021            .await?;
2022
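        // Cross-database snapshot-backfill upstreams are excluded when resolving the upstream
        // root fragments below.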
2023        let upstream_table_ids = fragment_graph
2024            .dependent_table_ids()
2025            .iter()
2026            .filter(|id| {
2027                !cross_db_snapshot_backfill_info
2028                    .upstream_mv_table_id_to_backfill_epoch
2029                    .contains_key(id)
2030            })
2031            .cloned()
2032            .collect();
2033
2034        let (upstream_root_fragments, existing_actor_location) = self
2035            .metadata_manager
2036            .get_upstream_root_fragments(&upstream_table_ids)
2037            .await?;
2038
2039        if snapshot_backfill_info.is_some() {
2040            match stream_job {
2041                StreamingJob::MaterializedView(_)
2042                | StreamingJob::Sink(_, _)
2043                | StreamingJob::Index(_, _) => {}
2044                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
2045                    return Err(
2046                        anyhow!("snapshot_backfill not enabled for table and source").into(),
2047                    );
2048                }
2049            }
2050        }
2051
2052        let upstream_actors = upstream_root_fragments
2053            .values()
2054            .map(|fragment| {
2055                (
2056                    fragment.fragment_id,
2057                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
2058                )
2059            })
2060            .collect();
2061
2062        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
2063            fragment_graph,
2064            upstream_root_fragments,
2065            existing_actor_location,
2066            (&stream_job).into(),
2067        )?;
2068
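        // Resolve the resource group: an explicitly specified group takes precedence,
        // otherwise fall back to the database's resource group.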
2069        let resource_group = match specific_resource_group {
2070            None => {
2071                self.metadata_manager
2072                    .get_database_resource_group(stream_job.database_id() as ObjectId)
2073                    .await?
2074            }
2075            Some(resource_group) => resource_group,
2076        };
2077
2078        // 3. Build the actor graph.
2079        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2080
2081        let parallelism = self.resolve_stream_parallelism(
2082            specified_parallelism,
2083            max_parallelism,
2084            &cluster_info,
2085            resource_group.clone(),
2086        )?;
2087
2088        let parallelism = self
2089            .env
2090            .system_params_reader()
2091            .await
2092            .adaptive_parallelism_strategy()
2093            .compute_target_parallelism(parallelism.get());
2094
2095        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
2096        let actor_graph_builder = ActorGraphBuilder::new(
2097            id,
2098            resource_group,
2099            complete_graph,
2100            cluster_info,
2101            parallelism,
2102        )?;
2103
2104        let ActorGraphBuildResult {
2105            graph,
2106            downstream_fragment_relations,
2107            building_locations,
2108            upstream_fragment_downstreams,
2109            new_no_shuffle,
2110            replace_upstream,
2111            ..
2112        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
2113        assert!(replace_upstream.is_empty());
2114
2115        // 4. Build the table fragments structure that will be persisted in the stream manager,
2116        // and the context that contains all information needed for building the
2117        // actors on the compute nodes.
2118
2119        // If the frontend does not specify the degree of parallelism and `default_parallelism` is set to full, then use ADAPTIVE.
2120        // Otherwise, it defaults to FIXED with the parallelism resolved above.
2121        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
2122            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
2123            _ => TableParallelism::Fixed(parallelism.get()),
2124        };
2125
2126        let stream_job_fragments = StreamJobFragments::new(
2127            id.into(),
2128            graph,
2129            &building_locations.actor_locations,
2130            stream_ctx.clone(),
2131            table_parallelism,
2132            max_parallelism.get(),
2133        );
2134
2135        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
2136            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
2137        }
2138
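        // When creating a sink into a table, additionally build the replacement job for the
        // target table so that the new sink gets wired into it.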
2139        let replace_table_job_info = match affected_table_replace_info {
2140            Some((table_stream_job, fragment_graph)) => {
2141                if snapshot_backfill_info.is_some() {
2142                    return Err(anyhow!(
2143                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
2144                    )
2145                    .into());
2146                }
2147                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
2148                    bail!("additional replace table event only occurs when sinking into table");
2149                };
2150
2151                table_stream_job.table().expect("should be table job");
2152                let tmp_id = self
2153                    .metadata_manager
2154                    .catalog_controller
2155                    .create_job_catalog_for_replace(
2156                        &table_stream_job,
2157                        Some(&stream_ctx),
2158                        fragment_graph.specified_parallelism().as_ref(),
2159                        Some(fragment_graph.max_parallelism()),
2160                    )
2161                    .await? as u32;
2162
2163                let (context, table_fragments) = self
2164                    .inject_replace_table_job_for_table_sink(
2165                        tmp_id,
2166                        &self.metadata_manager,
2167                        stream_ctx,
2168                        Some(sink),
2169                        Some(&stream_job_fragments),
2170                        None,
2171                        &table_stream_job,
2172                        fragment_graph,
2173                    )
2174                    .await?;
2175                // When sinking into a table, some fields of the target table may be modified,
2176                // such as `fragment_id` being altered by `prepare_replace_table`.
2177                // At this point, the table info carried by the sink must be updated accordingly.
2178                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
2179                    // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table
2180                    *target_table = Some((table.clone(), source.clone()));
2181                });
2182
2183                Some((table_stream_job, context, table_fragments))
2184            }
2185            None => None,
2186        };
2187
2188        let ctx = CreateStreamingJobContext {
2189            upstream_fragment_downstreams,
2190            new_no_shuffle,
2191            upstream_actors,
2192            building_locations,
2193            definition: stream_job.definition(),
2194            mv_table_id: stream_job.mv_table(),
2195            create_type: stream_job.create_type(),
2196            job_type: (&stream_job).into(),
2197            streaming_job: stream_job,
2198            replace_table_job_info,
2199            option: CreateStreamingJobOption {},
2200            snapshot_backfill_info,
2201            cross_db_snapshot_backfill_info,
2202            fragment_backfill_ordering,
2203        };
2204
2205        Ok((
2206            ctx,
2207            StreamJobFragmentsToCreate {
2208                inner: stream_job_fragments,
2209                downstreams: downstream_fragment_relations,
2210            },
2211        ))
2212    }
2213
2214    /// `build_replace_job` builds a job replacement and returns the context and new job
2215    /// fragments.
2216    ///
2217    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
2218    /// replacement is finished.
2219    pub(crate) async fn build_replace_job(
2220        &self,
2221        stream_ctx: StreamContext,
2222        stream_job: &StreamingJob,
2223        mut fragment_graph: StreamFragmentGraph,
2224        tmp_job_id: TableId,
2225        auto_refresh_schema_sinks: Option<Vec<AutoRefreshSchemaSinkContext>>,
2226    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
2227        match &stream_job {
2228            StreamingJob::Table(..)
2229            | StreamingJob::Source(..)
2230            | StreamingJob::MaterializedView(..) => {}
2231            StreamingJob::Sink(..) | StreamingJob::Index(..) => {
2232                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
2233            }
2234        }
2235
2236        let id = stream_job.id();
2237        let expr_context = stream_ctx.to_expr_context();
2238
2239        // Check whether this replacement drops the table's associated source connector.
2240        let mut drop_table_associated_source_id = None;
2241        if let StreamingJob::Table(None, _, _) = &stream_job {
2242            drop_table_associated_source_id = self
2243                .metadata_manager
2244                .get_table_associated_source_id(id as _)
2245                .await?;
2246        }
2247
2248        let old_fragments = self
2249            .metadata_manager
2250            .get_job_fragments_by_id(&id.into())
2251            .await?;
2252        let old_internal_table_ids = old_fragments.internal_table_ids();
2253
2254        // Handle dropping the table's associated source.
2255        let mut drop_table_connector_ctx = None;
2256        if let Some(to_remove_source_id) = drop_table_associated_source_id {
2257            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
2258            debug_assert!(old_internal_table_ids.len() == 1);
2259
2260            drop_table_connector_ctx = Some(DropTableConnectorContext {
2261                // We do not remove the original table catalog as it's still needed for the streaming job;
2262                // we only need to remove the reference to the state table.
2263                to_change_streaming_job_id: id as i32,
2264                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
2265                to_remove_source_id,
2266            });
2267        } else if stream_job.is_materialized_view() {
2268            // If it's ALTER MV, use `state_match` to match the internal tables, which is more complicated
2269            // but more robust.
2270            let old_fragments_upstreams = self
2271                .metadata_manager
2272                .catalog_controller
2273                .upstream_fragments(old_fragments.fragment_ids())
2274                .await?;
2275
2276            let old_state_graph =
2277                state_match::Graph::from_existing(&old_fragments, &old_fragments_upstreams);
2278            let new_state_graph = state_match::Graph::from_building(&fragment_graph);
2279            let mapping =
2280                state_match::match_graph_internal_tables(&new_state_graph, &old_state_graph)
2281                    .context("incompatible altering on the streaming job states")?;
2282
2283            fragment_graph.fit_internal_table_ids_with_mapping(mapping);
2284        } else {
2285            // If it's ALTER TABLE or SOURCE, use a trivial table id matching algorithm to keep the original behavior.
2286            // TODO(alter-mv): this is actually a special case of ALTER MV, can we merge the two branches?
2287            let old_internal_tables = self
2288                .metadata_manager
2289                .get_table_catalog_by_ids(old_internal_table_ids)
2290                .await?;
2291            fragment_graph.fit_internal_tables_trivial(old_internal_tables)?;
2292        }
2293
2294        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
2295        // graph that contains all information needed for building the actor graph.
2296        let original_root_fragment = old_fragments
2297            .root_fragment()
2298            .expect("root fragment not found");
2299
2300        let job_type = StreamingJobType::from(stream_job);
2301
2302        // Fetch the downstream fragments of the job being replaced.
2303        let (mut downstream_fragments, mut downstream_actor_location) =
2304            self.metadata_manager.get_downstream_fragments(id).await?;
2305
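        // For auto-refresh-schema sinks, substitute the original downstream sink fragments
        // with their rewritten versions and update the actor locations accordingly.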
2306        if let Some(auto_refresh_schema_sinks) = &auto_refresh_schema_sinks {
2307            let mut remaining_fragment: HashSet<_> = auto_refresh_schema_sinks
2308                .iter()
2309                .map(|sink| sink.original_fragment.fragment_id)
2310                .collect();
2311            for (_, downstream_fragment) in &mut downstream_fragments {
2312                if let Some(sink) = auto_refresh_schema_sinks.iter().find(|sink| {
2313                    sink.original_fragment.fragment_id == downstream_fragment.fragment_id
2314                }) {
2315                    assert!(remaining_fragment.remove(&downstream_fragment.fragment_id));
2316                    for actor in &downstream_fragment.actors {
2317                        downstream_actor_location.remove(&actor.actor_id);
2318                    }
2319                    for (actor_id, status) in &sink.actor_status {
2320                        downstream_actor_location.insert(
2321                            *actor_id,
2322                            status.location.as_ref().unwrap().worker_node_id as WorkerId,
2323                        );
2324                    }
2325                    *downstream_fragment = sink.new_fragment.clone();
2326                }
2327            }
2328            assert!(remaining_fragment.is_empty());
2329        }
2330
2331        // build complete graph based on the table job type
2332        let complete_graph = match &job_type {
2333            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
2334                CompleteStreamFragmentGraph::with_downstreams(
2335                    fragment_graph,
2336                    original_root_fragment.fragment_id,
2337                    downstream_fragments,
2338                    downstream_actor_location,
2339                    job_type,
2340                )?
2341            }
2342            StreamingJobType::Table(TableJobType::SharedCdcSource)
2343            | StreamingJobType::MaterializedView => {
2344                // CDC tables or materialized views can have upstream jobs as well.
2345                let (upstream_root_fragments, upstream_actor_location) = self
2346                    .metadata_manager
2347                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
2348                    .await?;
2349
2350                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
2351                    fragment_graph,
2352                    upstream_root_fragments,
2353                    upstream_actor_location,
2354                    original_root_fragment.fragment_id,
2355                    downstream_fragments,
2356                    downstream_actor_location,
2357                    job_type,
2358                )?
2359            }
2360            _ => unreachable!(),
2361        };
2362
2363        let resource_group = self
2364            .metadata_manager
2365            .get_existing_job_resource_group(id as ObjectId)
2366            .await?;
2367
2368        // 2. Build the actor graph.
2369        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
2370
2371        // XXX: what is this parallelism?
2372        // Is it "assigned parallelism"?
2373        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
2374            .expect("The number of actors in the original table fragment should be greater than 0");
2375
2376        let actor_graph_builder = ActorGraphBuilder::new(
2377            id,
2378            resource_group,
2379            complete_graph,
2380            cluster_info,
2381            parallelism,
2382        )?;
2383
2384        let ActorGraphBuildResult {
2385            graph,
2386            downstream_fragment_relations,
2387            building_locations,
2388            upstream_fragment_downstreams,
2389            mut replace_upstream,
2390            new_no_shuffle,
2391            ..
2392        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
2393
2394        // General tables & sources do not have upstream jobs, so the dispatchers should be empty.
2395        if matches!(
2396            job_type,
2397            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
2398        ) {
2399            assert!(upstream_fragment_downstreams.is_empty());
2400        }
2401
2402        // 3. Build the table fragments structure that will be persisted in the stream manager, and
2403        // the context that contains all information needed for building the actors on the compute
2404        // nodes.
2405        let stream_job_fragments = StreamJobFragments::new(
2406            (tmp_job_id as u32).into(),
2407            graph,
2408            &building_locations.actor_locations,
2409            stream_ctx,
2410            old_fragments.assigned_parallelism,
2411            old_fragments.max_parallelism,
2412        );
2413
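        // The auto-refresh-schema sink fragments are replaced by their new fragments, so drop
        // their entries from `replace_upstream`.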
2414        if let Some(sinks) = &auto_refresh_schema_sinks {
2415            for sink in sinks {
2416                replace_upstream
2417                    .remove(&sink.new_fragment.fragment_id)
2418                    .expect("should exist");
2419            }
2420        }
2421
2422        // Note: no need to set `vnode_count` as it's already set by the frontend.
2423        // See `get_replace_table_plan`.
2424
2425        let ctx = ReplaceStreamJobContext {
2426            old_fragments,
2427            replace_upstream,
2428            new_no_shuffle,
2429            upstream_fragment_downstreams,
2430            building_locations,
2431            streaming_job: stream_job.clone(),
2432            tmp_id: tmp_job_id as _,
2433            drop_table_connector_ctx,
2434            auto_refresh_schema_sinks,
2435        };
2436
2437        Ok((
2438            ctx,
2439            StreamJobFragmentsToCreate {
2440                inner: stream_job_fragments,
2441                downstreams: downstream_fragment_relations,
2442            },
2443        ))
2444    }
2445
2446    async fn alter_name(
2447        &self,
2448        relation: alter_name_request::Object,
2449        new_name: &str,
2450    ) -> MetaResult<NotificationVersion> {
2451        let (obj_type, id) = match relation {
2452            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2453            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2454            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2455            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2456            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2457            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2458            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2459            alter_name_request::Object::SubscriptionId(id) => {
2460                (ObjectType::Subscription, id as ObjectId)
2461            }
2462        };
2463        self.metadata_manager
2464            .catalog_controller
2465            .alter_name(obj_type, id, new_name)
2466            .await
2467    }
2468
2469    async fn alter_swap_rename(
2470        &self,
2471        object: alter_swap_rename_request::Object,
2472    ) -> MetaResult<NotificationVersion> {
2473        let (obj_type, src_id, dst_id) = match object {
2474            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2475            alter_swap_rename_request::Object::Table(objs) => {
2476                let (src_id, dst_id) = (
2477                    objs.src_object_id as ObjectId,
2478                    objs.dst_object_id as ObjectId,
2479                );
2480                (ObjectType::Table, src_id, dst_id)
2481            }
2482            alter_swap_rename_request::Object::View(objs) => {
2483                let (src_id, dst_id) = (
2484                    objs.src_object_id as ObjectId,
2485                    objs.dst_object_id as ObjectId,
2486                );
2487                (ObjectType::View, src_id, dst_id)
2488            }
2489            alter_swap_rename_request::Object::Source(objs) => {
2490                let (src_id, dst_id) = (
2491                    objs.src_object_id as ObjectId,
2492                    objs.dst_object_id as ObjectId,
2493                );
2494                (ObjectType::Source, src_id, dst_id)
2495            }
2496            alter_swap_rename_request::Object::Sink(objs) => {
2497                let (src_id, dst_id) = (
2498                    objs.src_object_id as ObjectId,
2499                    objs.dst_object_id as ObjectId,
2500                );
2501                (ObjectType::Sink, src_id, dst_id)
2502            }
2503            alter_swap_rename_request::Object::Subscription(objs) => {
2504                let (src_id, dst_id) = (
2505                    objs.src_object_id as ObjectId,
2506                    objs.dst_object_id as ObjectId,
2507                );
2508                (ObjectType::Subscription, src_id, dst_id)
2509            }
2510        };
2511
2512        self.metadata_manager
2513            .catalog_controller
2514            .alter_swap_rename(obj_type, src_id, dst_id)
2515            .await
2516    }
2517
2518    async fn alter_owner(
2519        &self,
2520        object: Object,
2521        owner_id: UserId,
2522    ) -> MetaResult<NotificationVersion> {
2523        let (obj_type, id) = match object {
2524            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2525            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2526            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2527            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2528            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2529            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2530            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2531            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2532        };
2533        self.metadata_manager
2534            .catalog_controller
2535            .alter_owner(obj_type, id, owner_id as _)
2536            .await
2537    }
2538
2539    async fn alter_set_schema(
2540        &self,
2541        object: alter_set_schema_request::Object,
2542        new_schema_id: SchemaId,
2543    ) -> MetaResult<NotificationVersion> {
2544        let (obj_type, id) = match object {
2545            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2546            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2547            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2548            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2549            alter_set_schema_request::Object::FunctionId(id) => {
2550                (ObjectType::Function, id as ObjectId)
2551            }
2552            alter_set_schema_request::Object::ConnectionId(id) => {
2553                (ObjectType::Connection, id as ObjectId)
2554            }
2555            alter_set_schema_request::Object::SubscriptionId(id) => {
2556                (ObjectType::Subscription, id as ObjectId)
2557            }
2558        };
2559        self.metadata_manager
2560            .catalog_controller
2561            .alter_schema(obj_type, id, new_schema_id as _)
2562            .await
2563    }
2564
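    /// Waits until there are no background creating streaming jobs left, polling roughly once
    /// per millisecond and timing out after about 30 minutes.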
2565    pub async fn wait(&self) -> MetaResult<()> {
2566        let timeout_ms = 30 * 60 * 1000;
2567        for _ in 0..timeout_ms {
2568            if self
2569                .metadata_manager
2570                .catalog_controller
2571                .list_background_creating_jobs(true)
2572                .await?
2573                .is_empty()
2574            {
2575                return Ok(());
2576            }
2577
2578            sleep(Duration::from_millis(1)).await;
2579        }
2580        Err(MetaError::cancelled(format!(
2581            "timeout after {timeout_ms}ms"
2582        )))
2583    }
2584
2585    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2586        self.metadata_manager
2587            .catalog_controller
2588            .comment_on(comment)
2589            .await
2590    }
2591}
2592
2593fn report_create_object(
2594    catalog_id: u32,
2595    event_name: &str,
2596    obj_type: PbTelemetryDatabaseObject,
2597    connector_name: Option<String>,
2598    attr_info: Option<jsonbb::Value>,
2599) {
2600    report_event(
2601        PbTelemetryEventStage::CreateStreamJob,
2602        event_name,
2603        catalog_id.into(),
2604        connector_name,
2605        Some(obj_type),
2606        attr_info,
2607    );
2608}
2609
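/// Deletes all rows belonging to the given `sink_id` from the exactly-once Iceberg sink system table.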
2610async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2611    match Entity::delete_many()
2612        .filter(Column::SinkId.eq(sink_id))
2613        .exec(db)
2614        .await
2615    {
2616        Ok(result) => {
2617            let deleted_count = result.rows_affected;
2618
2619            tracing::info!(
2620                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2621                deleted_count,
2622                sink_id
2623            );
2624            Ok(())
2625        }
2626        Err(e) => {
2627            tracing::error!(
2628                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2629                sink_id,
2630                e.as_report()
2631            );
2632            Err(e.into())
2633        }
2634    }
2635}