risingwave_meta/rpc/ddl_controller.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::actor_dispatcher::DispatcherType;
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId,
    SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    FragmentTypeFlag, MergeNode, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

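/// Drop behavior requested by the client: `Restrict` refuses to drop an object that still has
/// dependent objects, while `Cascade` also drops its dependents.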
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

/// Describes the job that needs to be replaced.
/// It is used when replacing a table and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
    pub col_index_mapping: Option<ColIndexMapping>,
}

pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View),
    DropView(ViewId, DropMode),
    CreateStreamingJob(
        StreamingJob,
        StreamFragmentGraphProto,
        CreateType,
        Option<ReplaceStreamJobInfo>,
        HashSet<ObjectId>,
        Option<String>, // specific resource group
    ),
    DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceStreamJobInfo>),
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
}

impl DdlCommand {
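    /// Returns whether this command may run while the cluster is still recovering.
    /// Catalog-only commands and drops are allowed during recovery; commands that create,
    /// replace, or alter streaming jobs, non-shared sources, or subscriptions are not.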
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob(_, _, _)
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_) => true,
            DdlCommand::CreateStreamingJob(_, _, _, _, _, _)
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // The semaphore limits the number of streaming jobs that can be created concurrently.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
}

#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

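        // Watch for system parameter changes in the background and resize the semaphore to
        // match the new limit: grow it by adding permits, or shrink it by acquiring the
        // difference and forgetting those permits.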
        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
        }
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted during execution, tonic cancels the request future. Since there is a lot of
    /// logic for revert, status management, notification and so on, ensuring consistency would be
    /// a huge hassle if we did not spawn a detached task here.
    ///
    /// Though the return type is an `Option`, the value is always `Some`; this simplifies the caller's handling logic.
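    ///
    /// A minimal usage sketch (illustrative only; `ctrl` is assumed to be a constructed
    /// [`DdlController`] and `database` a [`Database`] catalog proto):
    ///
    /// ```ignore
    /// let wait_version = ctrl
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("always Some");
    /// ```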
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }
        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view) => ctrl.create_view(view).await,
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob(
                    stream_job,
                    fragment_graph,
                    _create_type,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                ) => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob(job_id, drop_mode, target_replace_info) => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                    col_index_mapping,
                }) => {
                    ctrl.replace_job(streaming_job, fragment_graph, col_index_mapping)
                        .await
                }
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
            }
        }
        .in_current_span();
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_database(database)
            .await
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    /// This replaces the source in the catalog.
    /// Note: `StreamSourceInfo` in downstream MVs' `SourceExecutor`s are not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        self.stream_manager
            .create_subscription(&subscription)
            .await
            .inspect_err(|e| {
                tracing::debug!(error = %e.as_report(), "cancel create subscription");
            })?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
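        // The CDC scan may appear either directly as a `StreamCdcScan` node, or wrapped in a
        // `Project` node when the table has generated columns.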
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the cdc scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    // Here we modify the union node of the downstream table using the `TableFragments` of the to-be-created upstream sink.
    // The merge inside the union has already been set up by the frontend; this function fills it in with the specific upstream fragment.
    // Meanwhile, the dispatcher corresponding to the upstream side of the merge is also added to the replace-table context here.
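    //
    // A rough sketch of the wiring that results for the downstream table (per incoming sink):
    //
    //   SinkExecutor (sink fragment) --hash dispatch--> Merge -> Project -> Union (table's union fragment)
    //
    // The pair for the to-be-created sink is wired first; the pairs for already-existing incoming
    // sinks are re-wired in the loop below.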
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, None, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        assert_eq!(table_catalog.incoming_sinks, target_table.incoming_sinks);

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // check if the union fragment is fully assigned.
        for fragment in stream_job_fragments.fragments.values() {
            {
                {
                    visit_stream_node(&fragment.nodes, |node| {
                        if let NodeBody::Merge(merge_node) = node {
                            let upstream_fragment_id = merge_node.upstream_fragment_id;
                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                                .upstream_fragment_downstreams
                                .get(&upstream_fragment_id)
                            {
                                let mut upstream_fragment_downstreams =
                                    external_upstream_fragment_downstreams.iter().filter(
                                        |downstream| {
                                            downstream.downstream_fragment_id
                                                == fragment.fragment_id
                                        },
                                    );
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            } else {
                                let mut upstream_fragment_downstreams = stream_job_fragments
                                    .downstreams
                                    .get(&upstream_fragment_id)
                                    .into_iter()
                                    .flatten()
                                    .filter(|downstream| {
                                        downstream.downstream_fragment_id == fragment.fragment_id
                                    });
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            }
                        }
                    });
                }
            }
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        {
            sink_fragment_downstreams.push(DownstreamFragmentRelation {
                downstream_fragment_id: union_fragment.fragment_id,
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: dist_key_indices.clone(),
                output_indices: output_indices.clone(),
            });
        }

        let upstream_fragment_id = sink_fragment.fragment_id;

        {
            {
                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
                        for input_project_node in &mut node.input {
                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
                                    input_project_node.input.iter_mut().exactly_one().unwrap();

                                // we need to align nodes here
                                if input_project_node.identity.as_str()
                                    != unique_identity
                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                                {
                                    continue;
                                }

                                if let Some(NodeBody::Merge(merge_node)) =
                                    &mut merge_stream_node.node_body
                                {
                                    {
                                        merge_stream_node.identity =
                                            format!("MergeExecutor(from sink {})", sink_id);

                                        input_project_node.identity =
                                            format!("ProjectExecutor(from sink {})", sink_id);
                                    }

                                    **merge_node = {
                                        #[expect(deprecated)]
                                        MergeNode {
                                            upstream_actor_id: vec![],
                                            upstream_fragment_id,
                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                            fields: sink_fields.to_vec(),
                                        }
                                    };

                                    merge_stream_node.fields = sink_fields.to_vec();

                                    return false;
                                }
                            }
                        }
                    }
                    true
                });
            }
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
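    ///
    /// A minimal call sketch (illustrative only; `job`, `graph` and `deps` are assumed to be
    /// prepared by the frontend handler):
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .create_streaming_job(job, graph, None, deps, None)
    ///     .await?;
    /// ```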
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        self.metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await?;
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                // Ensure the max parallelism remains unchanged before replacing the table.
                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        // create fragment and actor catalogs.
        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        // create streaming jobs.
        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            // FIXME(kwannoel): Unify background stream's creation path with MV below.
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self.stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await.inspect_err(|err| {
                        tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                    });
                };
                tokio::spawn(fut);
                rx.await.map_err(|_| anyhow!("failed to receive create streaming job result of job: {}", stream_job_id))??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }

    /// `target_replace_info`: when dropping a sink into table, we need to replace the table.
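    ///
    /// The overall flow: drop the object (and, for `Cascade`, its dependents) from the catalog,
    /// optionally replace the downstream table when a sink-into-table is dropped, then stop the
    /// affected streaming jobs and clean up sources and secrets.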
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

            // Ensure the max parallelism remains unchanged before replacing the table.
            let original_max_parallelism = self
                .metadata_manager
                .get_job_max_parallelism(streaming_job.id().into())
                .await?;
            let fragment_graph = PbStreamFragmentGraph {
                max_parallelism: original_max_parallelism as _,
                ..fragment_graph
            };

            let fragment_graph =
                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
            streaming_job.set_info_from_graph(&fragment_graph);
            let streaming_job = streaming_job;

            streaming_job.table().expect("should be table job");

            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
            let tmp_id = self
                .metadata_manager
                .catalog_controller
                .create_job_catalog_for_replace(
                    &streaming_job,
                    &stream_ctx,
                    &fragment_graph.specified_parallelism(),
                    fragment_graph.max_parallelism(),
                )
                .await? as u32;

            let (ctx, stream_job_fragments) = self
                .inject_replace_table_job_for_table_sink(
                    tmp_id,
                    &self.metadata_manager,
                    stream_ctx,
                    None,
                    None,
                    Some(sink_id),
                    &streaming_job,
                    fragment_graph,
                )
                .await?;

            let result: MetaResult<_> = try {
                let replace_upstream = ctx.replace_upstream.clone();

                self.metadata_manager
                    .catalog_controller
                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                    .await?;

                self.stream_manager
                    .replace_stream_job(stream_job_fragments, ctx)
                    .await?;

                replace_upstream
            };

            version = match result {
                Ok(replace_upstream) => {
                    let version = self
                        .metadata_manager
                        .catalog_controller
                        .finish_replace_streaming_job(
                            tmp_id as _,
                            streaming_job,
                            replace_upstream,
                            None,
                            SinkIntoTableContext {
                                creating_sink_id: None,
                                dropping_sink_id: Some(sink_id),
                                updated_sink_catalogs: vec![],
                            },
                            None, // no source is dropped when dropping sink into table
                        )
                        .await?;
                    Ok(version)
                }
                Err(err) => {
                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
                    let _ = self.metadata_manager
                        .catalog_controller
                        .try_abort_replacing_streaming_job(tmp_id as _)
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
                        });
                    Err(err)
                }
            }?;
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
            )
            .await;

        // clean up sources after dropping streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated after being deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // unregister fragments and actors from source manager.
        // FIXME: we also need to unregister source backfill fragments.
1332        let dropped_source_fragments = removed_source_fragments
1333            .into_iter()
1334            .map(|(source_id, fragments)| {
1335                (
1336                    source_id,
1337                    fragments.into_iter().map(|id| id as u32).collect(),
1338                )
1339            })
1340            .collect();
1341        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1342        self.source_manager
1343            .apply_source_change(SourceChange::DropMv {
1344                dropped_source_fragments,
1345                dropped_actors,
1346            })
1347            .await;
1348
1349        // remove secrets.
1350        for secret in secret_ids {
1351            LocalSecretManager::global().remove_secret(secret as _);
1352        }
1353        Ok(version)
1354    }
1355
1356    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
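        ///
        /// The new plan is staged under a temporary job ID via `create_job_catalog_for_replace`;
        /// on success it is swapped in with `finish_replace_streaming_job`, and on failure the
        /// temporary catalog is aborted with `try_abort_replacing_streaming_job`.
        ///
        /// A minimal caller sketch (hypothetical `job`, `graph` and `mapping` bindings):
        /// ```ignore
        /// // `job` is the updated `StreamingJob`, `graph` the re-planned fragment graph from the
        /// // frontend, and `mapping` the optional old-to-new column index mapping.
        /// let version = ddl_controller.replace_job(job, graph, Some(mapping)).await?;
        /// ```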
1357    pub async fn replace_job(
1358        &self,
1359        mut streaming_job: StreamingJob,
1360        fragment_graph: StreamFragmentGraphProto,
1361        col_index_mapping: Option<ColIndexMapping>,
1362    ) -> MetaResult<NotificationVersion> {
1363        match &mut streaming_job {
1364            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1365            StreamingJob::MaterializedView(..)
1366            | StreamingJob::Sink(..)
1367            | StreamingJob::Index(..) => {
1368                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1369            }
1370        }
1371
1372        let job_id = streaming_job.id();
1373
1374        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1375        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1376
1377        // Ensure the max parallelism stays unchanged before replacing the table.
1378        let original_max_parallelism = self
1379            .metadata_manager
1380            .get_job_max_parallelism(streaming_job.id().into())
1381            .await?;
1382        let fragment_graph = PbStreamFragmentGraph {
1383            max_parallelism: original_max_parallelism as _,
1384            ..fragment_graph
1385        };
1386
1387        // 1. build fragment graph.
1388        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1389        streaming_job.set_info_from_graph(&fragment_graph);
1390
1391        // make it immutable
1392        let streaming_job = streaming_job;
1393
1394        let tmp_id = self
1395            .metadata_manager
1396            .catalog_controller
1397            .create_job_catalog_for_replace(
1398                &streaming_job,
1399                &ctx,
1400                &fragment_graph.specified_parallelism(),
1401                fragment_graph.max_parallelism(),
1402            )
1403            .await?;
1404
1405        tracing::debug!(id = job_id, "building replace streaming job");
1406        let mut updated_sink_catalogs = vec![];
1407
1408        let mut drop_table_connector_ctx = None;
1409        let result: MetaResult<_> = try {
1410            let (mut ctx, mut stream_job_fragments) = self
1411                .build_replace_job(
1412                    ctx,
1413                    &streaming_job,
1414                    fragment_graph,
1415                    col_index_mapping.as_ref(),
1416                    tmp_id as _,
1417                )
1418                .await?;
1419            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1420
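                // If the table being replaced has incoming sinks (`CREATE SINK ... INTO TABLE`), re-inject each
                // sink's plan into the new union fragment so the existing sink-into-table connections keep working.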
1421            if let StreamingJob::Table(_, table, ..) = &streaming_job {
1422                let catalogs = self
1423                    .metadata_manager
1424                    .get_sink_catalog_by_ids(&table.incoming_sinks)
1425                    .await?;
1426
1427                for sink in catalogs {
1428                    let sink_id = &sink.id;
1429
1430                    let sink_table_fragments = self
1431                        .metadata_manager
1432                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1433                            *sink_id,
1434                        ))
1435                        .await?;
1436
1437                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1438
1439                    Self::inject_replace_table_plan_for_sink(
1440                        *sink_id,
1441                        &sink_fragment,
1442                        table,
1443                        &mut ctx,
1444                        stream_job_fragments.inner.union_fragment_for_table(),
1445                        Some(&sink.unique_identity()),
1446                    );
1447
1448                    if sink.original_target_columns.is_empty() {
1449                        updated_sink_catalogs.push(sink.id as _);
1450                    }
1451                }
1452            }
1453
1454            let replace_upstream = ctx.replace_upstream.clone();
1455
1456            self.metadata_manager
1457                .catalog_controller
1458                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1459                .await?;
1460
1461            self.stream_manager
1462                .replace_stream_job(stream_job_fragments, ctx)
1463                .await?;
1464            replace_upstream
1465        };
1466
1467        match result {
1468            Ok(replace_upstream) => {
1469                let version = self
1470                    .metadata_manager
1471                    .catalog_controller
1472                    .finish_replace_streaming_job(
1473                        tmp_id,
1474                        streaming_job,
1475                        replace_upstream,
1476                        col_index_mapping,
1477                        SinkIntoTableContext {
1478                            creating_sink_id: None,
1479                            dropping_sink_id: None,
1480                            updated_sink_catalogs,
1481                        },
1482                        drop_table_connector_ctx.as_ref(),
1483                    )
1484                    .await?;
1485                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1486                    self.source_manager
1487                        .apply_source_change(SourceChange::DropSource {
1488                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1489                        })
1490                        .await;
1491                }
1492                Ok(version)
1493            }
1494            Err(err) => {
1495                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1496                let _ = self.metadata_manager
1497                    .catalog_controller
1498                    .try_abort_replacing_streaming_job(tmp_id)
1499                    .await.inspect_err(|err| {
1500                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1501                });
1502                Err(err)
1503            }
1504        }
1505    }
1506
1507    async fn drop_streaming_job(
1508        &self,
1509        job_id: StreamingJobId,
1510        drop_mode: DropMode,
1511        target_replace_info: Option<ReplaceStreamJobInfo>,
1512    ) -> MetaResult<NotificationVersion> {
1513        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1514
1515        let (object_id, object_type) = match job_id {
1516            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1517            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1518            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1519            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1520        };
1521
1522        let version = self
1523            .drop_object(object_type, object_id, drop_mode, target_replace_info)
1524            .await?;
1525        #[cfg(not(madsim))]
1526        if let StreamingJobId::Sink(sink_id) = job_id {
1527            // Delete this sink's rows from the exactly-once Iceberg sink system table.
1528            // TODO(wcy-fdu): run this cleanup only for Iceberg sinks instead of for every sink.
1529            let db = self.env.meta_store_ref().conn.clone();
1530            clean_all_rows_by_sink_id(&db, sink_id).await?;
1531        }
1532        Ok(version)
1533    }
1534
1535    /// Resolve the parallelism of the stream job based on the given information.
1536    ///
1537    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
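        ///
        /// A rough sketch of the resolution, with hypothetical numbers:
        /// - `specified = Some(8)`, `available = 4` => error: not enough parallelism to schedule.
        /// - `specified = Some(8)`, `max = 4` => error: specified parallelism exceeds max parallelism.
        /// - `specified = None`, default `Full`, `available = 32`, `max = 16` => `16` (capped at max).
        /// - `specified = None`, default `Default(4)`, `available = 8` => `4`.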
1538    fn resolve_stream_parallelism(
1539        &self,
1540        specified: Option<NonZeroUsize>,
1541        max: NonZeroUsize,
1542        cluster_info: &StreamingClusterInfo,
1543        resource_group: String,
1544    ) -> MetaResult<NonZeroUsize> {
1545        let available = cluster_info.parallelism(resource_group);
1546        let Some(available) = NonZeroUsize::new(available) else {
1547            bail_unavailable!("no available slots to schedule");
1548        };
1549
1550        if let Some(specified) = specified {
1551            if specified > max {
1552                bail_invalid_parameter!(
1553                    "specified parallelism {} should not exceed max parallelism {}",
1554                    specified,
1555                    max,
1556                );
1557            }
1558            if specified > available {
1559                bail_unavailable!(
1560                    "not enough parallelism to schedule, required: {}, available: {}",
1561                    specified,
1562                    available,
1563                );
1564            }
1565            Ok(specified)
1566        } else {
1567            // Fall back to the configured default parallelism when the user does not specify one.
1568            let default_parallelism = match self.env.opts.default_parallelism {
1569                DefaultParallelism::Full => available,
1570                DefaultParallelism::Default(num) => {
1571                    if num > available {
1572                        bail_unavailable!(
1573                            "not enough parallelism to schedule, required: {}, available: {}",
1574                            num,
1575                            available,
1576                        );
1577                    }
1578                    num
1579                }
1580            };
1581
1582            if default_parallelism > max {
1583                tracing::warn!(
1584                    "default parallelism exceeds the job's max parallelism, using max parallelism {} instead",
1585                    max
1586                );
1587            }
1588            Ok(default_parallelism.min(max))
1589        }
1590    }
1591
1592    /// Builds the actor graph:
1593    /// - Add the upstream fragments to the fragment graph
1594    /// - Schedule the fragments based on their distribution
1595    /// - Expand each fragment into one or several actors
1596    pub(crate) async fn build_stream_job(
1597        &self,
1598        stream_ctx: StreamContext,
1599        mut stream_job: StreamingJob,
1600        fragment_graph: StreamFragmentGraph,
1601        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1602        specific_resource_group: Option<String>,
1603    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1604        let id = stream_job.id();
1605        let specified_parallelism = fragment_graph.specified_parallelism();
1606        let expr_context = stream_ctx.to_expr_context();
1607        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1608
1609        // 1. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1610        // contains all information needed for building the actor graph.
1611
1612        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1613            fragment_graph.collect_snapshot_backfill_info()?;
1614
1615        // Check that a log store exists for every cross-db upstream.
1616        self.metadata_manager
1617            .catalog_controller
1618            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1619            .await?;
1620
1621        let upstream_table_ids = fragment_graph
1622            .dependent_table_ids()
1623            .iter()
1624            .filter(|id| {
1625                !cross_db_snapshot_backfill_info
1626                    .upstream_mv_table_id_to_backfill_epoch
1627                    .contains_key(id)
1628            })
1629            .cloned()
1630            .collect();
1631
1632        let (upstream_root_fragments, existing_actor_location) = self
1633            .metadata_manager
1634            .get_upstream_root_fragments(&upstream_table_ids)
1635            .await?;
1636
1637        if snapshot_backfill_info.is_some() {
1638            if stream_job.create_type() == CreateType::Background {
1639                return Err(anyhow!("snapshot_backfill must be used in Foreground mode").into());
1640            }
1641            match stream_job {
1642                StreamingJob::MaterializedView(_)
1643                | StreamingJob::Sink(_, _)
1644                | StreamingJob::Index(_, _) => {}
1645                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1646                    return Err(
1647                        anyhow!("snapshot_backfill is not enabled for tables and sources").into(),
1648                    );
1649                }
1650            }
1651        }
1652
1653        let upstream_actors = upstream_root_fragments
1654            .values()
1655            .map(|fragment| {
1656                (
1657                    fragment.fragment_id,
1658                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1659                )
1660            })
1661            .collect();
1662
1663        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1664            fragment_graph,
1665            upstream_root_fragments,
1666            existing_actor_location,
1667            (&stream_job).into(),
1668        )?;
1669
1670        let resource_group = match specific_resource_group {
1671            None => {
1672                self.metadata_manager
1673                    .get_database_resource_group(stream_job.database_id() as ObjectId)
1674                    .await?
1675            }
1676            Some(resource_group) => resource_group,
1677        };
1678
1679        // 2. Build the actor graph.
1680        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1681
1682        let parallelism = self.resolve_stream_parallelism(
1683            specified_parallelism,
1684            max_parallelism,
1685            &cluster_info,
1686            resource_group.clone(),
1687        )?;
1688
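            // Let the cluster-wide adaptive parallelism strategy (from system params) adjust the resolved parallelism before building the actor graph.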
1689        let parallelism = self
1690            .env
1691            .system_params_reader()
1692            .await
1693            .adaptive_parallelism_strategy()
1694            .compute_target_parallelism(parallelism.get());
1695
1696        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1697        let actor_graph_builder = ActorGraphBuilder::new(
1698            id,
1699            resource_group,
1700            complete_graph,
1701            cluster_info,
1702            parallelism,
1703        )?;
1704
1705        let ActorGraphBuildResult {
1706            graph,
1707            downstream_fragment_relations,
1708            building_locations,
1709            existing_locations,
1710            upstream_fragment_downstreams,
1711            new_no_shuffle,
1712            replace_upstream,
1713            ..
1714        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
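            // A newly built job graph never replaces the upstream of an existing fragment, so `replace_upstream` must be empty when creating a streaming job.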
1715        assert!(replace_upstream.is_empty());
1716
1717        // 3. Build the table fragments structure that will be persisted in the stream manager,
1718        // and the context that contains all information needed for building the
1719        // actors on the compute nodes.
1720
1721        // If the frontend does not specify a parallelism and `default_parallelism` is set to `Full`, use ADAPTIVE parallelism.
1722        // Otherwise, fall back to FIXED parallelism with the value resolved above.
1723        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1724            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1725            _ => TableParallelism::Fixed(parallelism.get()),
1726        };
1727
1728        let stream_job_fragments = StreamJobFragments::new(
1729            id.into(),
1730            graph,
1731            &building_locations.actor_locations,
1732            stream_ctx.clone(),
1733            table_parallelism,
1734            max_parallelism.get(),
1735        );
1736        let internal_tables = stream_job_fragments.internal_tables();
1737
1738        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1739            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1740        }
1741
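            // For `CREATE SINK ... INTO TABLE`, the target table must be replaced together with creating the sink, so build the replacement job and context for it here.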
1742        let replace_table_job_info = match affected_table_replace_info {
1743            Some((table_stream_job, fragment_graph)) => {
1744                if snapshot_backfill_info.is_some() {
1745                    return Err(anyhow!(
1746                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
1747                    )
1748                    .into());
1749                }
1750                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
1751                    bail!("an additional replace-table event only occurs when sinking into a table");
1752                };
1753
1754                table_stream_job.table().expect("should be table job");
1755                let tmp_id = self
1756                    .metadata_manager
1757                    .catalog_controller
1758                    .create_job_catalog_for_replace(
1759                        &table_stream_job,
1760                        &stream_ctx,
1761                        &fragment_graph.specified_parallelism(),
1762                        fragment_graph.max_parallelism(),
1763                    )
1764                    .await? as u32;
1765
1766                let (context, table_fragments) = self
1767                    .inject_replace_table_job_for_table_sink(
1768                        tmp_id,
1769                        &self.metadata_manager,
1770                        stream_ctx,
1771                        Some(sink),
1772                        Some(&stream_job_fragments),
1773                        None,
1774                        &table_stream_job,
1775                        fragment_graph,
1776                    )
1777                    .await?;
1778                // When sinking into a table, some fields of the target table may be modified,
1779                // e.g. `fragment_id` is altered by `prepare_replace_table`.
1780                // At this point, the table info carried with the sink must be updated accordingly.
1781                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
1782                    // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table
1783                    *target_table = Some((table.clone(), source.clone()));
1784                });
1785
1786                Some((table_stream_job, context, table_fragments))
1787            }
1788            None => None,
1789        };
1790
1791        let ctx = CreateStreamingJobContext {
1792            upstream_fragment_downstreams,
1793            new_no_shuffle,
1794            upstream_actors,
1795            internal_tables,
1796            building_locations,
1797            existing_locations,
1798            definition: stream_job.definition(),
1799            mv_table_id: stream_job.mv_table(),
1800            create_type: stream_job.create_type(),
1801            job_type: (&stream_job).into(),
1802            streaming_job: stream_job,
1803            replace_table_job_info,
1804            option: CreateStreamingJobOption {},
1805            snapshot_backfill_info,
1806            cross_db_snapshot_backfill_info,
1807        };
1808
1809        Ok((
1810            ctx,
1811            StreamJobFragmentsToCreate {
1812                inner: stream_job_fragments,
1813                downstreams: downstream_fragment_relations,
1814            },
1815        ))
1816    }
1817
1818    /// `build_replace_job` builds a job replacement and returns the context and the new job
1819    /// fragments.
1820    ///
1821    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1822    /// replacement is finished.
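        ///
        /// Depending on the job type, the fragment graph is completed with either only the existing
        /// downstream fragments (general tables and sources), or with both the upstream CDC source
        /// fragment and the downstreams (tables created on a shared CDC source).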
1823    pub(crate) async fn build_replace_job(
1824        &self,
1825        stream_ctx: StreamContext,
1826        stream_job: &StreamingJob,
1827        mut fragment_graph: StreamFragmentGraph,
1828        col_index_mapping: Option<&ColIndexMapping>,
1829        tmp_job_id: TableId,
1830    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1831        match &stream_job {
1832            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1833            StreamingJob::MaterializedView(..)
1834            | StreamingJob::Sink(..)
1835            | StreamingJob::Index(..) => {
1836                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1837            }
1838        }
1839
1840        let id = stream_job.id();
1841        let expr_context = stream_ctx.to_expr_context();
1842
1843        // Check whether this replacement drops the table's associated source connector.
1844        let mut drop_table_associated_source_id = None;
1845        if let StreamingJob::Table(None, _, _) = &stream_job {
1846            drop_table_associated_source_id = self
1847                .metadata_manager
1848                .get_table_associated_source_id(id as _)
1849                .await?;
1850        }
1851
1852        let old_fragments = self
1853            .metadata_manager
1854            .get_job_fragments_by_id(&id.into())
1855            .await?;
1856        let old_internal_table_ids = old_fragments.internal_table_ids();
1857
1858        // Handle dropping the table's associated source.
1859        let mut drop_table_connector_ctx = None;
1860        if drop_table_associated_source_id.is_some() {
1861            // Dropping the table's associated source implies the job has exactly one internal table: the associated source's state table.
1862            debug_assert!(old_internal_table_ids.len() == 1);
1863
1864            drop_table_connector_ctx = Some(DropTableConnectorContext {
1865                // We do not remove the original table catalog as it's still needed for the streaming job;
1866                // we only need to remove the reference to the state table.
1867                to_change_streaming_job_id: id as i32,
1868                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
1869                to_remove_source_id: drop_table_associated_source_id.unwrap(),
1870            });
1871        } else {
1872            let old_internal_tables = self
1873                .metadata_manager
1874                .get_table_catalog_by_ids(old_internal_table_ids)
1875                .await?;
1876            fragment_graph.fit_internal_table_ids(old_internal_tables)?;
1877        }
1878
1879        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1880        // graph that contains all information needed for building the actor graph.
1881        let original_root_fragment = old_fragments
1882            .root_fragment()
1883            .expect("root fragment not found");
1884
1885        let job_type = StreamingJobType::from(stream_job);
1886
1887        // Map the column indices in the dispatchers with the given mapping.
1888        let (mut downstream_fragments, downstream_actor_location) =
1889            self.metadata_manager.get_downstream_fragments(id).await?;
1890        if let Some(mapping) = &col_index_mapping {
1891            for (d, _f) in &mut downstream_fragments {
1892                *d = mapping.rewrite_dispatch_strategy(d).ok_or_else(|| {
1893                    // The `rewrite` only fails if some column is dropped (missing) or altered (type changed).
1894                    // TODO: support altering referenced columns
1895                    MetaError::invalid_parameter(
1896                        "unable to drop or alter the column due to being referenced by downstream materialized views or sinks",
1897                    )
1898                })?;
1899            }
1900        }
1901
1902        // Build the complete graph according to the job type.
1903        let complete_graph = match &job_type {
1904            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
1905                CompleteStreamFragmentGraph::with_downstreams(
1906                    fragment_graph,
1907                    original_root_fragment.fragment_id,
1908                    downstream_fragments,
1909                    downstream_actor_location,
1910                    job_type,
1911                )?
1912            }
1913            StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1914                // Get the upstream root fragment, which should be the shared CDC source.
1915                let (upstream_root_fragments, upstream_actor_location) = self
1916                    .metadata_manager
1917                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
1918                    .await?;
1919
1920                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
1921                    fragment_graph,
1922                    upstream_root_fragments,
1923                    upstream_actor_location,
1924                    original_root_fragment.fragment_id,
1925                    downstream_fragments,
1926                    downstream_actor_location,
1927                    job_type,
1928                )?
1929            }
1930            _ => unreachable!(),
1931        };
1932
1933        let resource_group = self
1934            .metadata_manager
1935            .get_existing_job_resource_group(id as ObjectId)
1936            .await?;
1937
1938        // 2. Build the actor graph.
1939        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1940
1941        // XXX: what is this parallelism?
1942        // Is it "assigned parallelism"?
1943        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
1944            .expect("The number of actors in the original table fragment should be greater than 0");
1945
1946        let actor_graph_builder = ActorGraphBuilder::new(
1947            id,
1948            resource_group,
1949            complete_graph,
1950            cluster_info,
1951            parallelism,
1952        )?;
1953
1954        let ActorGraphBuildResult {
1955            graph,
1956            downstream_fragment_relations,
1957            building_locations,
1958            existing_locations,
1959            upstream_fragment_downstreams,
1960            replace_upstream,
1961            new_no_shuffle,
1962            ..
1963        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
1964
1965        // A general table or source has no upstream job, so there should be no dispatchers from upstream fragments.
1966        if matches!(
1967            job_type,
1968            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
1969        ) {
1970            assert!(upstream_fragment_downstreams.is_empty());
1971        }
1972
1973        // 3. Build the table fragments structure that will be persisted in the stream manager, and
1974        // the context that contains all information needed for building the actors on the compute
1975        // nodes.
1976        let stream_job_fragments = StreamJobFragments::new(
1977            (tmp_job_id as u32).into(),
1978            graph,
1979            &building_locations.actor_locations,
1980            stream_ctx,
1981            old_fragments.assigned_parallelism,
1982            old_fragments.max_parallelism,
1983        );
1984
1985        // Note: no need to set `vnode_count` as it's already set by the frontend.
1986        // See `get_replace_table_plan`.
1987
1988        let ctx = ReplaceStreamJobContext {
1989            old_fragments,
1990            replace_upstream,
1991            new_no_shuffle,
1992            upstream_fragment_downstreams,
1993            building_locations,
1994            existing_locations,
1995            streaming_job: stream_job.clone(),
1996            tmp_id: tmp_job_id as _,
1997            drop_table_connector_ctx,
1998        };
1999
2000        Ok((
2001            ctx,
2002            StreamJobFragmentsToCreate {
2003                inner: stream_job_fragments,
2004                downstreams: downstream_fragment_relations,
2005            },
2006        ))
2007    }
2008
2009    async fn alter_name(
2010        &self,
2011        relation: alter_name_request::Object,
2012        new_name: &str,
2013    ) -> MetaResult<NotificationVersion> {
2014        let (obj_type, id) = match relation {
2015            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2016            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2017            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2018            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2019            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2020            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2021            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2022            alter_name_request::Object::SubscriptionId(id) => {
2023                (ObjectType::Subscription, id as ObjectId)
2024            }
2025        };
2026        self.metadata_manager
2027            .catalog_controller
2028            .alter_name(obj_type, id, new_name)
2029            .await
2030    }
2031
2032    async fn alter_swap_rename(
2033        &self,
2034        object: alter_swap_rename_request::Object,
2035    ) -> MetaResult<NotificationVersion> {
2036        let (obj_type, src_id, dst_id) = match object {
2037            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2038            alter_swap_rename_request::Object::Table(objs) => {
2039                let (src_id, dst_id) = (
2040                    objs.src_object_id as ObjectId,
2041                    objs.dst_object_id as ObjectId,
2042                );
2043                (ObjectType::Table, src_id, dst_id)
2044            }
2045            alter_swap_rename_request::Object::View(objs) => {
2046                let (src_id, dst_id) = (
2047                    objs.src_object_id as ObjectId,
2048                    objs.dst_object_id as ObjectId,
2049                );
2050                (ObjectType::View, src_id, dst_id)
2051            }
2052            alter_swap_rename_request::Object::Source(objs) => {
2053                let (src_id, dst_id) = (
2054                    objs.src_object_id as ObjectId,
2055                    objs.dst_object_id as ObjectId,
2056                );
2057                (ObjectType::Source, src_id, dst_id)
2058            }
2059            alter_swap_rename_request::Object::Sink(objs) => {
2060                let (src_id, dst_id) = (
2061                    objs.src_object_id as ObjectId,
2062                    objs.dst_object_id as ObjectId,
2063                );
2064                (ObjectType::Sink, src_id, dst_id)
2065            }
2066            alter_swap_rename_request::Object::Subscription(objs) => {
2067                let (src_id, dst_id) = (
2068                    objs.src_object_id as ObjectId,
2069                    objs.dst_object_id as ObjectId,
2070                );
2071                (ObjectType::Subscription, src_id, dst_id)
2072            }
2073        };
2074
2075        self.metadata_manager
2076            .catalog_controller
2077            .alter_swap_rename(obj_type, src_id, dst_id)
2078            .await
2079    }
2080
2081    async fn alter_owner(
2082        &self,
2083        object: Object,
2084        owner_id: UserId,
2085    ) -> MetaResult<NotificationVersion> {
2086        let (obj_type, id) = match object {
2087            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2088            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2089            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2090            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2091            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2092            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2093            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2094            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2095        };
2096        self.metadata_manager
2097            .catalog_controller
2098            .alter_owner(obj_type, id, owner_id as _)
2099            .await
2100    }
2101
2102    async fn alter_set_schema(
2103        &self,
2104        object: alter_set_schema_request::Object,
2105        new_schema_id: SchemaId,
2106    ) -> MetaResult<NotificationVersion> {
2107        let (obj_type, id) = match object {
2108            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2109            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2110            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2111            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2112            alter_set_schema_request::Object::FunctionId(id) => {
2113                (ObjectType::Function, id as ObjectId)
2114            }
2115            alter_set_schema_request::Object::ConnectionId(id) => {
2116                (ObjectType::Connection, id as ObjectId)
2117            }
2118            alter_set_schema_request::Object::SubscriptionId(id) => {
2119                (ObjectType::Subscription, id as ObjectId)
2120            }
2121        };
2122        self.metadata_manager
2123            .catalog_controller
2124            .alter_schema(obj_type, id, new_schema_id as _)
2125            .await
2126    }
2127
2128    pub async fn wait(&self) -> MetaResult<()> {
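            // Poll roughly once per millisecond, for up to ~30 minutes of polling, until no background-creating materialized views remain.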
2129        let timeout_ms = 30 * 60 * 1000;
2130        for _ in 0..timeout_ms {
2131            if self
2132                .metadata_manager
2133                .catalog_controller
2134                .list_background_creating_mviews(true)
2135                .await?
2136                .is_empty()
2137            {
2138                return Ok(());
2139            }
2140
2141            sleep(Duration::from_millis(1)).await;
2142        }
2143        Err(MetaError::cancelled(format!(
2144            "timeout after {timeout_ms}ms"
2145        )))
2146    }
2147
2148    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2149        self.metadata_manager
2150            .catalog_controller
2151            .comment_on(comment)
2152            .await
2153    }
2154}
2155
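    /// Reports a `CreateStreamJob`-stage telemetry event for the given catalog object.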
2156fn report_create_object(
2157    catalog_id: u32,
2158    event_name: &str,
2159    obj_type: PbTelemetryDatabaseObject,
2160    connector_name: Option<String>,
2161    attr_info: Option<jsonbb::Value>,
2162) {
2163    report_event(
2164        PbTelemetryEventStage::CreateStreamJob,
2165        event_name,
2166        catalog_id.into(),
2167        connector_name,
2168        Some(obj_type),
2169        attr_info,
2170    );
2171}
2172
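    /// Deletes all rows belonging to `sink_id` from the exactly-once Iceberg sink system table.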
2173async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2174    match Entity::delete_many()
2175        .filter(Column::SinkId.eq(sink_id))
2176        .exec(db)
2177        .await
2178    {
2179        Ok(result) => {
2180            let deleted_count = result.rows_affected;
2181
2182            tracing::info!(
2183                "Deleted {} rows for sink_id = {} from the iceberg exactly-once system table.",
2184                deleted_count,
2185                sink_id
2186            );
2187            Ok(())
2188        }
2189        Err(e) => {
2190            tracing::error!(
2191                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2192                sink_id,
2193                e.as_report()
2194            );
2195            Err(e.into())
2196        }
2197    }
2198}