risingwave_meta_service/ddl_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::TableId;
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::barrier::{BarrierScheduler, Command};
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::id::SourceId;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

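/// Meta-side implementation of the DDL gRPC service.
///
/// Each handler unpacks its request, translates it into a [`DdlCommand`], and runs
/// the command through the shared [`DdlController`], returning the resulting
/// catalog version to the caller.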
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    barrier_scheduler: BarrierScheduler,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

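    /// Converts a [`ReplaceJobPlan`] proto into the internal [`ReplaceStreamJobInfo`],
    /// dispatching on the kind of job being replaced (table, source, or
    /// materialized view).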
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
        streaming_job_resource_type::ResourceType::Regular(true)
    }

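    /// Spawns a background task that migrates legacy table fragments: it asks a
    /// randomly chosen running frontend for a replace plan per unmigrated table and
    /// applies each plan via [`DdlCommand::ReplaceStreamJob`]. The whole pass is
    /// retried every 5 seconds on failure, and the returned shutdown sender aborts
    /// the task early.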
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("start migrate legacy table fragments task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}
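
// How this service is exposed over gRPC, as a minimal sketch. The actual server
// wiring lives elsewhere in the meta node; `DdlServiceServer` is assumed to be the
// tonic-generated wrapper in `risingwave_pb::ddl_service::ddl_service_server`,
// alongside the `DdlService` trait imported above:
//
//     use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
//
//     let svc = DdlServiceServer::new(ddl_service_impl);
//     tonic::transport::Server::builder()
//         .add_service(svc)
//         .serve(addr)
//         .await?;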

#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0.into(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id))
            .await?;

        Ok(Response::new(DropSecretResponse { version }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
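        // `cascade = true` maps to `DropMode::Cascade` (drop dependent objects as
        // well); otherwise the drop is restricted and fails if dependents exist.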
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

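        // A request without a fragment graph creates a plain, non-shared source
        // that lives only in the catalog; one with a fragment graph is created as
        // a full streaming job (e.g. a shared source).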
        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                // The id of the stream job has been set above.
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        resource_type: Self::default_streaming_job_resource_type(),
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn reset_source(
        &self,
        request: Request<ResetSourceRequest>,
    ) -> Result<Response<ResetSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::info!(
            source_id = %source_id,
            "Received RESET SOURCE request, routing to DDL controller"
        );

        // Route to DDL controller
        let version = self
            .ddl_controller
            .run_command(DdlCommand::ResetSource(source_id))
            .await?;

        Ok(Response::new(ResetSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            resource_type: Self::default_streaming_job_resource_type(),
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();
        let resource_type = req.resource_type.unwrap().resource_type.unwrap();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request.get_dependencies().iter().copied().collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(
                    source_id.map(|PbSourceId::Id(id)| id.into()),
                    table_id,
                ),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    /// Only supports adding a column for now.
    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0.into(),
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(table_ids, include_dropped_tables)
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }

    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

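        // For example, `SET PARALLELISM = 4` arrives as `Fixed(4)` and becomes
        // `StreamingParallelism::Fixed(4)`, while `SET PARALLELISM = ADAPTIVE`
        // (or the legacy `AUTO`) becomes `StreamingParallelism::Adaptive`.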
        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

    async fn alter_fragment_parallelism(
        &self,
        request: Request<AlterFragmentParallelismRequest>,
    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
        let req = request.into_inner();

        let fragment_ids = req.fragment_ids;
        if fragment_ids.is_empty() {
            return Err(Status::invalid_argument(
                "at least one fragment id must be provided",
            ));
        }

        let parallelism = match req.parallelism {
            Some(parallelism) => {
                let streaming_parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(streaming_parallelism)
            }
            None => None,
        };

        let fragment_targets = fragment_ids
            .into_iter()
            .map(|fragment_id| (fragment_id, parallelism.clone()))
            .collect();

        self.ddl_controller
            .reschedule_fragments(fragment_targets)
            .await?;

        Ok(Response::new(AlterFragmentParallelismResponse {}))
    }

    async fn alter_streaming_job_config(
        &self,
        request: Request<AlterStreamingJobConfigRequest>,
    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
        let AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        } = request.into_inner();

        self.ddl_controller
            .run_command(DdlCommand::AlterStreamingJobConfig(
                job_id,
                entries_to_add,
                keys_to_remove,
            ))
            .await?;

        Ok(Response::new(AlterStreamingJobConfigResponse {}))
    }

    /// Auto schema change for CDC sources, called by the source parser when a
    /// schema change is detected.
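    ///
    /// For each reported table change this handler: validates the changed columns
    /// (rejecting generated, hidden, and system columns), looks up the table
    /// catalogs registered for the CDC table id, checks that the change is a pure
    /// `ADD COLUMN` / `DROP COLUMN` (subset/superset check), asks a randomly chosen
    /// frontend for a replace plan, and applies the plan via
    /// [`DdlCommand::ReplaceStreamJob`].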
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        // randomly select a frontend worker to get the replace table plan
        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            // get the table catalog corresponding to the cdc table
            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                // Since we only support `ADD COLUMN` and `DROP COLUMN`, we check that
                // one of the new and original column sets is a subset of the other.
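                //
                // For example, with original columns {(id, Int32), (name, Varchar)}:
                //   - ADD COLUMN age INT   -> new set is a superset: accepted
                //   - DROP COLUMN name     -> new set is a subset:   accepted
                //   - RENAME name TO title -> neither subset nor superset: rejected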
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                // For the subset/superset check, we need to add the visible connector
                // additional columns defined by INCLUDE in the original table to
                // new_columns. This includes both _rw columns and user-defined
                // INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx).
                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }
                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                // skip the schema change if there is no change to the original columns
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                // Send a request to the frontend to get the ReplaceJobPlan;
                // it will retry with exponential backoff if the request fails.
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Starting the replace config change")
                            });
                            // start the schema change procedure
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = %table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.as_job_id(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }
1416
1417    async fn compact_iceberg_table(
1418        &self,
1419        request: Request<CompactIcebergTableRequest>,
1420    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1421        let req = request.into_inner();
1422        let sink_id = req.sink_id;
1423
1424        // Trigger manual compaction directly using the sink ID
1425        let task_id = self
1426            .iceberg_compaction_manager
1427            .trigger_manual_compaction(sink_id)
1428            .await
1429            .map_err(|e| {
1430                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1431            })?;
1432
1433        Ok(Response::new(CompactIcebergTableResponse {
1434            status: None,
1435            task_id,
1436        }))
1437    }
1438
    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        // Trigger manual snapshot expiration directly using the sink ID.
        self.iceberg_compaction_manager
            .check_and_expire_snapshots(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

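    /// Creates an Iceberg table end-to-end. The request bundles several steps that run
    /// in sequence, with best-effort cleanup of the table if a later step fails:
    ///
    /// 1. create the internal table job in the background, throttling any associated
    ///    source to a rate limit of 0 so ingestion does not run ahead of the sink;
    /// 2. create the Iceberg sink job that streams the table's changes into Iceberg;
    /// 3. restore the original source rate limit once the sink is in place;
    /// 4. create the non-shared Iceberg source used to read the Iceberg table back.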
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        // 1. create table job
        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        // Mark the table as a background creation so that it won't block sink creation.
        table.create_type = PbCreateType::Background as _;

        // Throttle the source to a rate limit of 0; the original limit is restored once
        // the iceberg sink is up and backfilling.
        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await?;
        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or_else(|| Status::internal("table not found after creation"))?;

        // 2. create iceberg sink job
        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        // Mark the sink as a background creation so that it won't block creation of the
        // iceberg source below.
        sink.create_type = PbCreateType::Background as _;

        let mut fragment_graph = fragment_graph.unwrap();

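        // The sink's fragment graph was planned before the table existed, so it still
        // references a placeholder table ID. Patch the dependent table ID and every
        // scan / batch-plan node to point at the freshly created table catalog.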
        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

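        // Record the new table and its schema as dependencies of the sink job so that
        // catalog dependency tracking treats the sink as downstream of the table.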
        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await;

        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

        // 3. reset source rate limit back to normal after sink creation
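        // If the original definition already pinned the rate limit to 0, leave it as-is;
        // otherwise update the catalog and broadcast a `Throttle` barrier command to the
        // affected source fragments.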
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let (jobs, fragments) = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(SourceId::new(source_id), source_rate_limit)
                .await?;
            let throttle_config = ThrottleConfig {
                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
                rate_limit: source_rate_limit,
            };
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::Throttle {
                        jobs,
                        config: fragments
                            .into_iter()
                            .map(|fragment_id| (fragment_id, throttle_config))
                            .collect(),
                    },
                )
                .await?;
        }

        // 4. create iceberg source
        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

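/// Records a failed auto schema change: bumps the per-table failure counter and appends
/// an `AutoSchemaChangeFail` event to the event log.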
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: TableId,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}