risingwave_meta_service/
ddl_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::TableId;
26use risingwave_common::types::DataType;
27use risingwave_common::util::stream_graph_visitor;
28use risingwave_connector::sink::catalog::SinkId;
29use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
30use risingwave_meta::model::TableParallelism as ModelTableParallelism;
31use risingwave_meta::rpc::metrics::MetaMetrics;
32use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
33use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
34use risingwave_meta_model::{ObjectId, StreamingParallelism};
35use risingwave_pb::catalog::connection::Info as ConnectionInfo;
36use risingwave_pb::catalog::{Comment, Connection, Secret, Table};
37use risingwave_pb::common::WorkerType;
38use risingwave_pb::common::worker_node::State;
39use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
40use risingwave_pb::ddl_service::ddl_service_server::DdlService;
41use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
42use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
43use risingwave_pb::ddl_service::*;
44use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
45use risingwave_pb::meta::event_log;
46use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
47use risingwave_pb::stream_plan::stream_node::NodeBody;
48use thiserror_ext::AsReport;
49use tokio::sync::oneshot::Sender;
50use tokio::task::JoinHandle;
51use tonic::{Request, Response, Status};
52
53use crate::MetaError;
54use crate::barrier::BarrierManagerRef;
55use crate::manager::sink_coordination::SinkCoordinatorManager;
56use crate::manager::{MetaSrvEnv, StreamingJob};
57use crate::rpc::ddl_controller::{
58    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
59};
60use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
61
/// gRPC implementation of [`DdlService`].
///
/// Most handlers translate their request into a [`DdlCommand`] and run it on the
/// [`DdlController`]; a few query the catalog through the metadata manager instead.
#[derive(Clone)]
pub struct DdlServiceImpl {
    /// Meta service environment; the handlers use it to reach the idle manager and the
    /// frontend client pool.
    env: MetaSrvEnv,

    /// Catalog / cluster metadata access (tables, connections, worker nodes).
    metadata_manager: MetadataManager,
    /// Used to stop a sink's coordinator after the sink has been dropped.
    sink_manager: SinkCoordinatorManager,
    /// Executes the `DdlCommand`s built by the RPC handlers.
    ddl_controller: DdlController,
    // NOTE(review): not referenced in the part of the file reviewed here — presumably used
    // by handlers further down; confirm.
    meta_metrics: Arc<MetaMetrics>,
    // NOTE(review): not referenced in the part of the file reviewed here — presumably used
    // by the iceberg-related handlers; confirm.
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
}
72
73impl DdlServiceImpl {
74    #[allow(clippy::too_many_arguments)]
75    pub async fn new(
76        env: MetaSrvEnv,
77        metadata_manager: MetadataManager,
78        stream_manager: GlobalStreamManagerRef,
79        source_manager: SourceManagerRef,
80        barrier_manager: BarrierManagerRef,
81        sink_manager: SinkCoordinatorManager,
82        meta_metrics: Arc<MetaMetrics>,
83        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
84    ) -> Self {
85        let ddl_controller = DdlController::new(
86            env.clone(),
87            metadata_manager.clone(),
88            stream_manager,
89            source_manager,
90            barrier_manager,
91        )
92        .await;
93        Self {
94            env,
95            metadata_manager,
96            sink_manager,
97            ddl_controller,
98            meta_metrics,
99            iceberg_compaction_manager,
100        }
101    }
102
103    fn extract_replace_table_info(
104        ReplaceJobPlan {
105            fragment_graph,
106            replace_job,
107        }: ReplaceJobPlan,
108    ) -> ReplaceStreamJobInfo {
109        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
110            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
111                table,
112                source,
113                job_type,
114            }) => StreamingJob::Table(
115                source,
116                table.unwrap(),
117                TableJobType::try_from(job_type).unwrap(),
118            ),
119            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
120                StreamingJob::Source(source.unwrap())
121            }
122            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
123                table,
124            }) => StreamingJob::MaterializedView(table.unwrap()),
125        };
126
127        ReplaceStreamJobInfo {
128            streaming_job: replace_streaming_job,
129            fragment_graph: fragment_graph.unwrap(),
130        }
131    }
132
133    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
134        tracing::info!("start migrate legacy table fragments task");
135        let env = self.env.clone();
136        let metadata_manager = self.metadata_manager.clone();
137        let ddl_controller = self.ddl_controller.clone();
138
139        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
140        let join_handle = tokio::spawn(async move {
141            async fn migrate_inner(
142                env: &MetaSrvEnv,
143                metadata_manager: &MetadataManager,
144                ddl_controller: &DdlController,
145            ) -> MetaResult<()> {
146                let tables = metadata_manager
147                    .catalog_controller
148                    .list_unmigrated_tables()
149                    .await?;
150
151                if tables.is_empty() {
152                    tracing::info!("no legacy table fragments need migration");
153                    return Ok(());
154                }
155
156                let client = {
157                    let workers = metadata_manager
158                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
159                        .await?;
160                    if workers.is_empty() {
161                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
162                    }
163                    let worker = workers.choose(&mut thread_rng()).unwrap();
164                    env.frontend_client_pool().get(worker).await?
165                };
166
167                for table in tables {
168                    let start = tokio::time::Instant::now();
169                    let req = GetTableReplacePlanRequest {
170                        database_id: table.database_id,
171                        owner: table.owner,
172                        table_name: table.name.clone(),
173                        cdc_table_change: None,
174                    };
175                    let resp = client
176                        .get_table_replace_plan(req)
177                        .await
178                        .context("failed to get table replace plan from frontend")?;
179
180                    let plan = resp.into_inner().replace_plan.unwrap();
181                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
182                    ddl_controller
183                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
184                        .await?;
185                    tracing::info!(elapsed=?start.elapsed(), table_id=table.id, "migrated table fragments");
186                }
187                tracing::info!("successfully migrated all legacy table fragments");
188
189                Ok(())
190            }
191
192            let migrate_future = async move {
193                let mut attempt = 0;
194                loop {
195                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
196                        Ok(_) => break,
197                        Err(e) => {
198                            attempt += 1;
199                            tracing::error!(
200                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
201                                e.as_report(),
202                                attempt
203                            );
204                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
205                        }
206                    }
207                }
208            };
209
210            select(pin!(migrate_future), shutdown_rx).await;
211        });
212
213        (join_handle, shutdown_tx)
214    }
215}
216
217#[async_trait::async_trait]
218impl DdlService for DdlServiceImpl {
219    async fn create_database(
220        &self,
221        request: Request<CreateDatabaseRequest>,
222    ) -> Result<Response<CreateDatabaseResponse>, Status> {
223        let req = request.into_inner();
224        let database = req.get_db()?.clone();
225        let version = self
226            .ddl_controller
227            .run_command(DdlCommand::CreateDatabase(database))
228            .await?;
229
230        Ok(Response::new(CreateDatabaseResponse {
231            status: None,
232            version,
233        }))
234    }
235
236    async fn drop_database(
237        &self,
238        request: Request<DropDatabaseRequest>,
239    ) -> Result<Response<DropDatabaseResponse>, Status> {
240        let req = request.into_inner();
241        let database_id = req.get_database_id();
242
243        let version = self
244            .ddl_controller
245            .run_command(DdlCommand::DropDatabase(database_id as _))
246            .await?;
247
248        Ok(Response::new(DropDatabaseResponse {
249            status: None,
250            version,
251        }))
252    }
253
254    async fn create_secret(
255        &self,
256        request: Request<CreateSecretRequest>,
257    ) -> Result<Response<CreateSecretResponse>, Status> {
258        let req = request.into_inner();
259        let pb_secret = Secret {
260            id: 0,
261            name: req.get_name().clone(),
262            database_id: req.get_database_id(),
263            value: req.get_value().clone(),
264            owner: req.get_owner_id(),
265            schema_id: req.get_schema_id(),
266        };
267        let version = self
268            .ddl_controller
269            .run_command(DdlCommand::CreateSecret(pb_secret))
270            .await?;
271
272        Ok(Response::new(CreateSecretResponse { version }))
273    }
274
275    async fn drop_secret(
276        &self,
277        request: Request<DropSecretRequest>,
278    ) -> Result<Response<DropSecretResponse>, Status> {
279        let req = request.into_inner();
280        let secret_id = req.get_secret_id();
281        let version = self
282            .ddl_controller
283            .run_command(DdlCommand::DropSecret(secret_id as _))
284            .await?;
285
286        Ok(Response::new(DropSecretResponse { version }))
287    }
288
289    async fn alter_secret(
290        &self,
291        request: Request<AlterSecretRequest>,
292    ) -> Result<Response<AlterSecretResponse>, Status> {
293        let req = request.into_inner();
294        let pb_secret = Secret {
295            id: req.get_secret_id(),
296            name: req.get_name().clone(),
297            database_id: req.get_database_id(),
298            value: req.get_value().clone(),
299            owner: req.get_owner_id(),
300            schema_id: req.get_schema_id(),
301        };
302        let version = self
303            .ddl_controller
304            .run_command(DdlCommand::AlterSecret(pb_secret))
305            .await?;
306
307        Ok(Response::new(AlterSecretResponse { version }))
308    }
309
310    async fn create_schema(
311        &self,
312        request: Request<CreateSchemaRequest>,
313    ) -> Result<Response<CreateSchemaResponse>, Status> {
314        let req = request.into_inner();
315        let schema = req.get_schema()?.clone();
316        let version = self
317            .ddl_controller
318            .run_command(DdlCommand::CreateSchema(schema))
319            .await?;
320
321        Ok(Response::new(CreateSchemaResponse {
322            status: None,
323            version,
324        }))
325    }
326
327    async fn drop_schema(
328        &self,
329        request: Request<DropSchemaRequest>,
330    ) -> Result<Response<DropSchemaResponse>, Status> {
331        let req = request.into_inner();
332        let schema_id = req.get_schema_id();
333        let drop_mode = DropMode::from_request_setting(req.cascade);
334        let version = self
335            .ddl_controller
336            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
337            .await?;
338        Ok(Response::new(DropSchemaResponse {
339            status: None,
340            version,
341        }))
342    }
343
344    async fn create_source(
345        &self,
346        request: Request<CreateSourceRequest>,
347    ) -> Result<Response<CreateSourceResponse>, Status> {
348        let req = request.into_inner();
349        let source = req.get_source()?.clone();
350
351        match req.fragment_graph {
352            None => {
353                let version = self
354                    .ddl_controller
355                    .run_command(DdlCommand::CreateNonSharedSource(source))
356                    .await?;
357                Ok(Response::new(CreateSourceResponse {
358                    status: None,
359                    version,
360                }))
361            }
362            Some(fragment_graph) => {
363                // The id of stream job has been set above
364                let stream_job = StreamingJob::Source(source);
365                let version = self
366                    .ddl_controller
367                    .run_command(DdlCommand::CreateStreamingJob {
368                        stream_job,
369                        fragment_graph,
370                        dependencies: HashSet::new(),
371                        specific_resource_group: None,
372                        if_not_exists: req.if_not_exists,
373                    })
374                    .await?;
375                Ok(Response::new(CreateSourceResponse {
376                    status: None,
377                    version,
378                }))
379            }
380        }
381    }
382
383    async fn drop_source(
384        &self,
385        request: Request<DropSourceRequest>,
386    ) -> Result<Response<DropSourceResponse>, Status> {
387        let request = request.into_inner();
388        let source_id = request.source_id;
389        let drop_mode = DropMode::from_request_setting(request.cascade);
390        let version = self
391            .ddl_controller
392            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
393            .await?;
394
395        Ok(Response::new(DropSourceResponse {
396            status: None,
397            version,
398        }))
399    }
400
401    async fn create_sink(
402        &self,
403        request: Request<CreateSinkRequest>,
404    ) -> Result<Response<CreateSinkResponse>, Status> {
405        self.env.idle_manager().record_activity();
406
407        let req = request.into_inner();
408
409        let sink = req.get_sink()?.clone();
410        let fragment_graph = req.get_fragment_graph()?.clone();
411        let dependencies = req
412            .get_dependencies()
413            .iter()
414            .map(|id| *id as ObjectId)
415            .collect();
416
417        let stream_job = StreamingJob::Sink(sink);
418
419        let command = DdlCommand::CreateStreamingJob {
420            stream_job,
421            fragment_graph,
422            dependencies,
423            specific_resource_group: None,
424            if_not_exists: req.if_not_exists,
425        };
426
427        let version = self.ddl_controller.run_command(command).await?;
428
429        Ok(Response::new(CreateSinkResponse {
430            status: None,
431            version,
432        }))
433    }
434
435    async fn drop_sink(
436        &self,
437        request: Request<DropSinkRequest>,
438    ) -> Result<Response<DropSinkResponse>, Status> {
439        let request = request.into_inner();
440        let sink_id = request.sink_id;
441        let drop_mode = DropMode::from_request_setting(request.cascade);
442
443        let command = DdlCommand::DropStreamingJob {
444            job_id: StreamingJobId::Sink(sink_id as _),
445            drop_mode,
446        };
447
448        let version = self.ddl_controller.run_command(command).await?;
449
450        self.sink_manager
451            .stop_sink_coordinator(SinkId::from(sink_id))
452            .await;
453
454        Ok(Response::new(DropSinkResponse {
455            status: None,
456            version,
457        }))
458    }
459
460    async fn create_subscription(
461        &self,
462        request: Request<CreateSubscriptionRequest>,
463    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
464        self.env.idle_manager().record_activity();
465
466        let req = request.into_inner();
467
468        let subscription = req.get_subscription()?.clone();
469        let command = DdlCommand::CreateSubscription(subscription);
470
471        let version = self.ddl_controller.run_command(command).await?;
472
473        Ok(Response::new(CreateSubscriptionResponse {
474            status: None,
475            version,
476        }))
477    }
478
479    async fn drop_subscription(
480        &self,
481        request: Request<DropSubscriptionRequest>,
482    ) -> Result<Response<DropSubscriptionResponse>, Status> {
483        let request = request.into_inner();
484        let subscription_id = request.subscription_id;
485        let drop_mode = DropMode::from_request_setting(request.cascade);
486
487        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);
488
489        let version = self.ddl_controller.run_command(command).await?;
490
491        Ok(Response::new(DropSubscriptionResponse {
492            status: None,
493            version,
494        }))
495    }
496
497    async fn create_materialized_view(
498        &self,
499        request: Request<CreateMaterializedViewRequest>,
500    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
501        self.env.idle_manager().record_activity();
502
503        let req = request.into_inner();
504        let mview = req.get_materialized_view()?.clone();
505        let specific_resource_group = req.specific_resource_group.clone();
506        let fragment_graph = req.get_fragment_graph()?.clone();
507        let dependencies = req
508            .get_dependencies()
509            .iter()
510            .map(|id| *id as ObjectId)
511            .collect();
512
513        let stream_job = StreamingJob::MaterializedView(mview);
514        let version = self
515            .ddl_controller
516            .run_command(DdlCommand::CreateStreamingJob {
517                stream_job,
518                fragment_graph,
519                dependencies,
520                specific_resource_group,
521                if_not_exists: req.if_not_exists,
522            })
523            .await?;
524
525        Ok(Response::new(CreateMaterializedViewResponse {
526            status: None,
527            version,
528        }))
529    }
530
531    async fn drop_materialized_view(
532        &self,
533        request: Request<DropMaterializedViewRequest>,
534    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
535        self.env.idle_manager().record_activity();
536
537        let request = request.into_inner();
538        let table_id = request.table_id;
539        let drop_mode = DropMode::from_request_setting(request.cascade);
540
541        let version = self
542            .ddl_controller
543            .run_command(DdlCommand::DropStreamingJob {
544                job_id: StreamingJobId::MaterializedView(TableId::new(table_id as _)),
545                drop_mode,
546            })
547            .await?;
548
549        Ok(Response::new(DropMaterializedViewResponse {
550            status: None,
551            version,
552        }))
553    }
554
555    async fn create_index(
556        &self,
557        request: Request<CreateIndexRequest>,
558    ) -> Result<Response<CreateIndexResponse>, Status> {
559        self.env.idle_manager().record_activity();
560
561        let req = request.into_inner();
562        let index = req.get_index()?.clone();
563        let index_table = req.get_index_table()?.clone();
564        let fragment_graph = req.get_fragment_graph()?.clone();
565
566        let stream_job = StreamingJob::Index(index, index_table);
567        let version = self
568            .ddl_controller
569            .run_command(DdlCommand::CreateStreamingJob {
570                stream_job,
571                fragment_graph,
572                dependencies: HashSet::new(),
573                specific_resource_group: None,
574                if_not_exists: req.if_not_exists,
575            })
576            .await?;
577
578        Ok(Response::new(CreateIndexResponse {
579            status: None,
580            version,
581        }))
582    }
583
584    async fn drop_index(
585        &self,
586        request: Request<DropIndexRequest>,
587    ) -> Result<Response<DropIndexResponse>, Status> {
588        self.env.idle_manager().record_activity();
589
590        let request = request.into_inner();
591        let index_id = request.index_id;
592        let drop_mode = DropMode::from_request_setting(request.cascade);
593        let version = self
594            .ddl_controller
595            .run_command(DdlCommand::DropStreamingJob {
596                job_id: StreamingJobId::Index(index_id as _),
597                drop_mode,
598            })
599            .await?;
600
601        Ok(Response::new(DropIndexResponse {
602            status: None,
603            version,
604        }))
605    }
606
607    async fn create_function(
608        &self,
609        request: Request<CreateFunctionRequest>,
610    ) -> Result<Response<CreateFunctionResponse>, Status> {
611        let req = request.into_inner();
612        let function = req.get_function()?.clone();
613
614        let version = self
615            .ddl_controller
616            .run_command(DdlCommand::CreateFunction(function))
617            .await?;
618
619        Ok(Response::new(CreateFunctionResponse {
620            status: None,
621            version,
622        }))
623    }
624
625    async fn drop_function(
626        &self,
627        request: Request<DropFunctionRequest>,
628    ) -> Result<Response<DropFunctionResponse>, Status> {
629        let request = request.into_inner();
630
631        let version = self
632            .ddl_controller
633            .run_command(DdlCommand::DropFunction(
634                request.function_id as _,
635                DropMode::from_request_setting(request.cascade),
636            ))
637            .await?;
638
639        Ok(Response::new(DropFunctionResponse {
640            status: None,
641            version,
642        }))
643    }
644
645    async fn create_table(
646        &self,
647        request: Request<CreateTableRequest>,
648    ) -> Result<Response<CreateTableResponse>, Status> {
649        let request = request.into_inner();
650        let job_type = request.get_job_type().unwrap_or_default();
651        let dependencies = request
652            .get_dependencies()
653            .iter()
654            .map(|id| *id as ObjectId)
655            .collect();
656        let source = request.source;
657        let mview = request.materialized_view.unwrap();
658        let fragment_graph = request.fragment_graph.unwrap();
659
660        let stream_job = StreamingJob::Table(source, mview, job_type);
661        let version = self
662            .ddl_controller
663            .run_command(DdlCommand::CreateStreamingJob {
664                stream_job,
665                fragment_graph,
666                dependencies,
667                specific_resource_group: None,
668                if_not_exists: request.if_not_exists,
669            })
670            .await?;
671
672        Ok(Response::new(CreateTableResponse {
673            status: None,
674            version,
675        }))
676    }
677
678    async fn drop_table(
679        &self,
680        request: Request<DropTableRequest>,
681    ) -> Result<Response<DropTableResponse>, Status> {
682        let request = request.into_inner();
683        let source_id = request.source_id;
684        let table_id = request.table_id;
685
686        let drop_mode = DropMode::from_request_setting(request.cascade);
687        let version = self
688            .ddl_controller
689            .run_command(DdlCommand::DropStreamingJob {
690                job_id: StreamingJobId::Table(
691                    source_id.map(|PbSourceId::Id(id)| id as _),
692                    TableId::new(table_id as _),
693                ),
694                drop_mode,
695            })
696            .await?;
697
698        Ok(Response::new(DropTableResponse {
699            status: None,
700            version,
701        }))
702    }
703
704    async fn create_view(
705        &self,
706        request: Request<CreateViewRequest>,
707    ) -> Result<Response<CreateViewResponse>, Status> {
708        let req = request.into_inner();
709        let view = req.get_view()?.clone();
710        let dependencies = req
711            .get_dependencies()
712            .iter()
713            .map(|id| *id as ObjectId)
714            .collect::<HashSet<_>>();
715
716        let version = self
717            .ddl_controller
718            .run_command(DdlCommand::CreateView(view, dependencies))
719            .await?;
720
721        Ok(Response::new(CreateViewResponse {
722            status: None,
723            version,
724        }))
725    }
726
727    async fn drop_view(
728        &self,
729        request: Request<DropViewRequest>,
730    ) -> Result<Response<DropViewResponse>, Status> {
731        let request = request.into_inner();
732        let view_id = request.get_view_id();
733        let drop_mode = DropMode::from_request_setting(request.cascade);
734        let version = self
735            .ddl_controller
736            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
737            .await?;
738        Ok(Response::new(DropViewResponse {
739            status: None,
740            version,
741        }))
742    }
743
744    async fn risectl_list_state_tables(
745        &self,
746        _request: Request<RisectlListStateTablesRequest>,
747    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
748        let tables = self
749            .metadata_manager
750            .catalog_controller
751            .list_all_state_tables()
752            .await?;
753        Ok(Response::new(RisectlListStateTablesResponse { tables }))
754    }
755
756    async fn replace_job_plan(
757        &self,
758        request: Request<ReplaceJobPlanRequest>,
759    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
760        let req = request.into_inner().get_plan().cloned()?;
761
762        let version = self
763            .ddl_controller
764            .run_command(DdlCommand::ReplaceStreamJob(
765                Self::extract_replace_table_info(req),
766            ))
767            .await?;
768
769        Ok(Response::new(ReplaceJobPlanResponse {
770            status: None,
771            version,
772        }))
773    }
774
775    async fn get_table(
776        &self,
777        request: Request<GetTableRequest>,
778    ) -> Result<Response<GetTableResponse>, Status> {
779        let req = request.into_inner();
780        let table = self
781            .metadata_manager
782            .catalog_controller
783            .get_table_by_name(&req.database_name, &req.table_name)
784            .await?;
785
786        Ok(Response::new(GetTableResponse { table }))
787    }
788
789    async fn alter_name(
790        &self,
791        request: Request<AlterNameRequest>,
792    ) -> Result<Response<AlterNameResponse>, Status> {
793        let AlterNameRequest { object, new_name } = request.into_inner();
794        let version = self
795            .ddl_controller
796            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
797            .await?;
798        Ok(Response::new(AlterNameResponse {
799            status: None,
800            version,
801        }))
802    }
803
804    /// Only support add column for now.
805    async fn alter_source(
806        &self,
807        request: Request<AlterSourceRequest>,
808    ) -> Result<Response<AlterSourceResponse>, Status> {
809        let AlterSourceRequest { source } = request.into_inner();
810        let version = self
811            .ddl_controller
812            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
813            .await?;
814        Ok(Response::new(AlterSourceResponse {
815            status: None,
816            version,
817        }))
818    }
819
820    async fn alter_owner(
821        &self,
822        request: Request<AlterOwnerRequest>,
823    ) -> Result<Response<AlterOwnerResponse>, Status> {
824        let AlterOwnerRequest { object, owner_id } = request.into_inner();
825        let version = self
826            .ddl_controller
827            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
828            .await?;
829        Ok(Response::new(AlterOwnerResponse {
830            status: None,
831            version,
832        }))
833    }
834
835    async fn alter_set_schema(
836        &self,
837        request: Request<AlterSetSchemaRequest>,
838    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
839        let AlterSetSchemaRequest {
840            object,
841            new_schema_id,
842        } = request.into_inner();
843        let version = self
844            .ddl_controller
845            .run_command(DdlCommand::AlterSetSchema(
846                object.unwrap(),
847                new_schema_id as _,
848            ))
849            .await?;
850        Ok(Response::new(AlterSetSchemaResponse {
851            status: None,
852            version,
853        }))
854    }
855
856    async fn get_ddl_progress(
857        &self,
858        _request: Request<GetDdlProgressRequest>,
859    ) -> Result<Response<GetDdlProgressResponse>, Status> {
860        Ok(Response::new(GetDdlProgressResponse {
861            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
862        }))
863    }
864
865    async fn create_connection(
866        &self,
867        request: Request<CreateConnectionRequest>,
868    ) -> Result<Response<CreateConnectionResponse>, Status> {
869        let req = request.into_inner();
870        if req.payload.is_none() {
871            return Err(Status::invalid_argument("request is empty"));
872        }
873
874        match req.payload.unwrap() {
875            create_connection_request::Payload::PrivateLink(_) => {
876                panic!("Private Link Connection has been deprecated")
877            }
878            create_connection_request::Payload::ConnectionParams(params) => {
879                let pb_connection = Connection {
880                    id: 0,
881                    schema_id: req.schema_id,
882                    database_id: req.database_id,
883                    name: req.name,
884                    info: Some(ConnectionInfo::ConnectionParams(params)),
885                    owner: req.owner_id,
886                };
887                let version = self
888                    .ddl_controller
889                    .run_command(DdlCommand::CreateConnection(pb_connection))
890                    .await?;
891                Ok(Response::new(CreateConnectionResponse { version }))
892            }
893        }
894    }
895
896    async fn list_connections(
897        &self,
898        _request: Request<ListConnectionsRequest>,
899    ) -> Result<Response<ListConnectionsResponse>, Status> {
900        let conns = self
901            .metadata_manager
902            .catalog_controller
903            .list_connections()
904            .await?;
905
906        Ok(Response::new(ListConnectionsResponse {
907            connections: conns,
908        }))
909    }
910
911    async fn drop_connection(
912        &self,
913        request: Request<DropConnectionRequest>,
914    ) -> Result<Response<DropConnectionResponse>, Status> {
915        let req = request.into_inner();
916        let drop_mode = DropMode::from_request_setting(req.cascade);
917
918        let version = self
919            .ddl_controller
920            .run_command(DdlCommand::DropConnection(
921                req.connection_id as _,
922                drop_mode,
923            ))
924            .await?;
925
926        Ok(Response::new(DropConnectionResponse {
927            status: None,
928            version,
929        }))
930    }
931
932    async fn comment_on(
933        &self,
934        request: Request<CommentOnRequest>,
935    ) -> Result<Response<CommentOnResponse>, Status> {
936        let req = request.into_inner();
937        let comment = req.get_comment()?.clone();
938
939        let version = self
940            .ddl_controller
941            .run_command(DdlCommand::CommentOn(Comment {
942                table_id: comment.table_id,
943                schema_id: comment.schema_id,
944                database_id: comment.database_id,
945                column_index: comment.column_index,
946                description: comment.description,
947            }))
948            .await?;
949
950        Ok(Response::new(CommentOnResponse {
951            status: None,
952            version,
953        }))
954    }
955
956    async fn get_tables(
957        &self,
958        request: Request<GetTablesRequest>,
959    ) -> Result<Response<GetTablesResponse>, Status> {
960        let GetTablesRequest {
961            table_ids,
962            include_dropped_tables,
963        } = request.into_inner();
964        let ret = self
965            .metadata_manager
966            .catalog_controller
967            .get_table_by_ids(
968                table_ids
969                    .into_iter()
970                    .map(|id| TableId::new(id as _))
971                    .collect(),
972                include_dropped_tables,
973            )
974            .await?;
975
976        let mut tables = HashMap::default();
977        for table in ret {
978            tables.insert(table.id, table);
979        }
980        Ok(Response::new(GetTablesResponse { tables }))
981    }
982
983    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
984        self.ddl_controller.wait().await?;
985        Ok(Response::new(WaitResponse {}))
986    }
987
988    async fn alter_cdc_table_backfill_parallelism(
989        &self,
990        request: Request<AlterCdcTableBackfillParallelismRequest>,
991    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
992        let req = request.into_inner();
993        let job_id = req.get_table_id();
994        let parallelism = *req.get_parallelism()?;
995
996        let table_parallelism = ModelTableParallelism::from(parallelism);
997        let streaming_parallelism = match table_parallelism {
998            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
999            _ => bail_invalid_parameter!(
1000                "CDC table backfill parallelism must be set to a fixed value"
1001            ),
1002        };
1003
1004        self.ddl_controller
1005            .reschedule_cdc_table_backfill(
1006                job_id,
1007                ReschedulePolicy::Parallelism(ParallelismPolicy {
1008                    parallelism: streaming_parallelism,
1009                }),
1010            )
1011            .await?;
1012        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
1013    }
1014
1015    async fn alter_parallelism(
1016        &self,
1017        request: Request<AlterParallelismRequest>,
1018    ) -> Result<Response<AlterParallelismResponse>, Status> {
1019        let req = request.into_inner();
1020
1021        let job_id = req.get_table_id();
1022        let parallelism = *req.get_parallelism()?;
1023        let deferred = req.get_deferred();
1024
1025        let parallelism = match parallelism.get_parallelism()? {
1026            Parallelism::Fixed(FixedParallelism { parallelism }) => {
1027                StreamingParallelism::Fixed(*parallelism as _)
1028            }
1029            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
1030            _ => bail_unavailable!(),
1031        };
1032
1033        self.ddl_controller
1034            .reschedule_streaming_job(
1035                job_id.into(),
1036                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
1037                deferred,
1038            )
1039            .await?;
1040
1041        Ok(Response::new(AlterParallelismResponse {}))
1042    }
1043
1044    /// Auto schema change for cdc sources,
1045    /// called by the source parser when a schema change is detected.
1046    async fn auto_schema_change(
1047        &self,
1048        request: Request<AutoSchemaChangeRequest>,
1049    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
1050        let req = request.into_inner();
1051
1052        // randomly select a frontend worker to get the replace table plan
1053        let workers = self
1054            .metadata_manager
1055            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
1056            .await?;
1057        let worker = workers
1058            .choose(&mut thread_rng())
1059            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
1060
1061        let client = self
1062            .env
1063            .frontend_client_pool()
1064            .get(worker)
1065            .await
1066            .map_err(MetaError::from)?;
1067
1068        let Some(schema_change) = req.schema_change else {
1069            return Err(Status::invalid_argument(
1070                "schema change message is required",
1071            ));
1072        };
1073
1074        for table_change in schema_change.table_changes {
1075            for c in &table_change.columns {
1076                let c = ColumnCatalog::from(c.clone());
1077
1078                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
1079                    tracing::warn!(target: "auto_schema_change",
1080                      cdc_table_id = table_change.cdc_table_id,
1081                    upstraem_ddl = table_change.upstream_ddl,
1082                        "invalid column type from cdc table change");
1083                    Err(Status::invalid_argument(format!(
1084                        "invalid column type: {} from cdc table change, column: {:?}",
1085                        column_type, c
1086                    )))
1087                };
1088                if c.is_generated() {
1089                    return invalid_col_type("generated column", &c);
1090                }
1091                if c.is_rw_sys_column() {
1092                    return invalid_col_type("rw system column", &c);
1093                }
1094                if c.is_hidden {
1095                    return invalid_col_type("hidden column", &c);
1096                }
1097            }
1098
1099            // get the table catalog corresponding to the cdc table
1100            let tables: Vec<Table> = self
1101                .metadata_manager
1102                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
1103                .await?;
1104
1105            for table in tables {
1106                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
1107                // is a subset of the other.
1108                let original_columns: HashSet<(String, DataType)> =
1109                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
1110                        let col = ColumnCatalog::from(col.clone());
1111                        if col.is_generated() || col.is_hidden() {
1112                            None
1113                        } else {
1114                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1115                        }
1116                    }));
1117
1118                let mut new_columns: HashSet<(String, DataType)> =
1119                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1120                        let col = ColumnCatalog::from(col.clone());
1121                        if col.is_generated() || col.is_hidden() {
1122                            None
1123                        } else {
1124                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1125                        }
1126                    }));
1127
1128                // For subset/superset check, we need to add visible connector additional columns defined by INCLUDE in the original table to new_columns
1129                // This includes both _rw columns and user-defined INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx)
1130                for col in &table.columns {
1131                    let col = ColumnCatalog::from(col.clone());
1132                    if col.is_connector_additional_column()
1133                        && !col.is_hidden()
1134                        && !col.is_generated()
1135                    {
1136                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1137                    }
1138                }
1139
1140                if !(original_columns.is_subset(&new_columns)
1141                    || original_columns.is_superset(&new_columns))
1142                {
1143                    tracing::warn!(target: "auto_schema_change",
1144                                    table_id = table.id,
1145                                    cdc_table_id = table.cdc_table_id,
1146                                    upstraem_ddl = table_change.upstream_ddl,
1147                                    original_columns = ?original_columns,
1148                                    new_columns = ?new_columns,
1149                                    "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1150
1151                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1152                    add_auto_schema_change_fail_event_log(
1153                        &self.meta_metrics,
1154                        table.id,
1155                        table.name.clone(),
1156                        table_change.cdc_table_id.clone(),
1157                        table_change.upstream_ddl.clone(),
1158                        &self.env.event_log_manager_ref(),
1159                        fail_info,
1160                    );
1161
1162                    return Err(Status::invalid_argument(
1163                        "New columns should be a subset or superset of the original columns (including hidden columns)",
1164                    ));
1165                }
1166                // skip the schema change if there is no change to original columns
1167                if original_columns == new_columns {
1168                    tracing::warn!(target: "auto_schema_change",
1169                                   table_id = table.id,
1170                                   cdc_table_id = table.cdc_table_id,
1171                                   upstraem_ddl = table_change.upstream_ddl,
1172                                    original_columns = ?original_columns,
1173                                    new_columns = ?new_columns,
1174                                   "No change to columns, skipping the schema change");
1175                    continue;
1176                }
1177
1178                let latency_timer = self
1179                    .meta_metrics
1180                    .auto_schema_change_latency
1181                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1182                    .start_timer();
1183                // send a request to the frontend to get the ReplaceJobPlan
1184                // will retry with exponential backoff if the request fails
1185                let resp = client
1186                    .get_table_replace_plan(GetTableReplacePlanRequest {
1187                        database_id: table.database_id,
1188                        owner: table.owner,
1189                        table_name: table.name.clone(),
1190                        cdc_table_change: Some(table_change.clone()),
1191                    })
1192                    .await;
1193
1194                match resp {
1195                    Ok(resp) => {
1196                        let resp = resp.into_inner();
1197                        if let Some(plan) = resp.replace_plan {
1198                            let plan = Self::extract_replace_table_info(plan);
1199                            plan.streaming_job.table().inspect(|t| {
1200                                tracing::info!(
1201                                    target: "auto_schema_change",
1202                                    table_id = t.id,
1203                                    cdc_table_id = t.cdc_table_id,
1204                                    upstraem_ddl = table_change.upstream_ddl,
1205                                    "Start the replace config change")
1206                            });
1207                            // start the schema change procedure
1208                            let replace_res = self
1209                                .ddl_controller
1210                                .run_command(DdlCommand::ReplaceStreamJob(plan))
1211                                .await;
1212
1213                            match replace_res {
1214                                Ok(_) => {
1215                                    tracing::info!(
1216                                        target: "auto_schema_change",
1217                                        table_id = table.id,
1218                                        cdc_table_id = table.cdc_table_id,
1219                                        "Table replaced success");
1220
1221                                    self.meta_metrics
1222                                        .auto_schema_change_success_cnt
1223                                        .with_guarded_label_values(&[
1224                                            &table.id.to_string(),
1225                                            &table.name,
1226                                        ])
1227                                        .inc();
1228                                    latency_timer.observe_duration();
1229                                }
1230                                Err(e) => {
1231                                    tracing::error!(
1232                                        target: "auto_schema_change",
1233                                        error = %e.as_report(),
1234                                        table_id = table.id,
1235                                        cdc_table_id = table.cdc_table_id,
1236                                        upstraem_ddl = table_change.upstream_ddl,
1237                                        "failed to replace the table",
1238                                    );
1239                                    let fail_info =
1240                                        format!("failed to replace the table: {}", e.as_report());
1241                                    add_auto_schema_change_fail_event_log(
1242                                        &self.meta_metrics,
1243                                        table.id,
1244                                        table.name.clone(),
1245                                        table_change.cdc_table_id.clone(),
1246                                        table_change.upstream_ddl.clone(),
1247                                        &self.env.event_log_manager_ref(),
1248                                        fail_info,
1249                                    );
1250                                }
1251                            };
1252                        }
1253                    }
1254                    Err(e) => {
1255                        tracing::error!(
1256                            target: "auto_schema_change",
1257                            error = %e.as_report(),
1258                            table_id = table.id,
1259                            cdc_table_id = table.cdc_table_id,
1260                            "failed to get replace table plan",
1261                        );
1262                        let fail_info =
1263                            format!("failed to get replace table plan: {}", e.as_report());
1264                        add_auto_schema_change_fail_event_log(
1265                            &self.meta_metrics,
1266                            table.id,
1267                            table.name.clone(),
1268                            table_change.cdc_table_id.clone(),
1269                            table_change.upstream_ddl.clone(),
1270                            &self.env.event_log_manager_ref(),
1271                            fail_info,
1272                        );
1273                    }
1274                };
1275            }
1276        }
1277
1278        Ok(Response::new(AutoSchemaChangeResponse {}))
1279    }
1280
1281    async fn alter_swap_rename(
1282        &self,
1283        request: Request<AlterSwapRenameRequest>,
1284    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1285        let req = request.into_inner();
1286
1287        let version = self
1288            .ddl_controller
1289            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1290            .await?;
1291
1292        Ok(Response::new(AlterSwapRenameResponse {
1293            status: None,
1294            version,
1295        }))
1296    }
1297
1298    async fn alter_resource_group(
1299        &self,
1300        request: Request<AlterResourceGroupRequest>,
1301    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1302        let req = request.into_inner();
1303
1304        let table_id = req.get_table_id();
1305        let deferred = req.get_deferred();
1306        let resource_group = req.resource_group;
1307
1308        self.ddl_controller
1309            .reschedule_streaming_job(
1310                table_id.into(),
1311                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1312                deferred,
1313            )
1314            .await?;
1315
1316        Ok(Response::new(AlterResourceGroupResponse {}))
1317    }
1318
1319    async fn alter_database_param(
1320        &self,
1321        request: Request<AlterDatabaseParamRequest>,
1322    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1323        let req = request.into_inner();
1324        let database_id = req.database_id;
1325
1326        let param = match req.param.unwrap() {
1327            alter_database_param_request::Param::BarrierIntervalMs(value) => {
1328                AlterDatabaseParam::BarrierIntervalMs(value.value)
1329            }
1330            alter_database_param_request::Param::CheckpointFrequency(value) => {
1331                AlterDatabaseParam::CheckpointFrequency(value.value)
1332            }
1333        };
1334        let version = self
1335            .ddl_controller
1336            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
1337            .await?;
1338
1339        return Ok(Response::new(AlterDatabaseParamResponse {
1340            status: None,
1341            version,
1342        }));
1343    }
1344
1345    async fn compact_iceberg_table(
1346        &self,
1347        request: Request<CompactIcebergTableRequest>,
1348    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1349        let req = request.into_inner();
1350        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);
1351
1352        // Trigger manual compaction directly using the sink ID
1353        let task_id = self
1354            .iceberg_compaction_manager
1355            .trigger_manual_compaction(sink_id)
1356            .await
1357            .map_err(|e| {
1358                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1359            })?;
1360
1361        Ok(Response::new(CompactIcebergTableResponse {
1362            status: None,
1363            task_id,
1364        }))
1365    }
1366
1367    async fn expire_iceberg_table_snapshots(
1368        &self,
1369        request: Request<ExpireIcebergTableSnapshotsRequest>,
1370    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1371        let req = request.into_inner();
1372        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);
1373
1374        // Trigger manual snapshot expiration directly using the sink ID
1375        self.iceberg_compaction_manager
1376            .check_and_expire_snapshots(&sink_id)
1377            .await
1378            .map_err(|e| {
1379                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1380            })?;
1381
1382        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1383            status: None,
1384        }))
1385    }
1386
1387    async fn create_iceberg_table(
1388        &self,
1389        request: Request<CreateIcebergTableRequest>,
1390    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
1391        let req = request.into_inner();
1392        let CreateIcebergTableRequest {
1393            table_info,
1394            sink_info,
1395            iceberg_source,
1396            if_not_exists,
1397        } = req;
1398
1399        // 1. create table job
1400        let PbTableJobInfo {
1401            source,
1402            table,
1403            fragment_graph,
1404            job_type,
1405        } = table_info.unwrap();
1406        let table = table.unwrap();
1407        let database_id = table.get_database_id();
1408        let schema_id = table.get_schema_id();
1409        let table_name = table.get_name().to_owned();
1410
1411        let stream_job =
1412            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
1413        let _ = self
1414            .ddl_controller
1415            .run_command(DdlCommand::CreateStreamingJob {
1416                stream_job,
1417                fragment_graph: fragment_graph.unwrap(),
1418                dependencies: HashSet::new(),
1419                specific_resource_group: None,
1420                if_not_exists,
1421            })
1422            .await?;
1423
1424        let table_catalog = self
1425            .metadata_manager
1426            .catalog_controller
1427            .get_table_catalog_by_name(database_id as _, schema_id as _, &table_name)
1428            .await?
1429            .ok_or(Status::not_found("Internal error: table not found"))?;
1430
1431        // 2. create iceberg sink job
1432        let PbSinkJobInfo {
1433            sink,
1434            fragment_graph,
1435        } = sink_info.unwrap();
1436        let sink = sink.unwrap();
1437        let mut fragment_graph = fragment_graph.unwrap();
1438
1439        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
1440        assert!(
1441            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
1442                .is_placeholder()
1443        );
1444        fragment_graph.dependent_table_ids[0] = table_catalog.id;
1445        for fragment in fragment_graph.fragments.values_mut() {
1446            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
1447                NodeBody::StreamScan(scan) => {
1448                    scan.table_id = table_catalog.id;
1449                    if let Some(table_desc) = &mut scan.table_desc {
1450                        assert!(
1451                            risingwave_common::catalog::TableId::from(table_desc.table_id)
1452                                .is_placeholder()
1453                        );
1454                        table_desc.table_id = table_catalog.id;
1455                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1456                    }
1457                    if let Some(table) = &mut scan.arrangement_table {
1458                        assert!(
1459                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
1460                        );
1461                        *table = table_catalog.clone();
1462                    }
1463                }
1464                NodeBody::BatchPlan(plan) => {
1465                    if let Some(table_desc) = &mut plan.table_desc {
1466                        assert!(
1467                            risingwave_common::catalog::TableId::from(table_desc.table_id)
1468                                .is_placeholder()
1469                        );
1470                        table_desc.table_id = table_catalog.id;
1471                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1472                    }
1473                }
1474                _ => {}
1475            });
1476        }
1477
1478        let table_id = table_catalog.id as ObjectId;
1479        let dependencies = HashSet::from_iter([table_id]);
1480        let stream_job = StreamingJob::Sink(sink);
1481        let res = self
1482            .ddl_controller
1483            .run_command(DdlCommand::CreateStreamingJob {
1484                stream_job,
1485                fragment_graph,
1486                dependencies,
1487                specific_resource_group: None,
1488                if_not_exists,
1489            })
1490            .await;
1491
1492        if res.is_err() {
1493            let _ = self
1494                .ddl_controller
1495                .run_command(DdlCommand::DropStreamingJob {
1496                    job_id: StreamingJobId::Table(None, TableId::new(table_id as _)),
1497                    drop_mode: DropMode::Cascade,
1498                })
1499                .await
1500                .inspect_err(|err| {
1501                    tracing::error!(error = %err.as_report(),
1502                        "Failed to clean up table after iceberg sink creation failure",
1503                    );
1504                });
1505            res?;
1506        }
1507
1508        // 3. create iceberg source
1509        let iceberg_source = iceberg_source.unwrap();
1510        let res = self
1511            .ddl_controller
1512            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
1513            .await;
1514        if res.is_err() {
1515            let _ = self
1516                .ddl_controller
1517                .run_command(DdlCommand::DropStreamingJob {
1518                    job_id: StreamingJobId::Table(None, TableId::new(table_id as _)),
1519                    drop_mode: DropMode::Cascade,
1520                })
1521                .await
1522                .inspect_err(|err| {
1523                    tracing::error!(
1524                        error = %err.as_report(),
1525                        "Failed to clean up table after iceberg source creation failure",
1526                    );
1527                });
1528        }
1529
1530        Ok(Response::new(CreateIcebergTableResponse {
1531            status: None,
1532            version: res?,
1533        }))
1534    }
1535}
1536
1537fn add_auto_schema_change_fail_event_log(
1538    meta_metrics: &Arc<MetaMetrics>,
1539    table_id: u32,
1540    table_name: String,
1541    cdc_table_id: String,
1542    upstream_ddl: String,
1543    event_log_manager: &EventLogManagerRef,
1544    fail_info: String,
1545) {
1546    meta_metrics
1547        .auto_schema_change_failure_cnt
1548        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1549        .inc();
1550    let event = event_log::EventAutoSchemaChangeFail {
1551        table_id,
1552        table_name,
1553        cdc_table_id,
1554        upstream_ddl,
1555        fail_info,
1556    };
1557    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1558}