risingwave_meta_service/ddl_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::TableId;
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::barrier::{BarrierScheduler, Command};
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{
    ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy, ThrottleConfig,
};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::*;
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::id::SourceId;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

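/// gRPC handler for DDL requests on the meta node. Most methods translate the incoming
/// request into a [`DdlCommand`] and forward it to the [`DdlController`].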
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    barrier_scheduler: BarrierScheduler,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

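    /// Converts a [`ReplaceJobPlan`] received over gRPC into the internal
    /// [`ReplaceStreamJobInfo`], resolving the concrete job type (table, source or
    /// materialized view). Missing required fields panic, mirroring the `unwrap`s below.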
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

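    /// Spawns a background task that migrates legacy table fragments: it asks a running
    /// frontend for a replace plan for each unmigrated table and replays the plan through
    /// the DDL controller. Returns the task's join handle and a shutdown sender.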
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("starting legacy table fragments migration task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

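            // Retry the whole migration with a fixed 5s backoff until it succeeds; the task
            // can be interrupted at any await point via the shutdown channel selected below.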
            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

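// Most handlers below follow the same shape: unwrap the request payload, build a
// `DdlCommand`, run it through the `DdlController`, and return the resulting catalog
// version to the client.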
#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0.into(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id))
            .await?;

        Ok(Response::new(DropSecretResponse { version }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

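    /// `CREATE SOURCE` comes in two flavors: without a fragment graph the source is only
    /// registered in the catalog (non-shared source); with a fragment graph it is created
    /// as a streaming job.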
    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                // The id of the stream job has been set above.
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        specific_resource_group: None,
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            specific_resource_group: None,
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

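    /// Drops a sink streaming job and then stops its sink coordinator.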
    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let specific_resource_group = req.specific_resource_group.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request.get_dependencies().iter().copied().collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group: None,
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(
                    source_id.map(|PbSourceId::Id(id)| id.into()),
                    table_id,
                ),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    /// Only supports adding columns for now.
    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0.into(),
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(table_ids, include_dropped_tables)
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }

    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

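        // CDC backfill can only be rescheduled to a fixed parallelism; adaptive or auto
        // values are rejected below.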
        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

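        // Map the protobuf parallelism into the internal representation: `Fixed(n)` stays
        // fixed, while `Auto` and `Adaptive` both map to adaptive scheduling.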
        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

    async fn alter_fragment_parallelism(
        &self,
        request: Request<AlterFragmentParallelismRequest>,
    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
        let req = request.into_inner();

        let fragment_ids = req.fragment_ids;
        if fragment_ids.is_empty() {
            return Err(Status::invalid_argument(
                "at least one fragment id must be provided",
            ));
        }

        let parallelism = match req.parallelism {
            Some(parallelism) => {
                let streaming_parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(streaming_parallelism)
            }
            None => None,
        };

        let fragment_targets = fragment_ids
            .into_iter()
            .map(|fragment_id| (fragment_id, parallelism.clone()))
            .collect();

        self.ddl_controller
            .reschedule_fragments(fragment_targets)
            .await?;

        Ok(Response::new(AlterFragmentParallelismResponse {}))
    }

    async fn alter_streaming_job_config(
        &self,
        request: Request<AlterStreamingJobConfigRequest>,
    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
        let AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        } = request.into_inner();

        self.ddl_controller
            .run_command(DdlCommand::AlterStreamingJobConfig(
                job_id,
                entries_to_add,
                keys_to_remove,
            ))
            .await?;

        Ok(Response::new(AlterStreamingJobConfigResponse {}))
    }

    /// Auto schema change for cdc sources,
    /// called by the source parser when a schema change is detected.
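    ///
    /// The overall flow: pick a random running frontend, validate the changed columns
    /// (generated, system and hidden columns are rejected), fetch a replace plan from the
    /// frontend for every table bound to the CDC table, and apply it via
    /// `DdlCommand::ReplaceStreamJob`. Failures are recorded in the event log.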
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        // randomly select a frontend worker to get the replace table plan
        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

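        // Validate every column in the upstream change before touching any table.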
        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            // get the table catalog corresponding to the cdc table
            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                // Since we only support `ADD COLUMN` and `DROP COLUMN`, we check whether one of
                // the new column set and the original column set is a subset of the other.
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                // For subset/superset check, we need to add visible connector additional columns defined by INCLUDE in the original table to new_columns
                // This includes both _rw columns and user-defined INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx)
                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                // skip the schema change if there is no change to the original columns
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                // send a request to the frontend to get the ReplaceJobPlan
                // will retry with exponential backoff if the request fails
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Start the table replacement for the schema change")
                            });
                            // start the schema change procedure
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = %table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.as_job_id(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        // Trigger manual compaction directly using the sink ID
        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        // Trigger manual snapshot expiration directly using the sink ID
        self.iceberg_compaction_manager
            .check_and_expire_snapshots(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

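    /// Creates an "iceberg table" in multiple steps: first the backing table job (marked
    /// as background creation, with its source rate limit forced to 0 so it does not
    /// ingest yet), then the iceberg sink job wired to the freshly created table.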
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        // 1. create table job
        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        // Mark the table as a background job so that its creation doesn't block the sink
        // creation below.
        table.create_type = PbCreateType::Background as _;

        // Temporarily set the source rate limit to 0; the original limit is restored in step 3
        // once the iceberg sink job has been created.
        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

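        // Submit the table (and its optional associated source) as a streaming job.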
        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists,
            })
            .await?;

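        // Resolve the catalog entry of the table we just created; its id is needed to patch the
        // placeholder table ids in the sink's fragment graph below.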
        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or(Status::not_found("Internal error: table not found"))?;

        // 2. create iceberg sink job
        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        // Mark the sink as a background job so that its creation doesn't block the iceberg
        // source creation in step 4.
        sink.create_type = PbCreateType::Background as _;

        let mut fragment_graph = fragment_graph.unwrap();

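        // The frontend built the sink's fragment graph against a placeholder table id. Replace
        // the placeholders in the dependent table list and in the StreamScan / BatchPlan nodes
        // with the real table catalog.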
        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group: None,
                if_not_exists,
            })
            .await;

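        // If sink creation failed, best-effort drop the table created in step 1 so we don't
        // leave an orphaned table behind, then propagate the original error.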
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

        // 3. reset source rate limit back to normal after sink creation
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let actors_to_apply = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(SourceId::new(source_id), source_rate_limit)
                .await?;
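            // Build a per-fragment, per-actor throttle mutation carrying the restored rate limit
            // and apply it via a barrier command. For example (hypothetical ids), restoring
            // `Some(1000)` on fragment 7 with actors [21, 22] yields
            // `{ 7 => { 21 => Some(1000), 22 => Some(1000) } }`.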
            let mutation: ThrottleConfig = actors_to_apply
                .into_iter()
                .map(|(fragment_id, actors)| {
                    (
                        fragment_id,
                        actors
                            .into_iter()
                            .map(|actor_id| (actor_id, source_rate_limit))
                            .collect::<HashMap<_, _>>(),
                    )
                })
                .collect();
            let _ = self
                .barrier_scheduler
                .run_command(database_id, Command::Throttle(mutation))
                .await?;
        }

        // 4. create iceberg source
        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
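        // If iceberg source creation failed, best-effort drop the table (cascade, which should
        // also remove the dependent sink); the original error is surfaced via `res?` below.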
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

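/// Records an auto schema change failure: bumps the per-table failure counter and appends an
/// `EventAutoSchemaChangeFail` entry to the event log.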
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: TableId,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}