risingwave_meta/rpc/ddl_controller.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::stream_graph_visitor::{
    visit_stream_node, visit_stream_node_cont_mut,
};
use risingwave_common::{bail, bail_not_implemented, must_match};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_connector::connector_common::validate_connection;
use risingwave_connector::source::{
    ConnectorProperties, SourceEnumeratorContext, UPSTREAM_SOURCE_KEY,
};
use risingwave_meta_model::exactly_once_iceberg_sink::{Column, Entity};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::{
    ConnectionId, DatabaseId, DispatcherType, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
    SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
    Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
    Subscription, Table, View,
};
use risingwave_pb::ddl_service::alter_owner_request::Object;
use risingwave_pb::ddl_service::{
    DdlProgress, TableJobType, WaitVersion, alter_name_request, alter_set_schema_request,
    alter_swap_rename_request,
};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{
    FragmentTypeFlag, MergeNode, PbDispatcherType, PbStreamFragmentGraph,
    StreamFragmentGraph as StreamFragmentGraphProto,
};
use risingwave_pb::telemetry::{PbTelemetryDatabaseObject, PbTelemetryEventStage};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use thiserror_ext::AsReport;
use tokio::sync::{Semaphore, oneshot};
use tokio::time::sleep;
use tracing::Instrument;

use crate::barrier::BarrierManagerRef;
use crate::controller::catalog::{DropTableConnectorContext, ReleaseContext};
use crate::controller::cluster::StreamingClusterInfo;
use crate::controller::streaming_job::SinkIntoTableContext;
use crate::error::{bail_invalid_parameter, bail_unavailable};
use crate::manager::{
    IGNORED_NOTIFICATION_VERSION, LocalNotification, MetaSrvEnv, MetadataManager,
    NotificationVersion, StreamingJob, StreamingJobType,
};
use crate::model::{
    DownstreamFragmentRelation, Fragment, StreamContext, StreamJobFragments,
    StreamJobFragmentsToCreate, TableParallelism,
};
use crate::stream::{
    ActorGraphBuildResult, ActorGraphBuilder, CompleteStreamFragmentGraph,
    CreateStreamingJobContext, CreateStreamingJobOption, GlobalStreamManagerRef,
    JobRescheduleTarget, ReplaceStreamJobContext, SourceChange, SourceManagerRef,
    StreamFragmentGraph, create_source_worker, validate_sink,
};
use crate::telemetry::report_event;
use crate::{MetaError, MetaResult};

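/// Behavior of a `DROP` statement when the target object still has dependents:
/// `Restrict` fails the command, while `Cascade` drops the dependents as well.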
#[derive(PartialEq)]
pub enum DropMode {
    Restrict,
    Cascade,
}

impl DropMode {
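    /// Maps the `cascade` flag carried by a drop request to a [`DropMode`].
    ///
    /// A minimal illustration of the mapping:
    ///
    /// ```ignore
    /// assert!(DropMode::from_request_setting(true) == DropMode::Cascade);
    /// assert!(DropMode::from_request_setting(false) == DropMode::Restrict);
    /// ```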
    pub fn from_request_setting(cascade: bool) -> DropMode {
        if cascade {
            DropMode::Cascade
        } else {
            DropMode::Restrict
        }
    }
}

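/// Identifies the streaming job targeted by a drop command. Every variant carries the
/// job's catalog id, which `StreamingJobId::id` exposes uniformly as a `TableId`.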
pub enum StreamingJobId {
    MaterializedView(TableId),
    Sink(SinkId),
    Table(Option<SourceId>, TableId),
    Index(IndexId),
}

impl StreamingJobId {
    #[allow(dead_code)]
    fn id(&self) -> TableId {
        match self {
            StreamingJobId::MaterializedView(id)
            | StreamingJobId::Sink(id)
            | StreamingJobId::Table(_, id)
            | StreamingJobId::Index(id) => *id,
        }
    }
}

/// Describes the job that needs to be replaced. It is used both when replacing a table
/// (e.g. schema change) and when creating a sink into a table.
pub struct ReplaceStreamJobInfo {
    pub streaming_job: StreamingJob,
    pub fragment_graph: StreamFragmentGraphProto,
    pub col_index_mapping: Option<ColIndexMapping>,
}

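/// A single DDL command accepted by [`DdlController::run_command`].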
pub enum DdlCommand {
    CreateDatabase(Database),
    DropDatabase(DatabaseId),
    CreateSchema(Schema),
    DropSchema(SchemaId, DropMode),
    CreateNonSharedSource(Source),
    DropSource(SourceId, DropMode),
    CreateFunction(Function),
    DropFunction(FunctionId),
    CreateView(View),
    DropView(ViewId, DropMode),
    CreateStreamingJob(
        StreamingJob,
        StreamFragmentGraphProto,
        CreateType,
        Option<ReplaceStreamJobInfo>,
        HashSet<ObjectId>,
        Option<String>, // specific resource group
    ),
    DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceStreamJobInfo>),
    AlterName(alter_name_request::Object, String),
    AlterSwapRename(alter_swap_rename_request::Object),
    ReplaceStreamJob(ReplaceStreamJobInfo),
    AlterNonSharedSource(Source),
    AlterObjectOwner(Object, UserId),
    AlterSetSchema(alter_set_schema_request::Object, SchemaId),
    CreateConnection(Connection),
    DropConnection(ConnectionId),
    CreateSecret(Secret),
    AlterSecret(Secret),
    DropSecret(SecretId),
    CommentOn(Comment),
    CreateSubscription(Subscription),
    DropSubscription(SubscriptionId, DropMode),
}

impl DdlCommand {
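    /// Whether the command may be executed while the cluster is still recovering.
    /// Drops, renames, and other catalog-only changes are allowed; creating or altering
    /// sources, streaming jobs, and subscriptions is not.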
    fn allow_in_recovery(&self) -> bool {
        match self {
            DdlCommand::DropDatabase(_)
            | DdlCommand::DropSchema(_, _)
            | DdlCommand::DropSource(_, _)
            | DdlCommand::DropFunction(_)
            | DdlCommand::DropView(_, _)
            | DdlCommand::DropStreamingJob(_, _, _)
            | DdlCommand::DropConnection(_)
            | DdlCommand::DropSecret(_)
            | DdlCommand::DropSubscription(_, _)
            | DdlCommand::AlterName(_, _)
            | DdlCommand::AlterObjectOwner(_, _)
            | DdlCommand::AlterSetSchema(_, _)
            | DdlCommand::CreateDatabase(_)
            | DdlCommand::CreateSchema(_)
            | DdlCommand::CreateFunction(_)
            | DdlCommand::CreateView(_)
            | DdlCommand::CreateConnection(_)
            | DdlCommand::CommentOn(_)
            | DdlCommand::CreateSecret(_)
            | DdlCommand::AlterSecret(_)
            | DdlCommand::AlterSwapRename(_) => true,
            DdlCommand::CreateStreamingJob(_, _, _, _, _, _)
            | DdlCommand::CreateNonSharedSource(_)
            | DdlCommand::ReplaceStreamJob(_)
            | DdlCommand::AlterNonSharedSource(_)
            | DdlCommand::CreateSubscription(_) => false,
        }
    }
}

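/// The entry point for DDL on the meta node. It validates incoming commands, updates the
/// catalog through the [`MetadataManager`], and drives the stream, barrier, and source
/// managers to materialize the corresponding streaming changes.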
#[derive(Clone)]
pub struct DdlController {
    pub(crate) env: MetaSrvEnv,

    pub(crate) metadata_manager: MetadataManager,
    pub(crate) stream_manager: GlobalStreamManagerRef,
    pub(crate) source_manager: SourceManagerRef,
    barrier_manager: BarrierManagerRef,

    // The semaphore limits the number of streaming jobs that can be created concurrently.
    pub(crate) creating_streaming_job_permits: Arc<CreatingStreamingJobPermit>,
}

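/// A semaphore whose number of permits tracks the `max_concurrent_creating_streaming_jobs`
/// system parameter. Permits are added or forgotten at runtime when the parameter changes.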
#[derive(Clone)]
pub struct CreatingStreamingJobPermit {
    pub(crate) semaphore: Arc<Semaphore>,
}

impl CreatingStreamingJobPermit {
    async fn new(env: &MetaSrvEnv) -> Self {
        let mut permits = env
            .system_params_reader()
            .await
            .max_concurrent_creating_streaming_jobs() as usize;
        if permits == 0 {
            // if the system parameter is set to zero, use the max permitted value.
            permits = Semaphore::MAX_PERMITS;
        }
        let semaphore = Arc::new(Semaphore::new(permits));

        let (local_notification_tx, mut local_notification_rx) =
            tokio::sync::mpsc::unbounded_channel();
        env.notification_manager()
            .insert_local_sender(local_notification_tx)
            .await;
        let semaphore_clone = semaphore.clone();
        tokio::spawn(async move {
            while let Some(notification) = local_notification_rx.recv().await {
                let LocalNotification::SystemParamsChange(p) = &notification else {
                    continue;
                };
                let mut new_permits = p.max_concurrent_creating_streaming_jobs() as usize;
                if new_permits == 0 {
                    new_permits = Semaphore::MAX_PERMITS;
                }
                match permits.cmp(&new_permits) {
                    Ordering::Less => {
                        semaphore_clone.add_permits(new_permits - permits);
                    }
                    Ordering::Equal => continue,
                    Ordering::Greater => {
                        semaphore_clone
                            .acquire_many((permits - new_permits) as u32)
                            .await
                            .unwrap()
                            .forget();
                    }
                }
                tracing::info!(
                    "max_concurrent_creating_streaming_jobs changed from {} to {}",
                    permits,
                    new_permits
                );
                permits = new_permits;
            }
        });

        Self { semaphore }
    }
}

impl DdlController {
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
    ) -> Self {
        let creating_streaming_job_permits = Arc::new(CreatingStreamingJobPermit::new(&env).await);
        Self {
            env,
            metadata_manager,
            stream_manager,
            source_manager,
            barrier_manager,
            creating_streaming_job_permits,
        }
    }

    /// `run_command` spawns a tokio task to execute the target DDL command. If the client is
    /// interrupted while the command is executing, tonic cancels the request future; since the
    /// command involves a lot of revert, status-management, and notification logic, ensuring
    /// consistency would be very hard without spawning a detached task here.
    ///
    /// Although the return type is `Option`, it is always `Some`; this simply unifies the
    /// handling logic at the call sites.
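    ///
    /// A minimal sketch of how a caller might drive it (the surrounding service handler is
    /// illustrative, not the actual gRPC layer):
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .run_command(DdlCommand::CreateDatabase(database))
    ///     .await?
    ///     .expect("always `Some`, see above");
    /// ```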
    pub async fn run_command(&self, command: DdlCommand) -> MetaResult<Option<WaitVersion>> {
        if !command.allow_in_recovery() {
            self.barrier_manager.check_status_running()?;
        }
        let ctrl = self.clone();
        let fut = async move {
            match command {
                DdlCommand::CreateDatabase(database) => ctrl.create_database(database).await,
                DdlCommand::DropDatabase(database_id) => ctrl.drop_database(database_id).await,
                DdlCommand::CreateSchema(schema) => ctrl.create_schema(schema).await,
                DdlCommand::DropSchema(schema_id, drop_mode) => {
                    ctrl.drop_schema(schema_id, drop_mode).await
                }
                DdlCommand::CreateNonSharedSource(source) => {
                    ctrl.create_non_shared_source(source).await
                }
                DdlCommand::DropSource(source_id, drop_mode) => {
                    ctrl.drop_source(source_id, drop_mode).await
                }
                DdlCommand::CreateFunction(function) => ctrl.create_function(function).await,
                DdlCommand::DropFunction(function_id) => ctrl.drop_function(function_id).await,
                DdlCommand::CreateView(view) => ctrl.create_view(view).await,
                DdlCommand::DropView(view_id, drop_mode) => {
                    ctrl.drop_view(view_id, drop_mode).await
                }
                DdlCommand::CreateStreamingJob(
                    stream_job,
                    fragment_graph,
                    _create_type,
                    affected_table_replace_info,
                    dependencies,
                    specific_resource_group,
                ) => {
                    ctrl.create_streaming_job(
                        stream_job,
                        fragment_graph,
                        affected_table_replace_info,
                        dependencies,
                        specific_resource_group,
                    )
                    .await
                }
                DdlCommand::DropStreamingJob(job_id, drop_mode, target_replace_info) => {
                    ctrl.drop_streaming_job(job_id, drop_mode, target_replace_info)
                        .await
                }
                DdlCommand::ReplaceStreamJob(ReplaceStreamJobInfo {
                    streaming_job,
                    fragment_graph,
                    col_index_mapping,
                }) => {
                    ctrl.replace_job(streaming_job, fragment_graph, col_index_mapping)
                        .await
                }
                DdlCommand::AlterName(relation, name) => ctrl.alter_name(relation, &name).await,
                DdlCommand::AlterObjectOwner(object, owner_id) => {
                    ctrl.alter_owner(object, owner_id).await
                }
                DdlCommand::AlterSetSchema(object, new_schema_id) => {
                    ctrl.alter_set_schema(object, new_schema_id).await
                }
                DdlCommand::CreateConnection(connection) => {
                    ctrl.create_connection(connection).await
                }
                DdlCommand::DropConnection(connection_id) => {
                    ctrl.drop_connection(connection_id).await
                }
                DdlCommand::CreateSecret(secret) => ctrl.create_secret(secret).await,
                DdlCommand::DropSecret(secret_id) => ctrl.drop_secret(secret_id).await,
                DdlCommand::AlterSecret(secret) => ctrl.alter_secret(secret).await,
                DdlCommand::AlterNonSharedSource(source) => {
                    ctrl.alter_non_shared_source(source).await
                }
                DdlCommand::CommentOn(comment) => ctrl.comment_on(comment).await,
                DdlCommand::CreateSubscription(subscription) => {
                    ctrl.create_subscription(subscription).await
                }
                DdlCommand::DropSubscription(subscription_id, drop_mode) => {
                    ctrl.drop_subscription(subscription_id, drop_mode).await
                }
                DdlCommand::AlterSwapRename(objects) => ctrl.alter_swap_rename(objects).await,
            }
        }
        .in_current_span();
        let notification_version = tokio::spawn(fut).await.map_err(|e| anyhow!(e))??;
        Ok(Some(WaitVersion {
            catalog_version: notification_version,
            hummock_version_id: self.barrier_manager.get_hummock_version_id().await.to_u64(),
        }))
    }

    pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
        self.barrier_manager.get_ddl_progress().await
    }

    async fn create_database(&self, database: Database) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_database(database)
            .await
    }

    #[tracing::instrument(skip(self), level = "debug")]
    pub async fn reschedule_streaming_job(
        &self,
        job_id: u32,
        target: JobRescheduleTarget,
        mut deferred: bool,
    ) -> MetaResult<()> {
        tracing::info!("alter parallelism");
        if self.barrier_manager.check_status_running().is_err() {
            tracing::info!(
                "alter parallelism is set to deferred mode because the system is in recovery state"
            );
            deferred = true;
        }

        self.stream_manager
            .reschedule_streaming_job(job_id, target, deferred)
            .await
    }

    async fn drop_database(&self, database_id: DatabaseId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Database,
            database_id as _,
            DropMode::Cascade,
            None,
        )
        .await
    }

    async fn create_schema(&self, schema: Schema) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_schema(schema)
            .await
    }

    async fn drop_schema(
        &self,
        schema_id: SchemaId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Schema, schema_id as _, drop_mode, None)
            .await
    }

    /// Shared source is handled in [`Self::create_streaming_job`]
    async fn create_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        let handle = create_source_worker(&source, self.source_manager.metrics.clone())
            .await
            .context("failed to create source worker")?;

        let (source_id, version) = self
            .metadata_manager
            .catalog_controller
            .create_source(source)
            .await?;
        self.source_manager
            .register_source_with_handle(source_id, handle)
            .await;
        Ok(version)
    }

    async fn drop_source(
        &self,
        source_id: SourceId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Source, source_id as _, drop_mode, None)
            .await
    }

    /// This replaces the source in the catalog.
    /// Note: the `StreamSourceInfo` in downstream MVs' `SourceExecutor`s is not updated.
    async fn alter_non_shared_source(&self, source: Source) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .alter_non_shared_source(source)
            .await
    }

    async fn create_function(&self, function: Function) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_function(function)
            .await
    }

    async fn drop_function(&self, function_id: FunctionId) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Function,
            function_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    async fn create_view(&self, view: View) -> MetaResult<NotificationVersion> {
        self.metadata_manager
            .catalog_controller
            .create_view(view)
            .await
    }

    async fn drop_view(
        &self,
        view_id: ViewId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::View, view_id as _, drop_mode, None)
            .await
    }

    async fn create_connection(&self, connection: Connection) -> MetaResult<NotificationVersion> {
        validate_connection(&connection).await?;
        self.metadata_manager
            .catalog_controller
            .create_connection(connection)
            .await
    }

    async fn drop_connection(
        &self,
        connection_id: ConnectionId,
    ) -> MetaResult<NotificationVersion> {
        self.drop_object(
            ObjectType::Connection,
            connection_id as _,
            DropMode::Restrict,
            None,
        )
        .await
    }

    // The 'secret' part of the request we receive from the frontend is in plaintext;
    // here, we need to encrypt it before storing it in the catalog.
    fn get_encrypted_payload(&self, secret: &Secret) -> MetaResult<Vec<u8>> {
        let secret_store_private_key = self
            .env
            .opts
            .secret_store_private_key
            .clone()
            .ok_or_else(|| anyhow!("secret_store_private_key is not configured"))?;

        let encrypted_payload = SecretEncryption::encrypt(
            secret_store_private_key.as_slice(),
            secret.get_value().as_slice(),
        )
        .context(format!("failed to encrypt secret {}", secret.name))?;
        Ok(encrypted_payload
            .serialize()
            .context(format!("failed to serialize secret {}", secret.name))?)
    }

    async fn create_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        // The 'secret' part of the request we receive from the frontend is in plaintext;
        // here, we need to encrypt it before storing it in the catalog.
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;

        self.metadata_manager
            .catalog_controller
            .create_secret(secret, secret_plain_payload)
            .await
    }

    async fn drop_secret(&self, secret_id: SecretId) -> MetaResult<NotificationVersion> {
        self.drop_object(ObjectType::Secret, secret_id as _, DropMode::Restrict, None)
            .await
    }

    async fn alter_secret(&self, mut secret: Secret) -> MetaResult<NotificationVersion> {
        let secret_plain_payload = secret.value.clone();
        let encrypted_payload = self.get_encrypted_payload(&secret)?;
        secret.value = encrypted_payload;
        self.metadata_manager
            .catalog_controller
            .alter_secret(secret, secret_plain_payload)
            .await
    }

    async fn create_subscription(
        &self,
        mut subscription: Subscription,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("create subscription");
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        self.metadata_manager
            .catalog_controller
            .create_subscription_catalog(&mut subscription)
            .await?;
        self.stream_manager
            .create_subscription(&subscription)
            .await
            .inspect_err(|e| {
                tracing::debug!(error = %e.as_report(), "cancel create subscription");
            })?;

        let version = self
            .metadata_manager
            .catalog_controller
            .notify_create_subscription(subscription.id)
            .await?;
        tracing::debug!("finish create subscription");
        Ok(version)
    }

    async fn drop_subscription(
        &self,
        subscription_id: SubscriptionId,
        drop_mode: DropMode,
    ) -> MetaResult<NotificationVersion> {
        tracing::debug!("preparing drop subscription");
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
        let subscription = self
            .metadata_manager
            .catalog_controller
            .get_subscription_by_id(subscription_id)
            .await?;
        let table_id = subscription.dependent_table_id;
        let database_id = subscription.database_id.into();
        let (_, version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(ObjectType::Subscription, subscription_id as _, drop_mode)
            .await?;
        self.stream_manager
            .drop_subscription(database_id, subscription_id as _, table_id)
            .await;
        tracing::debug!("finish drop subscription");
        Ok(version)
    }

    /// Validates the connect properties in the `cdc_table_desc` stored in the `StreamCdcScan` node
    pub(crate) async fn validate_cdc_table(
        table: &Table,
        table_fragments: &StreamJobFragments,
    ) -> MetaResult<()> {
        let stream_scan_fragment = table_fragments
            .fragments
            .values()
            .filter(|f| f.fragment_type_mask & FragmentTypeFlag::StreamScan as u32 != 0)
            .exactly_one()
            .ok()
            .with_context(|| {
                format!(
                    "expect exactly one stream scan fragment, got: {:?}",
                    table_fragments.fragments
                )
            })?;

        assert_eq!(
            stream_scan_fragment.actors.len(),
            1,
            "Stream scan fragment should have only one actor"
        );
        let mut found_cdc_scan = false;
        match &stream_scan_fragment.nodes.node_body {
            Some(NodeBody::StreamCdcScan(_)) => {
                if Self::validate_cdc_table_inner(&stream_scan_fragment.nodes.node_body, table.id)
                    .await?
                {
                    found_cdc_scan = true;
                }
            }
            // When there are generated columns, the CDC scan node is wrapped in a project node.
            Some(NodeBody::Project(_)) => {
                for input in &stream_scan_fragment.nodes.input {
                    if Self::validate_cdc_table_inner(&input.node_body, table.id).await? {
                        found_cdc_scan = true;
                    }
                }
            }
            _ => {
                bail!("Unexpected node body for stream cdc scan");
            }
        };
        if !found_cdc_scan {
            bail!("No stream cdc scan node found in stream scan fragment");
        }
        Ok(())
    }

    async fn validate_cdc_table_inner(
        node_body: &Option<NodeBody>,
        table_id: u32,
    ) -> MetaResult<bool> {
        if let Some(NodeBody::StreamCdcScan(stream_cdc_scan)) = node_body
            && let Some(ref cdc_table_desc) = stream_cdc_scan.cdc_table_desc
        {
            let options_with_secret = WithOptionsSecResolved::new(
                cdc_table_desc.connect_properties.clone(),
                cdc_table_desc.secret_refs.clone(),
            );

            let mut props = ConnectorProperties::extract(options_with_secret, true)?;
            props.init_from_pb_cdc_table_desc(cdc_table_desc);

            // Try creating a split enumerator to validate
            let _enumerator = props
                .create_split_enumerator(SourceEnumeratorContext::dummy().into())
                .await?;

            tracing::debug!(?table_id, "validate cdc table success");
            Ok(true)
        } else {
            Ok(false)
        }
    }

    // Modify the union node of the downstream table according to the `TableFragments` of the
    // to-be-created upstream sink. The merge under the union has already been set up by the
    // frontend and is filled with the concrete upstream here. The dispatcher corresponding to
    // the upstream of the merge is also added to the replace-table context in this function.
    pub(crate) async fn inject_replace_table_job_for_table_sink(
        &self,
        tmp_id: u32,
        mgr: &MetadataManager,
        stream_ctx: StreamContext,
        sink: Option<&Sink>,
        creating_sink_table_fragments: Option<&StreamJobFragments>,
        dropping_sink_id: Option<SinkId>,
        streaming_job: &StreamingJob,
        fragment_graph: StreamFragmentGraph,
    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
        let (mut replace_table_ctx, mut stream_job_fragments) = self
            .build_replace_job(stream_ctx, streaming_job, fragment_graph, tmp_id as _)
            .await?;

        let target_table = streaming_job.table().unwrap();

        if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
            let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
            let sink = sink.expect("sink not found");
            Self::inject_replace_table_plan_for_sink(
                sink.id,
                &sink_fragment,
                target_table,
                &mut replace_table_ctx,
                stream_job_fragments.inner.union_fragment_for_table(),
                None,
            );
        }

        let [table_catalog]: [_; 1] = mgr
            .get_table_catalog_by_ids(vec![target_table.id])
            .await?
            .try_into()
            .expect("Target table should exist in sink into table");

        {
            let catalogs = mgr
                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
                .await?;

            for sink in catalogs {
                let sink_id = sink.id;

                if let Some(dropping_sink_id) = dropping_sink_id
                    && sink_id == (dropping_sink_id as u32)
                {
                    continue;
                };

                let sink_table_fragments = mgr
                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                    .await?;

                let sink_fragment = sink_table_fragments.sink_fragment().unwrap();

                Self::inject_replace_table_plan_for_sink(
                    sink_id,
                    &sink_fragment,
                    target_table,
                    &mut replace_table_ctx,
                    stream_job_fragments.inner.union_fragment_for_table(),
                    Some(&sink.unique_identity()),
                );
            }
        }

        // check if the union fragment is fully assigned.
        for fragment in stream_job_fragments.fragments.values() {
            {
                {
                    visit_stream_node(&fragment.nodes, |node| {
                        if let NodeBody::Merge(merge_node) = node {
                            let upstream_fragment_id = merge_node.upstream_fragment_id;
                            if let Some(external_upstream_fragment_downstreams) = replace_table_ctx
                                .upstream_fragment_downstreams
                                .get(&upstream_fragment_id)
                            {
                                let mut upstream_fragment_downstreams =
                                    external_upstream_fragment_downstreams.iter().filter(
                                        |downstream| {
                                            downstream.downstream_fragment_id
                                                == fragment.fragment_id
                                        },
                                    );
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            } else {
                                let mut upstream_fragment_downstreams = stream_job_fragments
                                    .downstreams
                                    .get(&upstream_fragment_id)
                                    .into_iter()
                                    .flatten()
                                    .filter(|downstream| {
                                        downstream.downstream_fragment_id == fragment.fragment_id
                                    });
                                assert!(
                                    upstream_fragment_downstreams.next().is_some(),
                                    "All the mergers for the union should have been fully assigned beforehand."
                                );
                            }
                        }
                    });
                }
            }
        }

        Ok((replace_table_ctx, stream_job_fragments))
    }

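    /// Rewires one `Merge` input of the downstream table's union fragment so that it consumes
    /// from the given sink fragment, and records the corresponding downstream relation of the
    /// sink fragment in the replace-table context.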
    pub(crate) fn inject_replace_table_plan_for_sink(
        sink_id: u32,
        sink_fragment: &Fragment,
        table: &Table,
        replace_table_ctx: &mut ReplaceStreamJobContext,
        union_fragment: &mut Fragment,
        unique_identity: Option<&str>,
    ) {
        let sink_fields = sink_fragment.nodes.fields.clone();

        let output_indices = sink_fields
            .iter()
            .enumerate()
            .map(|(idx, _)| idx as _)
            .collect_vec();

        let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();

        let sink_fragment_downstreams = replace_table_ctx
            .upstream_fragment_downstreams
            .entry(sink_fragment.fragment_id)
            .or_default();

        {
            sink_fragment_downstreams.push(DownstreamFragmentRelation {
                downstream_fragment_id: union_fragment.fragment_id,
                dispatcher_type: DispatcherType::Hash,
                dist_key_indices: dist_key_indices.clone(),
                output_indices: output_indices.clone(),
            });
        }

        let upstream_fragment_id = sink_fragment.fragment_id;

        {
            {
                visit_stream_node_cont_mut(&mut union_fragment.nodes, |node| {
                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
                        for input_project_node in &mut node.input {
                            if let Some(NodeBody::Project(_)) = &mut input_project_node.node_body {
                                let merge_stream_node =
                                    input_project_node.input.iter_mut().exactly_one().unwrap();

                                // Match on identity: only rewrite the project node that belongs to this sink.
                                if input_project_node.identity.as_str()
                                    != unique_identity
                                        .unwrap_or(PbSink::UNIQUE_IDENTITY_FOR_CREATING_TABLE_SINK)
                                {
                                    continue;
                                }

                                if let Some(NodeBody::Merge(merge_node)) =
                                    &mut merge_stream_node.node_body
                                {
                                    {
                                        merge_stream_node.identity =
                                            format!("MergeExecutor(from sink {})", sink_id);

                                        input_project_node.identity =
                                            format!("ProjectExecutor(from sink {})", sink_id);
                                    }

                                    **merge_node = {
                                        #[expect(deprecated)]
                                        MergeNode {
                                            upstream_actor_id: vec![],
                                            upstream_fragment_id,
                                            upstream_dispatcher_type: PbDispatcherType::Hash as _,
                                            fields: sink_fields.to_vec(),
                                        }
                                    };

                                    merge_stream_node.fields = sink_fields.to_vec();

                                    return false;
                                }
                            }
                        }
                    }
                    true
                });
            }
        }
    }

    /// For [`CreateType::Foreground`], the function will only return after backfilling finishes
    /// ([`crate::manager::MetadataManager::wait_streaming_job_finished`]).
    pub async fn create_streaming_job(
        &self,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        dependencies: HashSet<ObjectId>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
        self.metadata_manager
            .catalog_controller
            .create_job_catalog(
                &mut streaming_job,
                &ctx,
                &fragment_graph.parallelism,
                fragment_graph.max_parallelism as _,
                dependencies,
                specific_resource_group.clone(),
            )
            .await?;
        let job_id = streaming_job.id();

        tracing::debug!(
            id = job_id,
            definition = streaming_job.definition(),
            create_type = streaming_job.create_type().as_str_name(),
            job_type = ?streaming_job.job_type(),
            "starting streaming job",
        );
        let _permit = self
            .creating_streaming_job_permits
            .semaphore
            .acquire()
            .await
            .unwrap();
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let name = streaming_job.name();
        let definition = streaming_job.definition();
        let source_id = match &streaming_job {
            StreamingJob::Table(Some(src), _, _) | StreamingJob::Source(src) => Some(src.id),
            _ => None,
        };

        // create streaming job.
        match self
            .create_streaming_job_inner(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await
        {
            Ok(version) => Ok(version),
            Err(err) => {
                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                    id: job_id,
                    name,
                    definition,
                    error: err.as_report().to_string(),
                };
                self.env.event_log_manager_ref().add_event_logs(vec![
                    risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event),
                ]);
                let (aborted, _) = self
                    .metadata_manager
                    .catalog_controller
                    .try_abort_creating_streaming_job(job_id as _, false)
                    .await?;
                if aborted {
                    tracing::warn!(id = job_id, "aborted streaming job");
                    // FIXME: might also need other cleanup here
                    if let Some(source_id) = source_id {
                        self.source_manager
                            .apply_source_change(SourceChange::DropSource {
                                dropped_source_ids: vec![source_id as SourceId],
                            })
                            .await;
                    }
                }
                Err(err)
            }
        }
    }

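    /// Builds the fragment graph and internal table catalogs for the job, validates
    /// connector-specific parts (CDC table, source, sink), persists the fragments, and then
    /// hands the job over to the stream manager. Background jobs (other than sinks) are
    /// spawned on a separate task, and the call returns [`IGNORED_NOTIFICATION_VERSION`]
    /// without waiting for them to finish.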
    async fn create_streaming_job_inner(
        &self,
        ctx: StreamContext,
        mut streaming_job: StreamingJob,
        fragment_graph: StreamFragmentGraphProto,
        affected_table_replace_info: Option<ReplaceStreamJobInfo>,
        specific_resource_group: Option<String>,
    ) -> MetaResult<NotificationVersion> {
        let mut fragment_graph =
            StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
        streaming_job.set_info_from_graph(&fragment_graph);

        // create internal table catalogs and refill table id.
        let incomplete_internal_tables = fragment_graph
            .incomplete_internal_tables()
            .into_values()
            .collect_vec();
        let table_id_map = self
            .metadata_manager
            .catalog_controller
            .create_internal_table_catalog(&streaming_job, incomplete_internal_tables)
            .await?;
        fragment_graph.refill_internal_table_ids(table_id_map);

        let affected_table_replace_info = match affected_table_replace_info {
            Some(replace_table_info) => {
                assert!(
                    specific_resource_group.is_none(),
                    "specific_resource_group is not supported for replace table (alter column or sink into table)"
                );

                let ReplaceStreamJobInfo {
                    mut streaming_job,
                    fragment_graph,
                    ..
                } = replace_table_info;

                // Ensure the max parallelism is unchanged before replacing the table.
                let original_max_parallelism = self
                    .metadata_manager
                    .get_job_max_parallelism(streaming_job.id().into())
                    .await?;
                let fragment_graph = PbStreamFragmentGraph {
                    max_parallelism: original_max_parallelism as _,
                    ..fragment_graph
                };

                let fragment_graph =
                    StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
                streaming_job.set_info_from_graph(&fragment_graph);
                let streaming_job = streaming_job;

                Some((streaming_job, fragment_graph))
            }
            None => None,
        };

        // create fragment and actor catalogs.
        tracing::debug!(id = streaming_job.id(), "building streaming job");
        let (ctx, stream_job_fragments) = self
            .build_stream_job(
                ctx,
                streaming_job,
                fragment_graph,
                affected_table_replace_info,
                specific_resource_group,
            )
            .await?;

        let streaming_job = &ctx.streaming_job;

        match streaming_job {
            StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
                Self::validate_cdc_table(table, &stream_job_fragments).await?;
            }
            StreamingJob::Table(Some(source), ..) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Sink(sink, _) => {
                // Validate the sink on the connector node.
                validate_sink(sink).await?;
                let connector_name = sink.get_properties().get(UPSTREAM_SOURCE_KEY).cloned();
                let attr = sink.format_desc.as_ref().map(|sink_info| {
                    jsonbb::json!({
                        "format": sink_info.format().as_str_name(),
                        "encode": sink_info.encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "sink",
                    PbTelemetryDatabaseObject::Sink,
                    connector_name,
                    attr,
                );
            }
            StreamingJob::Source(source) => {
                // Register the source on the connector node.
                self.source_manager.register_source(source).await?;
                let connector_name = source
                    .get_with_properties()
                    .get(UPSTREAM_SOURCE_KEY)
                    .cloned();
                let attr = source.info.as_ref().map(|source_info| {
                    jsonbb::json!({
                            "format": source_info.format().as_str_name(),
                            "encode": source_info.row_encode().as_str_name(),
                    })
                });
                report_create_object(
                    streaming_job.id(),
                    "source",
                    PbTelemetryDatabaseObject::Source,
                    connector_name,
                    attr,
                );
            }
            _ => {}
        }

        self.metadata_manager
            .catalog_controller
            .prepare_streaming_job(&stream_job_fragments, streaming_job, false)
            .await?;

        // create streaming jobs.
        let stream_job_id = streaming_job.id();
        match (streaming_job.create_type(), &streaming_job) {
            (CreateType::Unspecified, _)
            | (CreateType::Foreground, _)
            // FIXME(kwannoel): Unify background stream's creation path with MV below.
            | (CreateType::Background, StreamingJob::Sink(_, _)) => {
                let version = self.stream_manager
                    .create_streaming_job(stream_job_fragments, ctx, None)
                    .await?;
                Ok(version)
            }
            (CreateType::Background, _) => {
                let ctrl = self.clone();
                let (tx, rx) = oneshot::channel();
                let fut = async move {
                    let _ = ctrl
                        .stream_manager
                        .create_streaming_job(stream_job_fragments, ctx, Some(tx))
                        .await.inspect_err(|err| {
                        tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
                    });
                };
                tokio::spawn(fut);
                rx.await.map_err(|_| anyhow!("failed to receive create streaming job result of job: {}", stream_job_id))??;
                Ok(IGNORED_NOTIFICATION_VERSION)
            }
        }
    }

    /// `target_replace_info`: when dropping a sink into table, we need to replace the table.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
        target_replace_info: Option<ReplaceStreamJobInfo>,
    ) -> MetaResult<NotificationVersion> {
        let (release_ctx, mut version) = self
            .metadata_manager
            .catalog_controller
            .drop_object(object_type, object_id, drop_mode)
            .await?;

        if let Some(replace_table_info) = target_replace_info {
            let stream_ctx =
                StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap());

            let ReplaceStreamJobInfo {
                mut streaming_job,
                fragment_graph,
                ..
            } = replace_table_info;

            let sink_id = if let ObjectType::Sink = object_type {
                object_id as _
            } else {
                panic!("additional replace table event only occurs when dropping sink into table")
            };

            // Ensure the max parallelism is unchanged before replacing the table.
            let original_max_parallelism = self
                .metadata_manager
                .get_job_max_parallelism(streaming_job.id().into())
                .await?;
            let fragment_graph = PbStreamFragmentGraph {
                max_parallelism: original_max_parallelism as _,
                ..fragment_graph
            };

            let fragment_graph =
                StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
            streaming_job.set_info_from_graph(&fragment_graph);
            let streaming_job = streaming_job;

            streaming_job.table().expect("should be table job");

            tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink");
            let tmp_id = self
                .metadata_manager
                .catalog_controller
                .create_job_catalog_for_replace(
                    &streaming_job,
                    &stream_ctx,
                    &fragment_graph.specified_parallelism(),
                    fragment_graph.max_parallelism(),
                )
                .await? as u32;

            let (ctx, stream_job_fragments) = self
                .inject_replace_table_job_for_table_sink(
                    tmp_id,
                    &self.metadata_manager,
                    stream_ctx,
                    None,
                    None,
                    Some(sink_id),
                    &streaming_job,
                    fragment_graph,
                )
                .await?;

            let result: MetaResult<_> = try {
                let replace_upstream = ctx.replace_upstream.clone();

                self.metadata_manager
                    .catalog_controller
                    .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
                    .await?;

                self.stream_manager
                    .replace_stream_job(stream_job_fragments, ctx)
                    .await?;

                replace_upstream
            };

            version = match result {
                Ok(replace_upstream) => {
                    let version = self
                        .metadata_manager
                        .catalog_controller
                        .finish_replace_streaming_job(
                            tmp_id as _,
                            streaming_job,
                            replace_upstream,
                            None,
                            SinkIntoTableContext {
                                creating_sink_id: None,
                                dropping_sink_id: Some(sink_id),
                                updated_sink_catalogs: vec![],
                            },
                            None, // no source is dropped when dropping sink into table
                        )
                        .await?;
                    Ok(version)
                }
                Err(err) => {
                    tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table");
                    let _ = self.metadata_manager
                        .catalog_controller
                        .try_abort_replacing_streaming_job(tmp_id as _)
                        .await
                        .inspect_err(|err| {
                            tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table");
                        });
                    Err(err)
                }
            }?;
        }

        let ReleaseContext {
            database_id,
            removed_streaming_job_ids,
            removed_state_table_ids,
            removed_source_ids,
            removed_secret_ids: secret_ids,
            removed_source_fragments,
            removed_actors,
            removed_fragments,
        } = release_ctx;

        let _guard = self.source_manager.pause_tick().await;
        self.stream_manager
            .drop_streaming_jobs(
                risingwave_common::catalog::DatabaseId::new(database_id as _),
                removed_actors.iter().map(|id| *id as _).collect(),
                removed_streaming_job_ids,
                removed_state_table_ids,
                removed_fragments.iter().map(|id| *id as _).collect(),
            )
            .await;

        // Clean up sources after dropping the streaming jobs.
        // Otherwise, e.g., Kafka consumer groups might be recreated right after being deleted.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: removed_source_ids.into_iter().map(|id| id as _).collect(),
            })
            .await;

        // Unregister fragments and actors from the source manager.
        // FIXME: source backfill fragments also need to be unregistered.
1330        let dropped_source_fragments = removed_source_fragments
1331            .into_iter()
1332            .map(|(source_id, fragments)| {
1333                (
1334                    source_id,
1335                    fragments.into_iter().map(|id| id as u32).collect(),
1336                )
1337            })
1338            .collect();
1339        let dropped_actors = removed_actors.iter().map(|id| *id as _).collect();
1340        self.source_manager
1341            .apply_source_change(SourceChange::DropMv {
1342                dropped_source_fragments,
1343                dropped_actors,
1344            })
1345            .await;
1346
1347        // remove secrets.
1348        for secret in secret_ids {
1349            LocalSecretManager::global().remove_secret(secret as _);
1350        }
1351        Ok(version)
1352    }
1353
1354    /// This is used for `ALTER TABLE ADD/DROP COLUMN` / `ALTER SOURCE ADD COLUMN`.
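    ///
    /// A hypothetical invocation, with the caller-side identifiers assumed for
    /// illustration only (a sketch, not a doctest):
    ///
    /// ```ignore
    /// let version = ddl_controller
    ///     .replace_job(streaming_job, fragment_graph_proto, Some(col_index_mapping))
    ///     .await?;
    /// ```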
1355    pub async fn replace_job(
1356        &self,
1357        mut streaming_job: StreamingJob,
1358        fragment_graph: StreamFragmentGraphProto,
1359        col_index_mapping: Option<ColIndexMapping>,
1360    ) -> MetaResult<NotificationVersion> {
1361        match &mut streaming_job {
1362            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1363            StreamingJob::MaterializedView(..)
1364            | StreamingJob::Sink(..)
1365            | StreamingJob::Index(..) => {
1366                bail_not_implemented!("schema change for {}", streaming_job.job_type_str())
1367            }
1368        }
1369
1370        let job_id = streaming_job.id();
1371
1372        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1373        let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
1374
1375        // Ensure the max parallelism is unchanged before replacing the table.
1376        let original_max_parallelism = self
1377            .metadata_manager
1378            .get_job_max_parallelism(streaming_job.id().into())
1379            .await?;
1380        let fragment_graph = PbStreamFragmentGraph {
1381            max_parallelism: original_max_parallelism as _,
1382            ..fragment_graph
1383        };
1384
1385        // 1. Build the fragment graph.
1386        let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?;
1387        streaming_job.set_info_from_graph(&fragment_graph);
1388
1389        // make it immutable
1390        let streaming_job = streaming_job;
1391
1392        let tmp_id = self
1393            .metadata_manager
1394            .catalog_controller
1395            .create_job_catalog_for_replace(
1396                &streaming_job,
1397                &ctx,
1398                &fragment_graph.specified_parallelism(),
1399                fragment_graph.max_parallelism(),
1400            )
1401            .await?;
1402
1403        tracing::debug!(id = job_id, "building replace streaming job");
1404        let mut updated_sink_catalogs = vec![];
1405
1406        let mut drop_table_connector_ctx = None;
1407        let result: MetaResult<_> = try {
1408            let (mut ctx, mut stream_job_fragments) = self
1409                .build_replace_job(ctx, &streaming_job, fragment_graph, tmp_id as _)
1410                .await?;
1411            drop_table_connector_ctx = ctx.drop_table_connector_ctx.clone();
1412
1413            if let StreamingJob::Table(_, table, ..) = &streaming_job {
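                // If the table has incoming sinks (sink-into-table), re-inject the replace-table plan for each of them so they keep targeting the new table plan.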
1414                let catalogs = self
1415                    .metadata_manager
1416                    .get_sink_catalog_by_ids(&table.incoming_sinks)
1417                    .await?;
1418
1419                for sink in catalogs {
1420                    let sink_id = &sink.id;
1421
1422                    let sink_table_fragments = self
1423                        .metadata_manager
1424                        .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(
1425                            *sink_id,
1426                        ))
1427                        .await?;
1428
1429                    let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
1430
1431                    Self::inject_replace_table_plan_for_sink(
1432                        *sink_id,
1433                        &sink_fragment,
1434                        table,
1435                        &mut ctx,
1436                        stream_job_fragments.inner.union_fragment_for_table(),
1437                        Some(&sink.unique_identity()),
1438                    );
1439
1440                    if sink.original_target_columns.is_empty() {
1441                        updated_sink_catalogs.push(sink.id as _);
1442                    }
1443                }
1444            }
1445
1446            let replace_upstream = ctx.replace_upstream.clone();
1447
1448            self.metadata_manager
1449                .catalog_controller
1450                .prepare_streaming_job(&stream_job_fragments, &streaming_job, true)
1451                .await?;
1452
1453            self.stream_manager
1454                .replace_stream_job(stream_job_fragments, ctx)
1455                .await?;
1456            replace_upstream
1457        };
1458
1459        match result {
1460            Ok(replace_upstream) => {
1461                let version = self
1462                    .metadata_manager
1463                    .catalog_controller
1464                    .finish_replace_streaming_job(
1465                        tmp_id,
1466                        streaming_job,
1467                        replace_upstream,
1468                        col_index_mapping,
1469                        SinkIntoTableContext {
1470                            creating_sink_id: None,
1471                            dropping_sink_id: None,
1472                            updated_sink_catalogs,
1473                        },
1474                        drop_table_connector_ctx.as_ref(),
1475                    )
1476                    .await?;
1477                if let Some(drop_table_connector_ctx) = &drop_table_connector_ctx {
1478                    self.source_manager
1479                        .apply_source_change(SourceChange::DropSource {
1480                            dropped_source_ids: vec![drop_table_connector_ctx.to_remove_source_id],
1481                        })
1482                        .await;
1483                }
1484                Ok(version)
1485            }
1486            Err(err) => {
1487                tracing::error!(id = job_id, error = ?err.as_report(), "failed to replace job");
1488                let _ = self.metadata_manager
1489                    .catalog_controller
1490                    .try_abort_replacing_streaming_job(tmp_id)
1491                    .await.inspect_err(|err| {
1492                    tracing::error!(id = job_id, error = ?err.as_report(), "failed to abort replacing job");
1493                });
1494                Err(err)
1495            }
1496        }
1497    }
1498
1499    async fn drop_streaming_job(
1500        &self,
1501        job_id: StreamingJobId,
1502        drop_mode: DropMode,
1503        target_replace_info: Option<ReplaceStreamJobInfo>,
1504    ) -> MetaResult<NotificationVersion> {
1505        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
1506
1507        let (object_id, object_type) = match job_id {
1508            StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table),
1509            StreamingJobId::Sink(id) => (id as _, ObjectType::Sink),
1510            StreamingJobId::Table(_, id) => (id as _, ObjectType::Table),
1511            StreamingJobId::Index(idx) => (idx as _, ObjectType::Index),
1512        };
1513
1514        let version = self
1515            .drop_object(object_type, object_id, drop_mode, target_replace_info)
1516            .await?;
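        // The cleanup below is not compiled under the madsim deterministic-simulation build.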
1517        #[cfg(not(madsim))]
1518        if let StreamingJobId::Sink(sink_id) = job_id {
1519            // Clean up the system table used by the exactly-once Iceberg sink.
1520            // TODO(wcy-fdu): restrict this cleanup to Iceberg sinks only.
1521            let db = self.env.meta_store_ref().conn.clone();
1522            clean_all_rows_by_sink_id(&db, sink_id).await?;
1523        }
1524        Ok(version)
1525    }
1526
1527    /// Resolve the parallelism of the stream job based on the given information.
1528    ///
1529    /// Returns an error if the user specifies a parallelism that cannot be satisfied.
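    ///
    /// An illustrative decision table (the numbers are hypothetical; not a doctest):
    ///
    /// ```ignore
    /// // With 4 slots available in the resource group and `max = 8`:
    /// //   specified = Some(2)                               -> Ok(2)
    /// //   specified = Some(16)                              -> error: exceeds max parallelism
    /// //   specified = Some(6)                               -> error: insufficient parallelism
    /// //   specified = None, DefaultParallelism::Full        -> Ok(4)
    /// //   specified = None, DefaultParallelism::Default(16) -> error: insufficient parallelism
    /// ```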
1530    fn resolve_stream_parallelism(
1531        &self,
1532        specified: Option<NonZeroUsize>,
1533        max: NonZeroUsize,
1534        cluster_info: &StreamingClusterInfo,
1535        resource_group: String,
1536    ) -> MetaResult<NonZeroUsize> {
1537        let available = cluster_info.parallelism(&resource_group);
1538        let Some(available) = NonZeroUsize::new(available) else {
1539            bail_unavailable!(
1540                "no available slots to schedule in resource group \"{}\", \
1541                 have you allocated any compute nodes within this resource group?",
1542                resource_group
1543            );
1544        };
1545
1546        if let Some(specified) = specified {
1547            if specified > max {
1548                bail_invalid_parameter!(
1549                    "specified parallelism {} should not exceed max parallelism {}",
1550                    specified,
1551                    max,
1552                );
1553            }
1554            if specified > available {
1555                bail_unavailable!(
1556                    "insufficient parallelism to schedule in resource group \"{}\", \
1557                     required: {}, available: {}",
1558                    resource_group,
1559                    specified,
1560                    available,
1561                );
1562            }
1563            Ok(specified)
1564        } else {
1565            // Fall back to the configured default parallelism when the user does not specify one.
1566            let default_parallelism = match self.env.opts.default_parallelism {
1567                DefaultParallelism::Full => available,
1568                DefaultParallelism::Default(num) => {
1569                    if num > available {
1570                        bail_unavailable!(
1571                            "insufficient parallelism to schedule in resource group \"{}\", \
1572                            required: {}, available: {}",
1573                            resource_group,
1574                            num,
1575                            available,
1576                        );
1577                    }
1578                    num
1579                }
1580            };
1581
1582            if default_parallelism > max {
1583                tracing::warn!(
1584                    max_parallelism = max.get(),
1585                    resource_group,
1586                    "available parallelism exceeds the max parallelism, using max parallelism instead",
1587                );
1588            }
1589            Ok(default_parallelism.min(max))
1590        }
1591    }
1592
1593    /// Builds the actor graph:
1594    /// - Add the upstream fragments to the fragment graph
1595    /// - Schedule the fragments based on their distribution
1596    /// - Expand each fragment into one or several actors
1597    /// - Construct the fragment level backfill order control.
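    ///
    /// A hypothetical call sketch (the caller-side identifiers are assumed; not a doctest):
    ///
    /// ```ignore
    /// let (create_ctx, fragments_to_create) = ddl_controller
    ///     .build_stream_job(stream_ctx, stream_job, fragment_graph, None, None)
    ///     .await?;
    /// ```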
1598    pub(crate) async fn build_stream_job(
1599        &self,
1600        stream_ctx: StreamContext,
1601        mut stream_job: StreamingJob,
1602        fragment_graph: StreamFragmentGraph,
1603        affected_table_replace_info: Option<(StreamingJob, StreamFragmentGraph)>,
1604        specific_resource_group: Option<String>,
1605    ) -> MetaResult<(CreateStreamingJobContext, StreamJobFragmentsToCreate)> {
1606        let id = stream_job.id();
1607        let specified_parallelism = fragment_graph.specified_parallelism();
1608        let expr_context = stream_ctx.to_expr_context();
1609        let max_parallelism = NonZeroUsize::new(fragment_graph.max_parallelism()).unwrap();
1610
1611        // 1. Build the fragment-level backfill ordering.
1612        let fragment_backfill_ordering = fragment_graph.create_fragment_backfill_ordering();
1613
1614        // 2. Resolve the upstream fragments, extend the fragment graph to a complete graph that
1615        // contains all information needed for building the actor graph.
1616
1617        let (snapshot_backfill_info, cross_db_snapshot_backfill_info) =
1618            fragment_graph.collect_snapshot_backfill_info()?;
1619        assert!(
1620            snapshot_backfill_info
1621                .iter()
1622                .chain([&cross_db_snapshot_backfill_info])
1623                .flat_map(|info| info.upstream_mv_table_id_to_backfill_epoch.values())
1624                .all(|backfill_epoch| backfill_epoch.is_none()),
1625            "should not set backfill epoch when initially building the job: {:?} {:?}",
1626            snapshot_backfill_info,
1627            cross_db_snapshot_backfill_info
1628        );
1629
1630        // check if log store exists for all cross-db upstreams
1631        self.metadata_manager
1632            .catalog_controller
1633            .validate_cross_db_snapshot_backfill(&cross_db_snapshot_backfill_info)
1634            .await?;
1635
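        // Cross-db snapshot-backfill upstreams are excluded here; only same-database upstreams are resolved to root fragments below.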
1636        let upstream_table_ids = fragment_graph
1637            .dependent_table_ids()
1638            .iter()
1639            .filter(|id| {
1640                !cross_db_snapshot_backfill_info
1641                    .upstream_mv_table_id_to_backfill_epoch
1642                    .contains_key(id)
1643            })
1644            .cloned()
1645            .collect();
1646
1647        let (upstream_root_fragments, existing_actor_location) = self
1648            .metadata_manager
1649            .get_upstream_root_fragments(&upstream_table_ids)
1650            .await?;
1651
1652        if snapshot_backfill_info.is_some() {
1653            match stream_job {
1654                StreamingJob::MaterializedView(_)
1655                | StreamingJob::Sink(_, _)
1656                | StreamingJob::Index(_, _) => {}
1657                StreamingJob::Table(_, _, _) | StreamingJob::Source(_) => {
1658                    return Err(
1659                        anyhow!("snapshot_backfill is not supported for tables and sources").into(),
1660                    );
1661                }
1662            }
1663        }
1664
1665        let upstream_actors = upstream_root_fragments
1666            .values()
1667            .map(|fragment| {
1668                (
1669                    fragment.fragment_id,
1670                    fragment.actors.iter().map(|actor| actor.actor_id).collect(),
1671                )
1672            })
1673            .collect();
1674
1675        let complete_graph = CompleteStreamFragmentGraph::with_upstreams(
1676            fragment_graph,
1677            upstream_root_fragments,
1678            existing_actor_location,
1679            (&stream_job).into(),
1680        )?;
1681
1682        let resource_group = match specific_resource_group {
1683            None => {
1684                self.metadata_manager
1685                    .get_database_resource_group(stream_job.database_id() as ObjectId)
1686                    .await?
1687            }
1688            Some(resource_group) => resource_group,
1689        };
1690
1691        // 3. Build the actor graph.
1692        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1693
1694        let parallelism = self.resolve_stream_parallelism(
1695            specified_parallelism,
1696            max_parallelism,
1697            &cluster_info,
1698            resource_group.clone(),
1699        )?;
1700
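        // Apply the adaptive parallelism strategy from the system params to compute the target parallelism.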
1701        let parallelism = self
1702            .env
1703            .system_params_reader()
1704            .await
1705            .adaptive_parallelism_strategy()
1706            .compute_target_parallelism(parallelism.get());
1707
1708        let parallelism = NonZeroUsize::new(parallelism).expect("parallelism must be positive");
1709        let actor_graph_builder = ActorGraphBuilder::new(
1710            id,
1711            resource_group,
1712            complete_graph,
1713            cluster_info,
1714            parallelism,
1715        )?;
1716
1717        let ActorGraphBuildResult {
1718            graph,
1719            downstream_fragment_relations,
1720            building_locations,
1721            existing_locations,
1722            upstream_fragment_downstreams,
1723            new_no_shuffle,
1724            replace_upstream,
1725            ..
1726        } = actor_graph_builder.generate_graph(&self.env, &stream_job, expr_context)?;
1727        assert!(replace_upstream.is_empty());
1728
1729        // 4. Build the table fragments structure that will be persisted in the stream manager,
1730        // and the context that contains all information needed for building the
1731        // actors on the compute nodes.
1732
1733        // If the frontend does not specify a parallelism and `default_parallelism` is set to `Full`, use ADAPTIVE parallelism.
1734        // Otherwise, fall back to FIXED parallelism with the value resolved above.
1735        let table_parallelism = match (specified_parallelism, &self.env.opts.default_parallelism) {
1736            (None, DefaultParallelism::Full) => TableParallelism::Adaptive,
1737            _ => TableParallelism::Fixed(parallelism.get()),
1738        };
1739
1740        let stream_job_fragments = StreamJobFragments::new(
1741            id.into(),
1742            graph,
1743            &building_locations.actor_locations,
1744            stream_ctx.clone(),
1745            table_parallelism,
1746            max_parallelism.get(),
1747        );
1748        let internal_tables = stream_job_fragments.internal_tables();
1749
1750        if let Some(mview_fragment) = stream_job_fragments.mview_fragment() {
1751            stream_job.set_table_vnode_count(mview_fragment.vnode_count());
1752        }
1753
1754        let replace_table_job_info = match affected_table_replace_info {
1755            Some((table_stream_job, fragment_graph)) => {
1756                if snapshot_backfill_info.is_some() {
1757                    return Err(anyhow!(
1758                        "snapshot backfill should not have replace table info: {table_stream_job:?}"
1759                    )
1760                    .into());
1761                }
1762                let StreamingJob::Sink(sink, target_table) = &mut stream_job else {
1763                    bail!("additional replace table event only occurs when sinking into table");
1764                };
1765
1766                table_stream_job.table().expect("should be table job");
1767                let tmp_id = self
1768                    .metadata_manager
1769                    .catalog_controller
1770                    .create_job_catalog_for_replace(
1771                        &table_stream_job,
1772                        &stream_ctx,
1773                        &fragment_graph.specified_parallelism(),
1774                        fragment_graph.max_parallelism(),
1775                    )
1776                    .await? as u32;
1777
1778                let (context, table_fragments) = self
1779                    .inject_replace_table_job_for_table_sink(
1780                        tmp_id,
1781                        &self.metadata_manager,
1782                        stream_ctx,
1783                        Some(sink),
1784                        Some(&stream_job_fragments),
1785                        None,
1786                        &table_stream_job,
1787                        fragment_graph,
1788                    )
1789                    .await?;
1790                // When sinking into a table, some fields of the target table may have been modified,
1791                // such as `fragment_id` being altered by `prepare_replace_table`.
1792                // At this point, the table info carried with the sink must be updated accordingly.
1793                must_match!(&table_stream_job, StreamingJob::Table(source, table, _) => {
1794                    // The StreamingJob in ReplaceTableInfo must be StreamingJob::Table
1795                    *target_table = Some((table.clone(), source.clone()));
1796                });
1797
1798                Some((table_stream_job, context, table_fragments))
1799            }
1800            None => None,
1801        };
1802
1803        let ctx = CreateStreamingJobContext {
1804            upstream_fragment_downstreams,
1805            new_no_shuffle,
1806            upstream_actors,
1807            internal_tables,
1808            building_locations,
1809            existing_locations,
1810            definition: stream_job.definition(),
1811            mv_table_id: stream_job.mv_table(),
1812            create_type: stream_job.create_type(),
1813            job_type: (&stream_job).into(),
1814            streaming_job: stream_job,
1815            replace_table_job_info,
1816            option: CreateStreamingJobOption {},
1817            snapshot_backfill_info,
1818            cross_db_snapshot_backfill_info,
1819            fragment_backfill_ordering,
1820        };
1821
1822        Ok((
1823            ctx,
1824            StreamJobFragmentsToCreate {
1825                inner: stream_job_fragments,
1826                downstreams: downstream_fragment_relations,
1827            },
1828        ))
1829    }
1830
1831    /// `build_replace_job` builds a job replacement and returns the context and new job
1832    /// fragments.
1833    ///
1834    /// Note that we use a dummy ID for the new job fragments and replace it with the real one after
1835    /// replacement is finished.
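    ///
    /// A hypothetical call sketch (the caller-side identifiers are assumed; not a doctest):
    ///
    /// ```ignore
    /// let (replace_ctx, new_fragments) = ddl_controller
    ///     .build_replace_job(stream_ctx, &streaming_job, fragment_graph, tmp_id as _)
    ///     .await?;
    /// ```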
1836    pub(crate) async fn build_replace_job(
1837        &self,
1838        stream_ctx: StreamContext,
1839        stream_job: &StreamingJob,
1840        mut fragment_graph: StreamFragmentGraph,
1841        tmp_job_id: TableId,
1842    ) -> MetaResult<(ReplaceStreamJobContext, StreamJobFragmentsToCreate)> {
1843        match &stream_job {
1844            StreamingJob::Table(..) | StreamingJob::Source(..) => {}
1845            StreamingJob::MaterializedView(..)
1846            | StreamingJob::Sink(..)
1847            | StreamingJob::Index(..) => {
1848                bail_not_implemented!("schema change for {}", stream_job.job_type_str())
1849            }
1850        }
1851
1852        let id = stream_job.id();
1853        let expr_context = stream_ctx.to_expr_context();
1854
1855        // Check whether this replacement drops the table's associated source connector.
1856        let mut drop_table_associated_source_id = None;
1857        if let StreamingJob::Table(None, _, _) = &stream_job {
1858            drop_table_associated_source_id = self
1859                .metadata_manager
1860                .get_table_associated_source_id(id as _)
1861                .await?;
1862        }
1863
1864        let old_fragments = self
1865            .metadata_manager
1866            .get_job_fragments_by_id(&id.into())
1867            .await?;
1868        let old_internal_table_ids = old_fragments.internal_table_ids();
1869
1870        // Handle dropping the table's associated source.
1871        let mut drop_table_connector_ctx = None;
1872        if drop_table_associated_source_id.is_some() {
1873            // Dropping the table's associated source implies that the fragment containing the table has exactly one internal table (the associated source's state table).
1874            debug_assert!(old_internal_table_ids.len() == 1);
1875
1876            drop_table_connector_ctx = Some(DropTableConnectorContext {
1877                // We do not remove the original table catalog, as it is still needed by the streaming job;
1878                // we only need to remove the reference to the state table.
1879                to_change_streaming_job_id: id as i32,
1880                to_remove_state_table_id: old_internal_table_ids[0] as i32, // asserted before
1881                to_remove_source_id: drop_table_associated_source_id.unwrap(),
1882            });
1883        } else {
1884            let old_internal_tables = self
1885                .metadata_manager
1886                .get_table_catalog_by_ids(old_internal_table_ids)
1887                .await?;
1888            fragment_graph.fit_internal_table_ids(old_internal_tables)?;
1889        }
1890
1891        // 1. Resolve the edges to the downstream fragments, extend the fragment graph to a complete
1892        // graph that contains all information needed for building the actor graph.
1893        let original_root_fragment = old_fragments
1894            .root_fragment()
1895            .expect("root fragment not found");
1896
1897        let job_type = StreamingJobType::from(stream_job);
1898
1899        // Fetch the downstream fragments of the job from the metadata manager.
1900        let (downstream_fragments, downstream_actor_location) =
1901            self.metadata_manager.get_downstream_fragments(id).await?;
1902
1903        // Build the complete graph based on the job type.
1904        let complete_graph = match &job_type {
1905            StreamingJobType::Table(TableJobType::General) | StreamingJobType::Source => {
1906                CompleteStreamFragmentGraph::with_downstreams(
1907                    fragment_graph,
1908                    original_root_fragment.fragment_id,
1909                    downstream_fragments,
1910                    downstream_actor_location,
1911                    job_type,
1912                )?
1913            }
1914            StreamingJobType::Table(TableJobType::SharedCdcSource) => {
1915                // Get the upstream fragment, which should be the shared CDC source.
1916                let (upstream_root_fragments, upstream_actor_location) = self
1917                    .metadata_manager
1918                    .get_upstream_root_fragments(fragment_graph.dependent_table_ids())
1919                    .await?;
1920
1921                CompleteStreamFragmentGraph::with_upstreams_and_downstreams(
1922                    fragment_graph,
1923                    upstream_root_fragments,
1924                    upstream_actor_location,
1925                    original_root_fragment.fragment_id,
1926                    downstream_fragments,
1927                    downstream_actor_location,
1928                    job_type,
1929                )?
1930            }
1931            _ => unreachable!(),
1932        };
1933
1934        let resource_group = self
1935            .metadata_manager
1936            .get_existing_job_resource_group(id as ObjectId)
1937            .await?;
1938
1939        // 2. Build the actor graph.
1940        let cluster_info = self.metadata_manager.get_streaming_cluster_info().await?;
1941
1942        // XXX: what is this parallelism?
1943        // Is it "assigned parallelism"?
1944        let parallelism = NonZeroUsize::new(original_root_fragment.actors.len())
1945            .expect("The number of actors in the original table fragment should be greater than 0");
1946
1947        let actor_graph_builder = ActorGraphBuilder::new(
1948            id,
1949            resource_group,
1950            complete_graph,
1951            cluster_info,
1952            parallelism,
1953        )?;
1954
1955        let ActorGraphBuildResult {
1956            graph,
1957            downstream_fragment_relations,
1958            building_locations,
1959            existing_locations,
1960            upstream_fragment_downstreams,
1961            replace_upstream,
1962            new_no_shuffle,
1963            ..
1964        } = actor_graph_builder.generate_graph(&self.env, stream_job, expr_context)?;
1965
1966        // A general table or source has no upstream job, so the dispatchers should be empty.
1967        if matches!(
1968            job_type,
1969            StreamingJobType::Source | StreamingJobType::Table(TableJobType::General)
1970        ) {
1971            assert!(upstream_fragment_downstreams.is_empty());
1972        }
1973
1974        // 3. Build the table fragments structure that will be persisted in the stream manager, and
1975        // the context that contains all information needed for building the actors on the compute
1976        // nodes.
1977        let stream_job_fragments = StreamJobFragments::new(
1978            (tmp_job_id as u32).into(),
1979            graph,
1980            &building_locations.actor_locations,
1981            stream_ctx,
1982            old_fragments.assigned_parallelism,
1983            old_fragments.max_parallelism,
1984        );
1985
1986        // Note: no need to set `vnode_count` as it's already set by the frontend.
1987        // See `get_replace_table_plan`.
1988
1989        let ctx = ReplaceStreamJobContext {
1990            old_fragments,
1991            replace_upstream,
1992            new_no_shuffle,
1993            upstream_fragment_downstreams,
1994            building_locations,
1995            existing_locations,
1996            streaming_job: stream_job.clone(),
1997            tmp_id: tmp_job_id as _,
1998            drop_table_connector_ctx,
1999        };
2000
2001        Ok((
2002            ctx,
2003            StreamJobFragmentsToCreate {
2004                inner: stream_job_fragments,
2005                downstreams: downstream_fragment_relations,
2006            },
2007        ))
2008    }
2009
2010    async fn alter_name(
2011        &self,
2012        relation: alter_name_request::Object,
2013        new_name: &str,
2014    ) -> MetaResult<NotificationVersion> {
2015        let (obj_type, id) = match relation {
2016            alter_name_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2017            alter_name_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2018            alter_name_request::Object::IndexId(id) => (ObjectType::Index, id as ObjectId),
2019            alter_name_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2020            alter_name_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2021            alter_name_request::Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2022            alter_name_request::Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2023            alter_name_request::Object::SubscriptionId(id) => {
2024                (ObjectType::Subscription, id as ObjectId)
2025            }
2026        };
2027        self.metadata_manager
2028            .catalog_controller
2029            .alter_name(obj_type, id, new_name)
2030            .await
2031    }
2032
2033    async fn alter_swap_rename(
2034        &self,
2035        object: alter_swap_rename_request::Object,
2036    ) -> MetaResult<NotificationVersion> {
2037        let (obj_type, src_id, dst_id) = match object {
2038            alter_swap_rename_request::Object::Schema(_) => unimplemented!("schema swap"),
2039            alter_swap_rename_request::Object::Table(objs) => {
2040                let (src_id, dst_id) = (
2041                    objs.src_object_id as ObjectId,
2042                    objs.dst_object_id as ObjectId,
2043                );
2044                (ObjectType::Table, src_id, dst_id)
2045            }
2046            alter_swap_rename_request::Object::View(objs) => {
2047                let (src_id, dst_id) = (
2048                    objs.src_object_id as ObjectId,
2049                    objs.dst_object_id as ObjectId,
2050                );
2051                (ObjectType::View, src_id, dst_id)
2052            }
2053            alter_swap_rename_request::Object::Source(objs) => {
2054                let (src_id, dst_id) = (
2055                    objs.src_object_id as ObjectId,
2056                    objs.dst_object_id as ObjectId,
2057                );
2058                (ObjectType::Source, src_id, dst_id)
2059            }
2060            alter_swap_rename_request::Object::Sink(objs) => {
2061                let (src_id, dst_id) = (
2062                    objs.src_object_id as ObjectId,
2063                    objs.dst_object_id as ObjectId,
2064                );
2065                (ObjectType::Sink, src_id, dst_id)
2066            }
2067            alter_swap_rename_request::Object::Subscription(objs) => {
2068                let (src_id, dst_id) = (
2069                    objs.src_object_id as ObjectId,
2070                    objs.dst_object_id as ObjectId,
2071                );
2072                (ObjectType::Subscription, src_id, dst_id)
2073            }
2074        };
2075
2076        self.metadata_manager
2077            .catalog_controller
2078            .alter_swap_rename(obj_type, src_id, dst_id)
2079            .await
2080    }
2081
2082    async fn alter_owner(
2083        &self,
2084        object: Object,
2085        owner_id: UserId,
2086    ) -> MetaResult<NotificationVersion> {
2087        let (obj_type, id) = match object {
2088            Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2089            Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2090            Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2091            Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2092            Object::SchemaId(id) => (ObjectType::Schema, id as ObjectId),
2093            Object::DatabaseId(id) => (ObjectType::Database, id as ObjectId),
2094            Object::SubscriptionId(id) => (ObjectType::Subscription, id as ObjectId),
2095            Object::ConnectionId(id) => (ObjectType::Connection, id as ObjectId),
2096        };
2097        self.metadata_manager
2098            .catalog_controller
2099            .alter_owner(obj_type, id, owner_id as _)
2100            .await
2101    }
2102
2103    async fn alter_set_schema(
2104        &self,
2105        object: alter_set_schema_request::Object,
2106        new_schema_id: SchemaId,
2107    ) -> MetaResult<NotificationVersion> {
2108        let (obj_type, id) = match object {
2109            alter_set_schema_request::Object::TableId(id) => (ObjectType::Table, id as ObjectId),
2110            alter_set_schema_request::Object::ViewId(id) => (ObjectType::View, id as ObjectId),
2111            alter_set_schema_request::Object::SourceId(id) => (ObjectType::Source, id as ObjectId),
2112            alter_set_schema_request::Object::SinkId(id) => (ObjectType::Sink, id as ObjectId),
2113            alter_set_schema_request::Object::FunctionId(id) => {
2114                (ObjectType::Function, id as ObjectId)
2115            }
2116            alter_set_schema_request::Object::ConnectionId(id) => {
2117                (ObjectType::Connection, id as ObjectId)
2118            }
2119            alter_set_schema_request::Object::SubscriptionId(id) => {
2120                (ObjectType::Subscription, id as ObjectId)
2121            }
2122        };
2123        self.metadata_manager
2124            .catalog_controller
2125            .alter_schema(obj_type, id, new_schema_id as _)
2126            .await
2127    }
2128
2129    pub async fn wait(&self) -> MetaResult<()> {
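        // Poll once per millisecond, for up to roughly 30 minutes, until no background-creating materialized views remain.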
2130        let timeout_ms = 30 * 60 * 1000;
2131        for _ in 0..timeout_ms {
2132            if self
2133                .metadata_manager
2134                .catalog_controller
2135                .list_background_creating_mviews(true)
2136                .await?
2137                .is_empty()
2138            {
2139                return Ok(());
2140            }
2141
2142            sleep(Duration::from_millis(1)).await;
2143        }
2144        Err(MetaError::cancelled(format!(
2145            "timeout after {timeout_ms}ms"
2146        )))
2147    }
2148
2149    async fn comment_on(&self, comment: Comment) -> MetaResult<NotificationVersion> {
2150        self.metadata_manager
2151            .catalog_controller
2152            .comment_on(comment)
2153            .await
2154    }
2155}
2156
2157fn report_create_object(
2158    catalog_id: u32,
2159    event_name: &str,
2160    obj_type: PbTelemetryDatabaseObject,
2161    connector_name: Option<String>,
2162    attr_info: Option<jsonbb::Value>,
2163) {
2164    report_event(
2165        PbTelemetryEventStage::CreateStreamJob,
2166        event_name,
2167        catalog_id.into(),
2168        connector_name,
2169        Some(obj_type),
2170        attr_info,
2171    );
2172}
2173
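/// Deletes all rows belonging to `sink_id` from the exactly-once Iceberg sink system table.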
2174async fn clean_all_rows_by_sink_id(db: &DatabaseConnection, sink_id: i32) -> MetaResult<()> {
2175    match Entity::delete_many()
2176        .filter(Column::SinkId.eq(sink_id))
2177        .exec(db)
2178        .await
2179    {
2180        Ok(result) => {
2181            let deleted_count = result.rows_affected;
2182
2183            tracing::info!(
2184                "Deleted {} items for sink_id = {} in iceberg exactly once system table.",
2185                deleted_count,
2186                sink_id
2187            );
2188            Ok(())
2189        }
2190        Err(e) => {
2191            tracing::error!(
2192                "Error deleting records for sink_id = {} from iceberg exactly once system table: {:?}",
2193                sink_id,
2194                e.as_report()
2195            );
2196            Err(e.into())
2197        }
2198    }
2199}