Skip to main content

risingwave_meta_service/
ddl_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::{ObjectId, TableId};
26use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
27use risingwave_common::types::DataType;
28use risingwave_common::util::stream_graph_visitor;
29use risingwave_connector::sink::catalog::SinkId;
30use risingwave_connector::sink::iceberg::ENABLE_PK_INDEX;
31use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
32use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
33use risingwave_meta::model::TableParallelism as ModelTableParallelism;
34use risingwave_meta::rpc::metrics::MetaMetrics;
35use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
36use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
37use risingwave_meta_model::StreamingParallelism;
38use risingwave_pb::catalog::connection::Info as ConnectionInfo;
39use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
40use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
41use risingwave_pb::common::WorkerType;
42use risingwave_pb::common::worker_node::State;
43use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
44use risingwave_pb::ddl_service::ddl_service_server::DdlService;
45use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
46use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
47use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
48use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
49use risingwave_pb::meta::event_log;
50use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
53use thiserror_ext::AsReport;
54use tokio::sync::oneshot::Sender;
55use tokio::task::JoinHandle;
56use tonic::{Request, Response, Status};
57
58use crate::MetaError;
59use crate::barrier::BarrierManagerRef;
60use crate::manager::sink_coordination::SinkCoordinatorManager;
61use crate::manager::{MetaSrvEnv, StreamingJob};
62use crate::rpc::ddl_controller::{
63    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
64};
65use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
66
/// gRPC implementation of the meta node's DDL service.
///
/// Most handlers translate an incoming request into a `DdlCommand` and hand it to the
/// `DdlController`; a few talk to the other managers held here directly.
#[derive(Clone)]
pub struct DdlServiceImpl {
    /// Meta server environment; used here for the idle manager and the frontend client pool.
    env: MetaSrvEnv,

    /// Access to the catalog controller and worker-node listing.
    metadata_manager: MetadataManager,
    /// Used to stop sink coordinators when a sink is dropped.
    sink_manager: SinkCoordinatorManager,
    /// Executes every `DdlCommand` produced by the handlers.
    ddl_controller: DdlController,
    /// Meta metrics handle (not referenced by the handlers visible in this part of the file).
    meta_metrics: Arc<MetaMetrics>,
    /// Used to clear iceberg maintenance state when a sink is dropped.
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    /// Used to submit barrier commands directly (e.g. `Command::ResumeBackfill`).
    barrier_scheduler: BarrierScheduler,
}
78
79impl DdlServiceImpl {
80    pub async fn new(
81        env: MetaSrvEnv,
82        metadata_manager: MetadataManager,
83        stream_manager: GlobalStreamManagerRef,
84        source_manager: SourceManagerRef,
85        barrier_manager: BarrierManagerRef,
86        sink_manager: SinkCoordinatorManager,
87        meta_metrics: Arc<MetaMetrics>,
88        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
89        barrier_scheduler: BarrierScheduler,
90    ) -> Self {
91        let ddl_controller = DdlController::new(
92            env.clone(),
93            metadata_manager.clone(),
94            stream_manager,
95            source_manager,
96            barrier_manager,
97            sink_manager.clone(),
98            iceberg_compaction_manager.clone(),
99        )
100        .await;
101        Self {
102            env,
103            metadata_manager,
104            sink_manager,
105            ddl_controller,
106            meta_metrics,
107            iceberg_compaction_manager,
108            barrier_scheduler,
109        }
110    }
111
112    fn extract_replace_table_info(
113        ReplaceJobPlan {
114            fragment_graph,
115            replace_job,
116        }: ReplaceJobPlan,
117    ) -> ReplaceStreamJobInfo {
118        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
119            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
120                table,
121                source,
122                job_type,
123            }) => StreamingJob::Table(
124                source,
125                table.unwrap(),
126                TableJobType::try_from(job_type).unwrap(),
127            ),
128            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
129                StreamingJob::Source(source.unwrap())
130            }
131            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
132                table,
133            }) => StreamingJob::MaterializedView(table.unwrap()),
134        };
135
136        ReplaceStreamJobInfo {
137            streaming_job: replace_streaming_job,
138            fragment_graph: fragment_graph.unwrap(),
139        }
140    }
141
142    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
143        streaming_job_resource_type::ResourceType::Regular(true)
144    }
145
146    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
147        tracing::info!("start migrate legacy table fragments task");
148        let env = self.env.clone();
149        let metadata_manager = self.metadata_manager.clone();
150        let ddl_controller = self.ddl_controller.clone();
151
152        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
153        let join_handle = tokio::spawn(async move {
154            async fn migrate_inner(
155                env: &MetaSrvEnv,
156                metadata_manager: &MetadataManager,
157                ddl_controller: &DdlController,
158            ) -> MetaResult<()> {
159                let tables = metadata_manager
160                    .catalog_controller
161                    .list_unmigrated_tables()
162                    .await?;
163
164                if tables.is_empty() {
165                    tracing::info!("no legacy table fragments need migration");
166                    return Ok(());
167                }
168
169                let client = {
170                    let workers = metadata_manager
171                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
172                        .await?;
173                    if workers.is_empty() {
174                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
175                    }
176                    let worker = workers.choose(&mut thread_rng()).unwrap();
177                    env.frontend_client_pool().get(worker).await?
178                };
179
180                for table in tables {
181                    let start = tokio::time::Instant::now();
182                    let req = GetTableReplacePlanRequest {
183                        database_id: table.database_id,
184                        table_id: table.id,
185                        cdc_table_change: None,
186                    };
187                    let resp = client
188                        .get_table_replace_plan(req)
189                        .await
190                        .context("failed to get table replace plan from frontend")?;
191
192                    let plan = resp.into_inner().replace_plan.unwrap();
193                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
194                    ddl_controller
195                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
196                        .await?;
197                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
198                }
199                tracing::info!("successfully migrated all legacy table fragments");
200
201                Ok(())
202            }
203
204            let migrate_future = async move {
205                let mut attempt = 0;
206                loop {
207                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
208                        Ok(_) => break,
209                        Err(e) => {
210                            attempt += 1;
211                            tracing::error!(
212                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
213                                e.as_report(),
214                                attempt
215                            );
216                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
217                        }
218                    }
219                }
220            };
221
222            select(pin!(migrate_future), shutdown_rx).await;
223        });
224
225        (join_handle, shutdown_tx)
226    }
227}
228
229#[async_trait::async_trait]
230impl DdlService for DdlServiceImpl {
231    async fn create_database(
232        &self,
233        request: Request<CreateDatabaseRequest>,
234    ) -> Result<Response<CreateDatabaseResponse>, Status> {
235        let req = request.into_inner();
236        let database = req.get_db()?.clone();
237        let version = self
238            .ddl_controller
239            .run_command(DdlCommand::CreateDatabase(database))
240            .await?;
241
242        Ok(Response::new(CreateDatabaseResponse {
243            status: None,
244            version,
245        }))
246    }
247
248    async fn drop_database(
249        &self,
250        request: Request<DropDatabaseRequest>,
251    ) -> Result<Response<DropDatabaseResponse>, Status> {
252        let req = request.into_inner();
253        let database_id = req.get_database_id();
254
255        let version = self
256            .ddl_controller
257            .run_command(DdlCommand::DropDatabase(database_id))
258            .await?;
259
260        Ok(Response::new(DropDatabaseResponse {
261            status: None,
262            version,
263        }))
264    }
265
266    async fn create_secret(
267        &self,
268        request: Request<CreateSecretRequest>,
269    ) -> Result<Response<CreateSecretResponse>, Status> {
270        let req = request.into_inner();
271        let pb_secret = Secret {
272            id: 0.into(),
273            name: req.get_name().clone(),
274            database_id: req.get_database_id(),
275            value: req.get_value().clone(),
276            owner: req.get_owner_id(),
277            schema_id: req.get_schema_id(),
278        };
279        let version = self
280            .ddl_controller
281            .run_command(DdlCommand::CreateSecret(pb_secret))
282            .await?;
283
284        Ok(Response::new(CreateSecretResponse { version }))
285    }
286
287    async fn drop_secret(
288        &self,
289        request: Request<DropSecretRequest>,
290    ) -> Result<Response<DropSecretResponse>, Status> {
291        let req = request.into_inner();
292        let secret_id = req.get_secret_id();
293        let drop_mode = DropMode::from_request_setting(req.cascade);
294        let version = self
295            .ddl_controller
296            .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
297            .await?;
298
299        Ok(Response::new(DropSecretResponse {
300            status: None,
301            version,
302        }))
303    }
304
305    async fn alter_secret(
306        &self,
307        request: Request<AlterSecretRequest>,
308    ) -> Result<Response<AlterSecretResponse>, Status> {
309        let req = request.into_inner();
310        let pb_secret = Secret {
311            id: req.get_secret_id(),
312            name: req.get_name().clone(),
313            database_id: req.get_database_id(),
314            value: req.get_value().clone(),
315            owner: req.get_owner_id(),
316            schema_id: req.get_schema_id(),
317        };
318        let version = self
319            .ddl_controller
320            .run_command(DdlCommand::AlterSecret(pb_secret))
321            .await?;
322
323        Ok(Response::new(AlterSecretResponse { version }))
324    }
325
326    async fn create_schema(
327        &self,
328        request: Request<CreateSchemaRequest>,
329    ) -> Result<Response<CreateSchemaResponse>, Status> {
330        let req = request.into_inner();
331        let schema = req.get_schema()?.clone();
332        let version = self
333            .ddl_controller
334            .run_command(DdlCommand::CreateSchema(schema))
335            .await?;
336
337        Ok(Response::new(CreateSchemaResponse {
338            status: None,
339            version,
340        }))
341    }
342
343    async fn drop_schema(
344        &self,
345        request: Request<DropSchemaRequest>,
346    ) -> Result<Response<DropSchemaResponse>, Status> {
347        let req = request.into_inner();
348        let schema_id = req.get_schema_id();
349        let drop_mode = DropMode::from_request_setting(req.cascade);
350        let version = self
351            .ddl_controller
352            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
353            .await?;
354        Ok(Response::new(DropSchemaResponse {
355            status: None,
356            version,
357        }))
358    }
359
360    async fn create_source(
361        &self,
362        request: Request<CreateSourceRequest>,
363    ) -> Result<Response<CreateSourceResponse>, Status> {
364        let req = request.into_inner();
365        let source = req.get_source()?.clone();
366
367        match req.fragment_graph {
368            None => {
369                let version = self
370                    .ddl_controller
371                    .run_command(DdlCommand::CreateNonSharedSource(source))
372                    .await?;
373                Ok(Response::new(CreateSourceResponse {
374                    status: None,
375                    version,
376                }))
377            }
378            Some(fragment_graph) => {
379                // The id of stream job has been set above
380                let stream_job = StreamingJob::Source(source);
381                let version = self
382                    .ddl_controller
383                    .run_command(DdlCommand::CreateStreamingJob {
384                        stream_job,
385                        fragment_graph,
386                        dependencies: HashSet::new(),
387                        resource_type: Self::default_streaming_job_resource_type(),
388                        if_not_exists: req.if_not_exists,
389                        refresh_interval_sec: None,
390                    })
391                    .await?;
392                Ok(Response::new(CreateSourceResponse {
393                    status: None,
394                    version,
395                }))
396            }
397        }
398    }
399
400    async fn drop_source(
401        &self,
402        request: Request<DropSourceRequest>,
403    ) -> Result<Response<DropSourceResponse>, Status> {
404        let request = request.into_inner();
405        let source_id = request.source_id;
406        let drop_mode = DropMode::from_request_setting(request.cascade);
407        let version = self
408            .ddl_controller
409            .run_command(DdlCommand::DropSource(source_id, drop_mode))
410            .await?;
411
412        Ok(Response::new(DropSourceResponse {
413            status: None,
414            version,
415        }))
416    }
417
418    async fn reset_source(
419        &self,
420        request: Request<ResetSourceRequest>,
421    ) -> Result<Response<ResetSourceResponse>, Status> {
422        let request = request.into_inner();
423        let source_id = request.source_id;
424
425        tracing::info!(
426            source_id = %source_id,
427            "Received RESET SOURCE request, routing to DDL controller"
428        );
429
430        // Route to DDL controller
431        let version = self
432            .ddl_controller
433            .run_command(DdlCommand::ResetSource(source_id))
434            .await?;
435
436        Ok(Response::new(ResetSourceResponse {
437            status: None,
438            version,
439        }))
440    }
441
442    async fn create_sink(
443        &self,
444        request: Request<CreateSinkRequest>,
445    ) -> Result<Response<CreateSinkResponse>, Status> {
446        self.env.idle_manager().record_activity();
447
448        let req = request.into_inner();
449
450        let sink = req.get_sink()?.clone();
451        let fragment_graph = req.get_fragment_graph()?.clone();
452        let dependencies = req.get_dependencies().iter().copied().collect();
453        let resource_type = req
454            .resource_type
455            .and_then(|resource_type| resource_type.resource_type)
456            .unwrap_or_else(Self::default_streaming_job_resource_type);
457
458        let stream_job = StreamingJob::Sink(sink);
459
460        let command = DdlCommand::CreateStreamingJob {
461            stream_job,
462            fragment_graph,
463            dependencies,
464            resource_type,
465            if_not_exists: req.if_not_exists,
466            refresh_interval_sec: None,
467        };
468
469        let version = self.ddl_controller.run_command(command).await?;
470
471        Ok(Response::new(CreateSinkResponse {
472            status: None,
473            version,
474        }))
475    }
476
477    async fn drop_sink(
478        &self,
479        request: Request<DropSinkRequest>,
480    ) -> Result<Response<DropSinkResponse>, Status> {
481        let request = request.into_inner();
482        let sink_id = request.sink_id;
483        let drop_mode = DropMode::from_request_setting(request.cascade);
484
485        let command = DdlCommand::DropStreamingJob {
486            job_id: StreamingJobId::Sink(sink_id),
487            drop_mode,
488        };
489
490        let version = self.ddl_controller.run_command(command).await?;
491
492        self.sink_manager
493            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
494            .await;
495        self.iceberg_compaction_manager
496            .clear_iceberg_maintenance_by_sink_id(SinkId::from(sink_id));
497
498        Ok(Response::new(DropSinkResponse {
499            status: None,
500            version,
501        }))
502    }
503
504    async fn create_subscription(
505        &self,
506        request: Request<CreateSubscriptionRequest>,
507    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
508        self.env.idle_manager().record_activity();
509
510        let req = request.into_inner();
511
512        let subscription = req.get_subscription()?.clone();
513        let command = DdlCommand::CreateSubscription(subscription);
514
515        let version = self.ddl_controller.run_command(command).await?;
516
517        Ok(Response::new(CreateSubscriptionResponse {
518            status: None,
519            version,
520        }))
521    }
522
523    async fn drop_subscription(
524        &self,
525        request: Request<DropSubscriptionRequest>,
526    ) -> Result<Response<DropSubscriptionResponse>, Status> {
527        let request = request.into_inner();
528        let subscription_id = request.subscription_id;
529        let drop_mode = DropMode::from_request_setting(request.cascade);
530
531        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);
532
533        let version = self.ddl_controller.run_command(command).await?;
534
535        Ok(Response::new(DropSubscriptionResponse {
536            status: None,
537            version,
538        }))
539    }
540
541    async fn create_materialized_view(
542        &self,
543        request: Request<CreateMaterializedViewRequest>,
544    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
545        self.env.idle_manager().record_activity();
546
547        let req = request.into_inner();
548        let mview = req.get_materialized_view()?.clone();
549        let fragment_graph = req.get_fragment_graph()?.clone();
550        let dependencies = req.get_dependencies().iter().copied().collect();
551        let resource_type = req.resource_type.unwrap().resource_type.unwrap();
552
553        let stream_job = StreamingJob::MaterializedView(mview);
554        let version = self
555            .ddl_controller
556            .run_command(DdlCommand::CreateStreamingJob {
557                stream_job,
558                fragment_graph,
559                dependencies,
560                resource_type,
561                if_not_exists: req.if_not_exists,
562                refresh_interval_sec: req.refresh_interval_sec,
563            })
564            .await?;
565
566        Ok(Response::new(CreateMaterializedViewResponse {
567            status: None,
568            version,
569        }))
570    }
571
572    async fn drop_materialized_view(
573        &self,
574        request: Request<DropMaterializedViewRequest>,
575    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
576        self.env.idle_manager().record_activity();
577
578        let request = request.into_inner();
579        let table_id = request.table_id;
580        let drop_mode = DropMode::from_request_setting(request.cascade);
581
582        let version = self
583            .ddl_controller
584            .run_command(DdlCommand::DropStreamingJob {
585                job_id: StreamingJobId::MaterializedView(table_id),
586                drop_mode,
587            })
588            .await?;
589
590        Ok(Response::new(DropMaterializedViewResponse {
591            status: None,
592            version,
593        }))
594    }
595
596    async fn create_index(
597        &self,
598        request: Request<CreateIndexRequest>,
599    ) -> Result<Response<CreateIndexResponse>, Status> {
600        self.env.idle_manager().record_activity();
601
602        let req = request.into_inner();
603        let index = req.get_index()?.clone();
604        let index_table = req.get_index_table()?.clone();
605        let fragment_graph = req.get_fragment_graph()?.clone();
606        let resource_type = req
607            .resource_type
608            .and_then(|resource_type| resource_type.resource_type)
609            .unwrap_or_else(Self::default_streaming_job_resource_type);
610
611        let stream_job = StreamingJob::Index(index, index_table);
612        let version = self
613            .ddl_controller
614            .run_command(DdlCommand::CreateStreamingJob {
615                stream_job,
616                fragment_graph,
617                dependencies: HashSet::new(),
618                resource_type,
619                if_not_exists: req.if_not_exists,
620                refresh_interval_sec: None,
621            })
622            .await?;
623
624        Ok(Response::new(CreateIndexResponse {
625            status: None,
626            version,
627        }))
628    }
629
630    async fn drop_index(
631        &self,
632        request: Request<DropIndexRequest>,
633    ) -> Result<Response<DropIndexResponse>, Status> {
634        self.env.idle_manager().record_activity();
635
636        let request = request.into_inner();
637        let index_id = request.index_id;
638        let drop_mode = DropMode::from_request_setting(request.cascade);
639        let version = self
640            .ddl_controller
641            .run_command(DdlCommand::DropStreamingJob {
642                job_id: StreamingJobId::Index(index_id),
643                drop_mode,
644            })
645            .await?;
646
647        Ok(Response::new(DropIndexResponse {
648            status: None,
649            version,
650        }))
651    }
652
653    async fn create_function(
654        &self,
655        request: Request<CreateFunctionRequest>,
656    ) -> Result<Response<CreateFunctionResponse>, Status> {
657        let req = request.into_inner();
658        let function = req.get_function()?.clone();
659
660        let version = self
661            .ddl_controller
662            .run_command(DdlCommand::CreateFunction(function))
663            .await?;
664
665        Ok(Response::new(CreateFunctionResponse {
666            status: None,
667            version,
668        }))
669    }
670
671    async fn drop_function(
672        &self,
673        request: Request<DropFunctionRequest>,
674    ) -> Result<Response<DropFunctionResponse>, Status> {
675        let request = request.into_inner();
676
677        let version = self
678            .ddl_controller
679            .run_command(DdlCommand::DropFunction(
680                request.function_id,
681                DropMode::from_request_setting(request.cascade),
682            ))
683            .await?;
684
685        Ok(Response::new(DropFunctionResponse {
686            status: None,
687            version,
688        }))
689    }
690
691    async fn create_table(
692        &self,
693        request: Request<CreateTableRequest>,
694    ) -> Result<Response<CreateTableResponse>, Status> {
695        let request = request.into_inner();
696        let job_type = request.get_job_type().unwrap_or_default();
697        let dependencies = request.get_dependencies().iter().copied().collect();
698        let source = request.source;
699        let mview = request.materialized_view.unwrap();
700        let fragment_graph = request.fragment_graph.unwrap();
701
702        let stream_job = StreamingJob::Table(source, mview, job_type);
703        let version = self
704            .ddl_controller
705            .run_command(DdlCommand::CreateStreamingJob {
706                stream_job,
707                fragment_graph,
708                dependencies,
709                resource_type: Self::default_streaming_job_resource_type(),
710                if_not_exists: request.if_not_exists,
711                refresh_interval_sec: None,
712            })
713            .await?;
714
715        Ok(Response::new(CreateTableResponse {
716            status: None,
717            version,
718        }))
719    }
720
721    async fn drop_table(
722        &self,
723        request: Request<DropTableRequest>,
724    ) -> Result<Response<DropTableResponse>, Status> {
725        let request = request.into_inner();
726        let source_id = request.source_id;
727        let table_id = request.table_id;
728
729        let drop_mode = DropMode::from_request_setting(request.cascade);
730        let version = self
731            .ddl_controller
732            .run_command(DdlCommand::DropStreamingJob {
733                job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
734                drop_mode,
735            })
736            .await?;
737
738        Ok(Response::new(DropTableResponse {
739            status: None,
740            version,
741        }))
742    }
743
744    async fn create_view(
745        &self,
746        request: Request<CreateViewRequest>,
747    ) -> Result<Response<CreateViewResponse>, Status> {
748        let req = request.into_inner();
749        let view = req.get_view()?.clone();
750        let dependencies = req
751            .get_dependencies()
752            .iter()
753            .copied()
754            .collect::<HashSet<_>>();
755
756        let version = self
757            .ddl_controller
758            .run_command(DdlCommand::CreateView(view, dependencies))
759            .await?;
760
761        Ok(Response::new(CreateViewResponse {
762            status: None,
763            version,
764        }))
765    }
766
767    async fn drop_view(
768        &self,
769        request: Request<DropViewRequest>,
770    ) -> Result<Response<DropViewResponse>, Status> {
771        let request = request.into_inner();
772        let view_id = request.get_view_id();
773        let drop_mode = DropMode::from_request_setting(request.cascade);
774        let version = self
775            .ddl_controller
776            .run_command(DdlCommand::DropView(view_id, drop_mode))
777            .await?;
778        Ok(Response::new(DropViewResponse {
779            status: None,
780            version,
781        }))
782    }
783
784    async fn risectl_list_state_tables(
785        &self,
786        _request: Request<RisectlListStateTablesRequest>,
787    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
788        let tables = self
789            .metadata_manager
790            .catalog_controller
791            .list_all_state_tables()
792            .await?;
793        Ok(Response::new(RisectlListStateTablesResponse { tables }))
794    }
795
796    async fn risectl_resume_backfill(
797        &self,
798        request: Request<RisectlResumeBackfillRequest>,
799    ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
800        let request = request.into_inner();
801        let target = request
802            .target
803            .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;
804
805        match target {
806            risectl_resume_backfill_request::Target::JobId(job_id) => {
807                let database_id = self
808                    .metadata_manager
809                    .catalog_controller
810                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
811                    .await?;
812                self.barrier_scheduler
813                    .run_command(
814                        database_id,
815                        Command::ResumeBackfill {
816                            target: ResumeBackfillTarget::Job(job_id),
817                        },
818                    )
819                    .await?;
820            }
821            risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
822                let mut job_ids = self
823                    .metadata_manager
824                    .catalog_controller
825                    .get_fragment_job_id(vec![fragment_id])
826                    .await?;
827                let job_id = job_ids
828                    .pop()
829                    .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
830                let database_id = self
831                    .metadata_manager
832                    .catalog_controller
833                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
834                    .await?;
835                self.barrier_scheduler
836                    .run_command(
837                        database_id,
838                        Command::ResumeBackfill {
839                            target: ResumeBackfillTarget::Fragment(fragment_id),
840                        },
841                    )
842                    .await?;
843            }
844        }
845
846        Ok(Response::new(RisectlResumeBackfillResponse {}))
847    }
848
849    async fn replace_job_plan(
850        &self,
851        request: Request<ReplaceJobPlanRequest>,
852    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
853        let req = request.into_inner().get_plan().cloned()?;
854
855        let version = self
856            .ddl_controller
857            .run_command(DdlCommand::ReplaceStreamJob(
858                Self::extract_replace_table_info(req),
859            ))
860            .await?;
861
862        Ok(Response::new(ReplaceJobPlanResponse {
863            status: None,
864            version,
865        }))
866    }
867
868    async fn get_table(
869        &self,
870        request: Request<GetTableRequest>,
871    ) -> Result<Response<GetTableResponse>, Status> {
872        let req = request.into_inner();
873        let table = self
874            .metadata_manager
875            .catalog_controller
876            .get_table_by_name(&req.database_name, &req.table_name)
877            .await?;
878
879        Ok(Response::new(GetTableResponse { table }))
880    }
881
882    async fn alter_name(
883        &self,
884        request: Request<AlterNameRequest>,
885    ) -> Result<Response<AlterNameResponse>, Status> {
886        let AlterNameRequest { object, new_name } = request.into_inner();
887        let version = self
888            .ddl_controller
889            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
890            .await?;
891        Ok(Response::new(AlterNameResponse {
892            status: None,
893            version,
894        }))
895    }
896
897    /// Only support add column for now.
898    async fn alter_source(
899        &self,
900        request: Request<AlterSourceRequest>,
901    ) -> Result<Response<AlterSourceResponse>, Status> {
902        let AlterSourceRequest { source } = request.into_inner();
903        let version = self
904            .ddl_controller
905            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
906            .await?;
907        Ok(Response::new(AlterSourceResponse {
908            status: None,
909            version,
910        }))
911    }
912
913    async fn alter_owner(
914        &self,
915        request: Request<AlterOwnerRequest>,
916    ) -> Result<Response<AlterOwnerResponse>, Status> {
917        let AlterOwnerRequest { object, owner_id } = request.into_inner();
918        let version = self
919            .ddl_controller
920            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
921            .await?;
922        Ok(Response::new(AlterOwnerResponse {
923            status: None,
924            version,
925        }))
926    }
927
928    async fn alter_subscription_retention(
929        &self,
930        request: Request<AlterSubscriptionRetentionRequest>,
931    ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
932        let AlterSubscriptionRetentionRequest {
933            subscription_id,
934            retention_seconds,
935            definition,
936        } = request.into_inner();
937        let version = self
938            .ddl_controller
939            .run_command(DdlCommand::AlterSubscriptionRetention {
940                subscription_id,
941                retention_seconds,
942                definition,
943            })
944            .await?;
945        Ok(Response::new(AlterSubscriptionRetentionResponse {
946            status: None,
947            version,
948        }))
949    }
950
951    async fn alter_set_schema(
952        &self,
953        request: Request<AlterSetSchemaRequest>,
954    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
955        let AlterSetSchemaRequest {
956            object,
957            new_schema_id,
958        } = request.into_inner();
959        let version = self
960            .ddl_controller
961            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
962            .await?;
963        Ok(Response::new(AlterSetSchemaResponse {
964            status: None,
965            version,
966        }))
967    }
968
969    async fn get_ddl_progress(
970        &self,
971        _request: Request<GetDdlProgressRequest>,
972    ) -> Result<Response<GetDdlProgressResponse>, Status> {
973        Ok(Response::new(GetDdlProgressResponse {
974            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
975        }))
976    }
977
978    async fn create_connection(
979        &self,
980        request: Request<CreateConnectionRequest>,
981    ) -> Result<Response<CreateConnectionResponse>, Status> {
982        let req = request.into_inner();
983        if req.payload.is_none() {
984            return Err(Status::invalid_argument("request is empty"));
985        }
986
987        match req.payload.unwrap() {
988            #[expect(deprecated)]
989            create_connection_request::Payload::PrivateLink(_) => {
990                panic!("Private Link Connection has been deprecated")
991            }
992            create_connection_request::Payload::ConnectionParams(params) => {
993                let pb_connection = Connection {
994                    id: 0.into(),
995                    schema_id: req.schema_id,
996                    database_id: req.database_id,
997                    name: req.name,
998                    info: Some(ConnectionInfo::ConnectionParams(params)),
999                    owner: req.owner_id,
1000                };
1001                let version = self
1002                    .ddl_controller
1003                    .run_command(DdlCommand::CreateConnection(pb_connection))
1004                    .await?;
1005                Ok(Response::new(CreateConnectionResponse { version }))
1006            }
1007        }
1008    }
1009
1010    async fn list_connections(
1011        &self,
1012        _request: Request<ListConnectionsRequest>,
1013    ) -> Result<Response<ListConnectionsResponse>, Status> {
1014        let conns = self
1015            .metadata_manager
1016            .catalog_controller
1017            .list_connections()
1018            .await?;
1019
1020        Ok(Response::new(ListConnectionsResponse {
1021            connections: conns,
1022        }))
1023    }
1024
1025    async fn drop_connection(
1026        &self,
1027        request: Request<DropConnectionRequest>,
1028    ) -> Result<Response<DropConnectionResponse>, Status> {
1029        let req = request.into_inner();
1030        let drop_mode = DropMode::from_request_setting(req.cascade);
1031
1032        let version = self
1033            .ddl_controller
1034            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
1035            .await?;
1036
1037        Ok(Response::new(DropConnectionResponse {
1038            status: None,
1039            version,
1040        }))
1041    }
1042
1043    async fn comment_on(
1044        &self,
1045        request: Request<CommentOnRequest>,
1046    ) -> Result<Response<CommentOnResponse>, Status> {
1047        let req = request.into_inner();
1048        let comment = req.get_comment()?.clone();
1049
1050        let version = self
1051            .ddl_controller
1052            .run_command(DdlCommand::CommentOn(Comment {
1053                table_id: comment.table_id,
1054                schema_id: comment.schema_id,
1055                database_id: comment.database_id,
1056                column_index: comment.column_index,
1057                description: comment.description,
1058            }))
1059            .await?;
1060
1061        Ok(Response::new(CommentOnResponse {
1062            status: None,
1063            version,
1064        }))
1065    }
1066
1067    async fn get_tables(
1068        &self,
1069        request: Request<GetTablesRequest>,
1070    ) -> Result<Response<GetTablesResponse>, Status> {
1071        let GetTablesRequest {
1072            table_ids,
1073            include_dropped_tables,
1074        } = request.into_inner();
1075        let ret = self
1076            .metadata_manager
1077            .catalog_controller
1078            .get_table_by_ids(table_ids, include_dropped_tables)
1079            .await?;
1080
1081        let mut tables = HashMap::default();
1082        for table in ret {
1083            tables.insert(table.id, table);
1084        }
1085        Ok(Response::new(GetTablesResponse { tables }))
1086    }
1087
1088    async fn wait(&self, request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
1089        let req = request.into_inner();
1090        let version = self.ddl_controller.wait(req.job_id).await?;
1091        Ok(Response::new(WaitResponse {
1092            version: Some(version),
1093        }))
1094    }
1095
1096    async fn alter_cdc_table_backfill_parallelism(
1097        &self,
1098        request: Request<AlterCdcTableBackfillParallelismRequest>,
1099    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
1100        let req = request.into_inner();
1101        let job_id = req.get_table_id();
1102        let parallelism = *req.get_parallelism()?;
1103
1104        let table_parallelism = ModelTableParallelism::from(parallelism);
1105        let streaming_parallelism = match table_parallelism {
1106            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
1107            _ => bail_invalid_parameter!(
1108                "CDC table backfill parallelism must be set to a fixed value"
1109            ),
1110        };
1111
1112        self.ddl_controller
1113            .reschedule_cdc_table_backfill(
1114                job_id,
1115                ReschedulePolicy::Parallelism(ParallelismPolicy {
1116                    parallelism: streaming_parallelism,
1117                    adaptive_parallelism_strategy: None,
1118                }),
1119            )
1120            .await?;
1121        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
1122    }
1123
1124    async fn alter_parallelism(
1125        &self,
1126        request: Request<AlterParallelismRequest>,
1127    ) -> Result<Response<AlterParallelismResponse>, Status> {
1128        let req = request.into_inner();
1129
1130        let job_id = req.get_table_id();
1131        let parallelism = *req.get_parallelism()?;
1132        let deferred = req.get_deferred();
1133
1134        let adaptive_parallelism_strategy = req.adaptive_parallelism_strategy;
1135        let (parallelism, adaptive_parallelism_strategy) = match parallelism.get_parallelism()? {
1136            Parallelism::Fixed(FixedParallelism { parallelism }) => {
1137                (StreamingParallelism::Fixed(*parallelism as _), None)
1138            }
1139            Parallelism::Auto(_) | Parallelism::Adaptive(_) => (
1140                StreamingParallelism::Adaptive,
1141                Some(adaptive_parallelism_strategy.unwrap_or_else(|| "AUTO".to_owned())),
1142            ),
1143            Parallelism::Custom(_) => (
1144                StreamingParallelism::Adaptive,
1145                Some(adaptive_parallelism_strategy.ok_or_else(|| {
1146                    Status::invalid_argument(
1147                        "adaptive_parallelism_strategy is required for custom parallelism",
1148                    )
1149                })?),
1150            ),
1151        };
1152
1153        if let Some(strategy) = adaptive_parallelism_strategy.as_deref() {
1154            parse_strategy(strategy).map_err(|e| {
1155                Status::invalid_argument(format!(
1156                    "invalid adaptive parallelism strategy: {}",
1157                    e.as_report()
1158                ))
1159            })?;
1160        };
1161
1162        self.ddl_controller
1163            .reschedule_streaming_job(
1164                job_id,
1165                ReschedulePolicy::Parallelism(ParallelismPolicy {
1166                    parallelism,
1167                    adaptive_parallelism_strategy,
1168                }),
1169                deferred,
1170            )
1171            .await?;
1172
1173        Ok(Response::new(AlterParallelismResponse {}))
1174    }
1175
1176    async fn alter_backfill_parallelism(
1177        &self,
1178        request: Request<AlterBackfillParallelismRequest>,
1179    ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
1180        let req = request.into_inner();
1181
1182        let job_id = req.get_table_id();
1183        let deferred = req.get_deferred();
1184        let adaptive_parallelism_strategy = req.adaptive_parallelism_strategy;
1185
1186        let parallelism = match req.parallelism {
1187            None => None,
1188            Some(parallelism) => {
1189                let (parallelism, adaptive_parallelism_strategy) = match parallelism
1190                    .get_parallelism()?
1191                {
1192                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
1193                        (StreamingParallelism::Fixed(*parallelism as _), None)
1194                    }
1195                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => (
1196                        StreamingParallelism::Adaptive,
1197                        Some(adaptive_parallelism_strategy.unwrap_or_else(|| "AUTO".to_owned())),
1198                    ),
1199                    Parallelism::Custom(_) => (
1200                        StreamingParallelism::Adaptive,
1201                        Some(adaptive_parallelism_strategy.ok_or_else(|| {
1202                            Status::invalid_argument(
1203                                "adaptive_parallelism_strategy is required for custom parallelism",
1204                            )
1205                        })?),
1206                    ),
1207                };
1208
1209                if let Some(strategy) = adaptive_parallelism_strategy.as_deref() {
1210                    parse_strategy(strategy).map_err(|e| {
1211                        Status::invalid_argument(format!(
1212                            "invalid adaptive parallelism strategy: {}",
1213                            e.as_report()
1214                        ))
1215                    })?;
1216                };
1217
1218                Some(ParallelismPolicy {
1219                    parallelism,
1220                    adaptive_parallelism_strategy,
1221                })
1222            }
1223        };
1224
1225        self.ddl_controller
1226            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
1227            .await?;
1228
1229        Ok(Response::new(AlterBackfillParallelismResponse {}))
1230    }
1231
1232    async fn alter_fragment_parallelism(
1233        &self,
1234        request: Request<AlterFragmentParallelismRequest>,
1235    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1236        let req = request.into_inner();
1237
1238        let fragment_ids = req.fragment_ids;
1239        if fragment_ids.is_empty() {
1240            return Err(Status::invalid_argument(
1241                "at least one fragment id must be provided",
1242            ));
1243        }
1244
1245        let parallelism = match req.parallelism {
1246            Some(parallelism) => {
1247                let streaming_parallelism = match parallelism.get_parallelism()? {
1248                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
1249                        StreamingParallelism::Fixed(*parallelism as _)
1250                    }
1251                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1252                        StreamingParallelism::Adaptive
1253                    }
1254                    _ => bail_unavailable!(),
1255                };
1256                Some(streaming_parallelism)
1257            }
1258            None => None,
1259        };
1260
1261        let fragment_targets = fragment_ids
1262            .into_iter()
1263            .map(|fragment_id| (fragment_id, parallelism.clone()))
1264            .collect();
1265
1266        self.ddl_controller
1267            .reschedule_fragments(fragment_targets)
1268            .await?;
1269
1270        Ok(Response::new(AlterFragmentParallelismResponse {}))
1271    }
1272
1273    async fn alter_streaming_job_config(
1274        &self,
1275        request: Request<AlterStreamingJobConfigRequest>,
1276    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
1277        let AlterStreamingJobConfigRequest {
1278            job_id,
1279            entries_to_add,
1280            keys_to_remove,
1281        } = request.into_inner();
1282
1283        self.ddl_controller
1284            .run_command(DdlCommand::AlterStreamingJobConfig(
1285                job_id,
1286                entries_to_add,
1287                keys_to_remove,
1288            ))
1289            .await?;
1290
1291        Ok(Response::new(AlterStreamingJobConfigResponse {}))
1292    }
1293
1294    /// Auto schema change for cdc sources,
1295    /// called by the source parser when a schema change is detected.
1296    async fn auto_schema_change(
1297        &self,
1298        request: Request<AutoSchemaChangeRequest>,
1299    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
1300        let req = request.into_inner();
1301
1302        // randomly select a frontend worker to get the replace table plan
1303        let workers = self
1304            .metadata_manager
1305            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
1306            .await?;
1307        let worker = workers
1308            .choose(&mut thread_rng())
1309            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
1310
1311        let client = self
1312            .env
1313            .frontend_client_pool()
1314            .get(worker)
1315            .await
1316            .map_err(MetaError::from)?;
1317
1318        let Some(schema_change) = req.schema_change else {
1319            return Err(Status::invalid_argument(
1320                "schema change message is required",
1321            ));
1322        };
1323
1324        for table_change in schema_change.table_changes {
1325            for c in &table_change.columns {
1326                let c = ColumnCatalog::from(c.clone());
1327
1328                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
1329                    tracing::warn!(target: "auto_schema_change",
1330                      cdc_table_id = table_change.cdc_table_id,
1331                    upstraem_ddl = table_change.upstream_ddl,
1332                        "invalid column type from cdc table change");
1333                    Err(Status::invalid_argument(format!(
1334                        "invalid column type: {} from cdc table change, column: {:?}",
1335                        column_type, c
1336                    )))
1337                };
1338                if c.is_generated() {
1339                    return invalid_col_type("generated column", &c);
1340                }
1341                if c.is_rw_sys_column() {
1342                    return invalid_col_type("rw system column", &c);
1343                }
1344                if c.is_hidden {
1345                    return invalid_col_type("hidden column", &c);
1346                }
1347            }
1348
1349            // get the table catalog corresponding to the cdc table
1350            let tables: Vec<Table> = self
1351                .metadata_manager
1352                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
1353                .await?;
1354
1355            for table in tables {
1356                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
1357                // is a subset of the other.
1358                let original_columns: HashSet<(String, DataType)> =
1359                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
1360                        let col = ColumnCatalog::from(col.clone());
1361                        if col.is_generated() || col.is_hidden() {
1362                            None
1363                        } else {
1364                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1365                        }
1366                    }));
1367
1368                let mut new_columns: HashSet<(String, DataType)> =
1369                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1370                        let col = ColumnCatalog::from(col.clone());
1371                        if col.is_generated() || col.is_hidden() {
1372                            None
1373                        } else {
1374                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1375                        }
1376                    }));
1377
1378                // For subset/superset check, we need to add visible connector additional columns defined by INCLUDE in the original table to new_columns
1379                // This includes both _rw columns and user-defined INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx)
1380                for col in &table.columns {
1381                    let col = ColumnCatalog::from(col.clone());
1382                    if col.is_connector_additional_column()
1383                        && !col.is_hidden()
1384                        && !col.is_generated()
1385                    {
1386                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1387                    }
1388                }
1389
1390                if !(original_columns.is_subset(&new_columns)
1391                    || original_columns.is_superset(&new_columns))
1392                {
1393                    tracing::warn!(target: "auto_schema_change",
1394                                    table_id = %table.id,
1395                                    cdc_table_id = table.cdc_table_id,
1396                                    upstraem_ddl = table_change.upstream_ddl,
1397                                    original_columns = ?original_columns,
1398                                    new_columns = ?new_columns,
1399                                    "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1400
1401                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1402                    add_auto_schema_change_fail_event_log(
1403                        &self.meta_metrics,
1404                        table.id,
1405                        table.name.clone(),
1406                        table_change.cdc_table_id.clone(),
1407                        table_change.upstream_ddl.clone(),
1408                        &self.env.event_log_manager_ref(),
1409                        fail_info,
1410                    );
1411
1412                    return Err(Status::invalid_argument(
1413                        "New columns should be a subset or superset of the original columns (including hidden columns)",
1414                    ));
1415                }
1416                // skip the schema change if there is no change to original columns
1417                if original_columns == new_columns {
1418                    tracing::warn!(target: "auto_schema_change",
1419                                   table_id = %table.id,
1420                                   cdc_table_id = table.cdc_table_id,
1421                                   upstraem_ddl = table_change.upstream_ddl,
1422                                    original_columns = ?original_columns,
1423                                    new_columns = ?new_columns,
1424                                   "No change to columns, skipping the schema change");
1425                    continue;
1426                }
1427
1428                let latency_timer = self
1429                    .meta_metrics
1430                    .auto_schema_change_latency
1431                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1432                    .start_timer();
1433                // send a request to the frontend to get the ReplaceJobPlan
1434                // will retry with exponential backoff if the request fails
1435                let resp = client
1436                    .get_table_replace_plan(GetTableReplacePlanRequest {
1437                        database_id: table.database_id,
1438                        table_id: table.id,
1439                        cdc_table_change: Some(table_change.clone()),
1440                    })
1441                    .await;
1442
1443                match resp {
1444                    Ok(resp) => {
1445                        let resp = resp.into_inner();
1446                        if let Some(plan) = resp.replace_plan {
1447                            let plan = Self::extract_replace_table_info(plan);
1448                            plan.streaming_job.table().inspect(|t| {
1449                                tracing::info!(
1450                                    target: "auto_schema_change",
1451                                    table_id = %t.id,
1452                                    cdc_table_id = t.cdc_table_id,
1453                                    upstraem_ddl = table_change.upstream_ddl,
1454                                    "Start the replace config change")
1455                            });
1456                            // start the schema change procedure
1457                            let replace_res = self
1458                                .ddl_controller
1459                                .run_command(DdlCommand::ReplaceStreamJob(plan))
1460                                .await;
1461
1462                            match replace_res {
1463                                Ok(_) => {
1464                                    tracing::info!(
1465                                        target: "auto_schema_change",
1466                                        table_id = %table.id,
1467                                        cdc_table_id = table.cdc_table_id,
1468                                        "Table replaced success");
1469
1470                                    self.meta_metrics
1471                                        .auto_schema_change_success_cnt
1472                                        .with_guarded_label_values(&[
1473                                            &table.id.to_string(),
1474                                            &table.name,
1475                                        ])
1476                                        .inc();
1477                                    latency_timer.observe_duration();
1478                                }
1479                                Err(e) => {
1480                                    tracing::error!(
1481                                        target: "auto_schema_change",
1482                                        error = %e.as_report(),
1483                                        table_id = %table.id,
1484                                        cdc_table_id = table.cdc_table_id,
1485                                        upstraem_ddl = table_change.upstream_ddl,
1486                                        "failed to replace the table",
1487                                    );
1488                                    let fail_info =
1489                                        format!("failed to replace the table: {}", e.as_report());
1490                                    add_auto_schema_change_fail_event_log(
1491                                        &self.meta_metrics,
1492                                        table.id,
1493                                        table.name.clone(),
1494                                        table_change.cdc_table_id.clone(),
1495                                        table_change.upstream_ddl.clone(),
1496                                        &self.env.event_log_manager_ref(),
1497                                        fail_info,
1498                                    );
1499                                }
1500                            };
1501                        }
1502                    }
1503                    Err(e) => {
1504                        tracing::error!(
1505                            target: "auto_schema_change",
1506                            error = %e.as_report(),
1507                            table_id = %table.id,
1508                            cdc_table_id = table.cdc_table_id,
1509                            "failed to get replace table plan",
1510                        );
1511                        let fail_info =
1512                            format!("failed to get replace table plan: {}", e.as_report());
1513                        add_auto_schema_change_fail_event_log(
1514                            &self.meta_metrics,
1515                            table.id,
1516                            table.name.clone(),
1517                            table_change.cdc_table_id.clone(),
1518                            table_change.upstream_ddl.clone(),
1519                            &self.env.event_log_manager_ref(),
1520                            fail_info,
1521                        );
1522                    }
1523                };
1524            }
1525        }
1526
1527        Ok(Response::new(AutoSchemaChangeResponse {}))
1528    }
1529
1530    async fn alter_swap_rename(
1531        &self,
1532        request: Request<AlterSwapRenameRequest>,
1533    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1534        let req = request.into_inner();
1535
1536        let version = self
1537            .ddl_controller
1538            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1539            .await?;
1540
1541        Ok(Response::new(AlterSwapRenameResponse {
1542            status: None,
1543            version,
1544        }))
1545    }
1546
1547    async fn alter_resource_group(
1548        &self,
1549        request: Request<AlterResourceGroupRequest>,
1550    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1551        let req = request.into_inner();
1552
1553        let job_id = req.get_job_id();
1554        let deferred = req.get_deferred();
1555        let resource_group = req.resource_group;
1556
1557        self.ddl_controller
1558            .reschedule_streaming_job(
1559                job_id,
1560                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1561                deferred,
1562            )
1563            .await?;
1564
1565        Ok(Response::new(AlterResourceGroupResponse {}))
1566    }
1567
1568    async fn alter_database_resource_group(
1569        &self,
1570        request: Request<AlterDatabaseResourceGroupRequest>,
1571    ) -> Result<Response<AlterDatabaseResourceGroupResponse>, Status> {
1572        let req = request.into_inner();
1573
1574        let version = self
1575            .ddl_controller
1576            .run_command(DdlCommand::AlterDatabaseResourceGroup(
1577                req.database_id,
1578                req.resource_group,
1579                req.deferred,
1580            ))
1581            .await?;
1582
1583        Ok(Response::new(AlterDatabaseResourceGroupResponse {
1584            status: None,
1585            version,
1586        }))
1587    }
1588
1589    async fn alter_database_param(
1590        &self,
1591        request: Request<AlterDatabaseParamRequest>,
1592    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1593        let req = request.into_inner();
1594        let database_id = req.database_id;
1595
1596        let param = match req.param.unwrap() {
1597            alter_database_param_request::Param::BarrierIntervalMs(value) => {
1598                AlterDatabaseParam::BarrierIntervalMs(value.value)
1599            }
1600            alter_database_param_request::Param::CheckpointFrequency(value) => {
1601                AlterDatabaseParam::CheckpointFrequency(value.value)
1602            }
1603        };
1604        let version = self
1605            .ddl_controller
1606            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
1607            .await?;
1608
1609        return Ok(Response::new(AlterDatabaseParamResponse {
1610            status: None,
1611            version,
1612        }));
1613    }
1614
1615    async fn compact_iceberg_table(
1616        &self,
1617        request: Request<CompactIcebergTableRequest>,
1618    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1619        let req = request.into_inner();
1620        let sink_id = req.sink_id;
1621
1622        // Trigger manual compaction directly using the sink ID
1623        let task_id = self
1624            .iceberg_compaction_manager
1625            .trigger_manual_compaction(sink_id)
1626            .await
1627            .map_err(|e| {
1628                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1629            })?;
1630
1631        Ok(Response::new(CompactIcebergTableResponse {
1632            status: None,
1633            task_id,
1634        }))
1635    }
1636
1637    async fn expire_iceberg_table_snapshots(
1638        &self,
1639        request: Request<ExpireIcebergTableSnapshotsRequest>,
1640    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1641        let req = request.into_inner();
1642        let sink_id = req.sink_id;
1643
1644        // Trigger manual snapshot expiration directly using the sink ID
1645        self.iceberg_compaction_manager
1646            .check_and_expire_snapshots(sink_id)
1647            .await
1648            .map_err(|e| {
1649                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1650            })?;
1651
1652        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1653            status: None,
1654        }))
1655    }
1656
1657    async fn create_iceberg_table(
1658        &self,
1659        request: Request<CreateIcebergTableRequest>,
1660    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
1661        let req = request.into_inner();
1662        let CreateIcebergTableRequest {
1663            table_info,
1664            sink_info,
1665            iceberg_source,
1666            if_not_exists,
1667        } = req;
1668
1669        // 1. create table job
1670        let PbTableJobInfo {
1671            source,
1672            table,
1673            fragment_graph,
1674            job_type,
1675        } = table_info.unwrap();
1676        let mut table = table.unwrap();
1677        let mut fragment_graph = fragment_graph.unwrap();
1678        let database_id = table.get_database_id();
1679        let schema_id = table.get_schema_id();
1680        let table_name = table.get_name().to_owned();
1681
1682        // Mark table as background creation, so that it won't block sink creation.
1683        table.create_type = PbCreateType::Background as _;
1684
1685        // Set the source rate limit to 0 and reset it back after the iceberg sink is backfilling.
1686        let source_rate_limit = if let Some(source) = &source {
1687            for fragment in fragment_graph.fragments.values_mut() {
1688                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
1689                    if let NodeBody::Source(source_node) = node
1690                        && let Some(inner) = &mut source_node.source_inner
1691                    {
1692                        inner.rate_limit = Some(0);
1693                    }
1694                });
1695            }
1696            Some(source.rate_limit)
1697        } else {
1698            None
1699        };
1700
1701        let stream_job =
1702            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
1703        let _ = self
1704            .ddl_controller
1705            .run_command(DdlCommand::CreateStreamingJob {
1706                stream_job,
1707                fragment_graph,
1708                dependencies: HashSet::new(),
1709                resource_type: Self::default_streaming_job_resource_type(),
1710                if_not_exists,
1711                refresh_interval_sec: None,
1712            })
1713            .await?;
1714
1715        let table_catalog = self
1716            .metadata_manager
1717            .catalog_controller
1718            .get_table_catalog_by_name(database_id, schema_id, &table_name)
1719            .await?
1720            .ok_or(Status::not_found("Internal error: table not found"))?;
1721
1722        // 2. create iceberg sink job
1723        let PbSinkJobInfo {
1724            sink,
1725            fragment_graph,
1726        } = sink_info.unwrap();
1727        let mut sink = sink.unwrap();
1728
1729        // Mark sink as background creation, so that it won't block source creation.
1730        sink.create_type = PbCreateType::Background as _;
1731
1732        let enable_pk_index = sink
1733            .properties
1734            .get(ENABLE_PK_INDEX)
1735            .is_some_and(|v| v.eq_ignore_ascii_case("true"));
1736        // The internal iceberg sink is planned before the table catalog exists, so this field
1737        // still carries a placeholder table id when the request reaches meta.
1738        sink.auto_refresh_schema_from_table = if enable_pk_index {
1739            None
1740        } else {
1741            Some(table_catalog.id)
1742        };
1743
1744        let mut fragment_graph = fragment_graph.unwrap();
1745
1746        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
1747        assert!(
1748            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
1749                .is_placeholder()
1750        );
1751        fragment_graph.dependent_table_ids[0] = table_catalog.id;
1752        for fragment in fragment_graph.fragments.values_mut() {
1753            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
1754                NodeBody::StreamScan(scan) => {
1755                    scan.table_id = table_catalog.id;
1756                    if let Some(table_desc) = &mut scan.table_desc {
1757                        assert!(
1758                            risingwave_common::catalog::TableId::from(table_desc.table_id)
1759                                .is_placeholder()
1760                        );
1761                        table_desc.table_id = table_catalog.id;
1762                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1763                    }
1764                    if let Some(table) = &mut scan.arrangement_table {
1765                        assert!(
1766                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
1767                        );
1768                        *table = table_catalog.clone();
1769                    }
1770                }
1771                NodeBody::BatchPlan(plan) => {
1772                    if let Some(table_desc) = &mut plan.table_desc {
1773                        assert!(
1774                            risingwave_common::catalog::TableId::from(table_desc.table_id)
1775                                .is_placeholder()
1776                        );
1777                        table_desc.table_id = table_catalog.id;
1778                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1779                    }
1780                }
1781                _ => {}
1782            });
1783        }
1784
1785        let table_id = table_catalog.id;
1786        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
1787        let stream_job = StreamingJob::Sink(sink);
1788        let res = self
1789            .ddl_controller
1790            .run_command(DdlCommand::CreateStreamingJob {
1791                stream_job,
1792                fragment_graph,
1793                dependencies,
1794                resource_type: Self::default_streaming_job_resource_type(),
1795                if_not_exists,
1796                refresh_interval_sec: None,
1797            })
1798            .await;
1799
1800        if res.is_err() {
1801            let _ = self
1802                .ddl_controller
1803                .run_command(DdlCommand::DropStreamingJob {
1804                    job_id: StreamingJobId::Table(None, table_id),
1805                    drop_mode: DropMode::Cascade,
1806                })
1807                .await
1808                .inspect_err(|err| {
1809                    tracing::error!(error = %err.as_report(),
1810                        "Failed to clean up table after iceberg sink creation failure",
1811                    );
1812                });
1813            res?;
1814        }
1815
1816        // 3. reset source rate limit back to normal after sink creation
1817        if let Some(source_rate_limit) = source_rate_limit
1818            && source_rate_limit != Some(0)
1819        {
1820            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
1821                table_catalog.optional_associated_source_id.unwrap();
1822            let (jobs, fragments) = self
1823                .metadata_manager
1824                .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
1825                .await?;
1826            let throttle_config = ThrottleConfig {
1827                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
1828                rate_limit: source_rate_limit,
1829            };
1830            let _ = self
1831                .barrier_scheduler
1832                .run_command(
1833                    database_id,
1834                    Command::Throttle {
1835                        jobs,
1836                        config: fragments
1837                            .into_iter()
1838                            .map(|fragment_id| (fragment_id, throttle_config))
1839                            .collect(),
1840                    },
1841                )
1842                .await?;
1843        }
1844
1845        // 4. create iceberg source
1846        let iceberg_source = iceberg_source.unwrap();
1847        let res = self
1848            .ddl_controller
1849            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
1850            .await;
1851        if res.is_err() {
1852            let _ = self
1853                .ddl_controller
1854                .run_command(DdlCommand::DropStreamingJob {
1855                    job_id: StreamingJobId::Table(None, table_id),
1856                    drop_mode: DropMode::Cascade,
1857                })
1858                .await
1859                .inspect_err(|err| {
1860                    tracing::error!(
1861                        error = %err.as_report(),
1862                        "Failed to clean up table after iceberg source creation failure",
1863                    );
1864                });
1865        }
1866
1867        Ok(Response::new(CreateIcebergTableResponse {
1868            status: None,
1869            version: res?,
1870        }))
1871    }
1872}
1873
1874fn add_auto_schema_change_fail_event_log(
1875    meta_metrics: &Arc<MetaMetrics>,
1876    table_id: TableId,
1877    table_name: String,
1878    cdc_table_id: String,
1879    upstream_ddl: String,
1880    event_log_manager: &EventLogManagerRef,
1881    fail_info: String,
1882) {
1883    meta_metrics
1884        .auto_schema_change_failure_cnt
1885        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1886        .inc();
1887    let event = event_log::EventAutoSchemaChangeFail {
1888        table_id,
1889        table_name,
1890        cdc_table_id,
1891        upstream_ddl,
1892        fail_info,
1893    };
1894    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1895}