risingwave_meta_service/
ddl_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use rand::rng as thread_rng;
20use rand::seq::IndexedRandom;
21use replace_job_plan::{ReplaceSource, ReplaceTable};
22use risingwave_common::catalog::ColumnCatalog;
23use risingwave_common::types::DataType;
24use risingwave_common::util::column_index_mapping::ColIndexMapping;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
27use risingwave_meta::model::TableParallelism;
28use risingwave_meta::rpc::metrics::MetaMetrics;
29use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
30use risingwave_meta_model::ObjectId;
31use risingwave_pb::catalog::connection::Info as ConnectionInfo;
32use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table};
33use risingwave_pb::common::WorkerType;
34use risingwave_pb::common::worker_node::State;
35use risingwave_pb::ddl_service::ddl_service_server::DdlService;
36use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
37use risingwave_pb::ddl_service::*;
38use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
39use risingwave_pb::meta::event_log;
40use thiserror_ext::AsReport;
41use tonic::{Request, Response, Status};
42
43use crate::MetaError;
44use crate::barrier::BarrierManagerRef;
45use crate::manager::sink_coordination::SinkCoordinatorManager;
46use crate::manager::{MetaSrvEnv, StreamingJob};
47use crate::rpc::ddl_controller::{
48    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
49};
50use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
51
/// gRPC implementation of the meta-side DDL service, dispatching catalog DDL
/// requests (create/drop/alter/replace) to the underlying [`DdlController`].
#[derive(Clone)]
pub struct DdlServiceImpl {
    // Shared meta server environment (idle manager, frontend client pool, ...).
    env: MetaSrvEnv,

    // Read access to catalog metadata (tables, connections, workers).
    metadata_manager: MetadataManager,
    // Coordinates sink writers; used to stop coordination when a sink is dropped.
    sink_manager: SinkCoordinatorManager,
    // Executes all DDL commands submitted by the handlers below.
    ddl_controller: DdlController,
    // NOTE(review): not referenced in the visible portion of this file — presumably
    // used by handlers outside this chunk; confirm before removing.
    meta_metrics: Arc<MetaMetrics>,
}
61
62impl DdlServiceImpl {
63    #[allow(clippy::too_many_arguments)]
64    pub async fn new(
65        env: MetaSrvEnv,
66        metadata_manager: MetadataManager,
67        stream_manager: GlobalStreamManagerRef,
68        source_manager: SourceManagerRef,
69        barrier_manager: BarrierManagerRef,
70        sink_manager: SinkCoordinatorManager,
71        meta_metrics: Arc<MetaMetrics>,
72    ) -> Self {
73        let ddl_controller = DdlController::new(
74            env.clone(),
75            metadata_manager.clone(),
76            stream_manager,
77            source_manager,
78            barrier_manager,
79        )
80        .await;
81        Self {
82            env,
83            metadata_manager,
84            sink_manager,
85            ddl_controller,
86            meta_metrics,
87        }
88    }
89
90    fn extract_replace_table_info(
91        ReplaceJobPlan {
92            fragment_graph,
93            table_col_index_mapping,
94            replace_job,
95        }: ReplaceJobPlan,
96    ) -> ReplaceStreamJobInfo {
97        let col_index_mapping = table_col_index_mapping
98            .as_ref()
99            .map(ColIndexMapping::from_protobuf);
100
101        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
102            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
103                table,
104                source,
105                job_type,
106            }) => StreamingJob::Table(
107                source,
108                table.unwrap(),
109                TableJobType::try_from(job_type).unwrap(),
110            ),
111            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
112                StreamingJob::Source(source.unwrap())
113            }
114        };
115
116        ReplaceStreamJobInfo {
117            streaming_job: replace_streaming_job,
118            fragment_graph: fragment_graph.unwrap(),
119            col_index_mapping,
120        }
121    }
122}
123
124#[async_trait::async_trait]
125impl DdlService for DdlServiceImpl {
126    async fn create_database(
127        &self,
128        request: Request<CreateDatabaseRequest>,
129    ) -> Result<Response<CreateDatabaseResponse>, Status> {
130        let req = request.into_inner();
131        let database = req.get_db()?.clone();
132        let version = self
133            .ddl_controller
134            .run_command(DdlCommand::CreateDatabase(database))
135            .await?;
136
137        Ok(Response::new(CreateDatabaseResponse {
138            status: None,
139            version,
140        }))
141    }
142
143    async fn drop_database(
144        &self,
145        request: Request<DropDatabaseRequest>,
146    ) -> Result<Response<DropDatabaseResponse>, Status> {
147        let req = request.into_inner();
148        let database_id = req.get_database_id();
149
150        let version = self
151            .ddl_controller
152            .run_command(DdlCommand::DropDatabase(database_id as _))
153            .await?;
154
155        Ok(Response::new(DropDatabaseResponse {
156            status: None,
157            version,
158        }))
159    }
160
161    async fn create_secret(
162        &self,
163        request: Request<CreateSecretRequest>,
164    ) -> Result<Response<CreateSecretResponse>, Status> {
165        let req = request.into_inner();
166        let pb_secret = Secret {
167            id: 0,
168            name: req.get_name().clone(),
169            database_id: req.get_database_id(),
170            value: req.get_value().clone(),
171            owner: req.get_owner_id(),
172            schema_id: req.get_schema_id(),
173        };
174        let version = self
175            .ddl_controller
176            .run_command(DdlCommand::CreateSecret(pb_secret))
177            .await?;
178
179        Ok(Response::new(CreateSecretResponse { version }))
180    }
181
182    async fn drop_secret(
183        &self,
184        request: Request<DropSecretRequest>,
185    ) -> Result<Response<DropSecretResponse>, Status> {
186        let req = request.into_inner();
187        let secret_id = req.get_secret_id();
188        let version = self
189            .ddl_controller
190            .run_command(DdlCommand::DropSecret(secret_id as _))
191            .await?;
192
193        Ok(Response::new(DropSecretResponse { version }))
194    }
195
196    async fn alter_secret(
197        &self,
198        request: Request<AlterSecretRequest>,
199    ) -> Result<Response<AlterSecretResponse>, Status> {
200        let req = request.into_inner();
201        let pb_secret = Secret {
202            id: req.get_secret_id(),
203            name: req.get_name().clone(),
204            database_id: req.get_database_id(),
205            value: req.get_value().clone(),
206            owner: req.get_owner_id(),
207            schema_id: req.get_schema_id(),
208        };
209        let version = self
210            .ddl_controller
211            .run_command(DdlCommand::AlterSecret(pb_secret))
212            .await?;
213
214        Ok(Response::new(AlterSecretResponse { version }))
215    }
216
217    async fn create_schema(
218        &self,
219        request: Request<CreateSchemaRequest>,
220    ) -> Result<Response<CreateSchemaResponse>, Status> {
221        let req = request.into_inner();
222        let schema = req.get_schema()?.clone();
223        let version = self
224            .ddl_controller
225            .run_command(DdlCommand::CreateSchema(schema))
226            .await?;
227
228        Ok(Response::new(CreateSchemaResponse {
229            status: None,
230            version,
231        }))
232    }
233
234    async fn drop_schema(
235        &self,
236        request: Request<DropSchemaRequest>,
237    ) -> Result<Response<DropSchemaResponse>, Status> {
238        let req = request.into_inner();
239        let schema_id = req.get_schema_id();
240        let drop_mode = DropMode::from_request_setting(req.cascade);
241        let version = self
242            .ddl_controller
243            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
244            .await?;
245        Ok(Response::new(DropSchemaResponse {
246            status: None,
247            version,
248        }))
249    }
250
251    async fn create_source(
252        &self,
253        request: Request<CreateSourceRequest>,
254    ) -> Result<Response<CreateSourceResponse>, Status> {
255        let req = request.into_inner();
256        let source = req.get_source()?.clone();
257
258        match req.fragment_graph {
259            None => {
260                let version = self
261                    .ddl_controller
262                    .run_command(DdlCommand::CreateNonSharedSource(source))
263                    .await?;
264                Ok(Response::new(CreateSourceResponse {
265                    status: None,
266                    version,
267                }))
268            }
269            Some(fragment_graph) => {
270                // The id of stream job has been set above
271                let stream_job = StreamingJob::Source(source);
272                let version = self
273                    .ddl_controller
274                    .run_command(DdlCommand::CreateStreamingJob(
275                        stream_job,
276                        fragment_graph,
277                        CreateType::Foreground,
278                        None,
279                        HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSource`
280                        None,
281                    ))
282                    .await?;
283                Ok(Response::new(CreateSourceResponse {
284                    status: None,
285                    version,
286                }))
287            }
288        }
289    }
290
291    async fn drop_source(
292        &self,
293        request: Request<DropSourceRequest>,
294    ) -> Result<Response<DropSourceResponse>, Status> {
295        let request = request.into_inner();
296        let source_id = request.source_id;
297        let drop_mode = DropMode::from_request_setting(request.cascade);
298        let version = self
299            .ddl_controller
300            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
301            .await?;
302
303        Ok(Response::new(DropSourceResponse {
304            status: None,
305            version,
306        }))
307    }
308
309    async fn create_sink(
310        &self,
311        request: Request<CreateSinkRequest>,
312    ) -> Result<Response<CreateSinkResponse>, Status> {
313        self.env.idle_manager().record_activity();
314
315        let req = request.into_inner();
316
317        let sink = req.get_sink()?.clone();
318        let fragment_graph = req.get_fragment_graph()?.clone();
319        let affected_table_change = req
320            .get_affected_table_change()
321            .cloned()
322            .ok()
323            .map(Self::extract_replace_table_info);
324        let dependencies = req
325            .get_dependencies()
326            .iter()
327            .map(|id| *id as ObjectId)
328            .collect();
329
330        let stream_job = match &affected_table_change {
331            None => StreamingJob::Sink(sink, None),
332            Some(change) => {
333                let (source, table, _) = change
334                    .streaming_job
335                    .clone()
336                    .try_as_table()
337                    .expect("must be replace table");
338                StreamingJob::Sink(sink, Some((table, source)))
339            }
340        };
341
342        let command = DdlCommand::CreateStreamingJob(
343            stream_job,
344            fragment_graph,
345            CreateType::Foreground,
346            affected_table_change,
347            dependencies,
348            None,
349        );
350
351        let version = self.ddl_controller.run_command(command).await?;
352
353        Ok(Response::new(CreateSinkResponse {
354            status: None,
355            version,
356        }))
357    }
358
359    async fn drop_sink(
360        &self,
361        request: Request<DropSinkRequest>,
362    ) -> Result<Response<DropSinkResponse>, Status> {
363        let request = request.into_inner();
364        let sink_id = request.sink_id;
365        let drop_mode = DropMode::from_request_setting(request.cascade);
366
367        let command = DdlCommand::DropStreamingJob(
368            StreamingJobId::Sink(sink_id as _),
369            drop_mode,
370            request
371                .affected_table_change
372                .map(Self::extract_replace_table_info),
373        );
374
375        let version = self.ddl_controller.run_command(command).await?;
376
377        self.sink_manager
378            .stop_sink_coordinator(SinkId::from(sink_id))
379            .await;
380
381        Ok(Response::new(DropSinkResponse {
382            status: None,
383            version,
384        }))
385    }
386
387    async fn create_subscription(
388        &self,
389        request: Request<CreateSubscriptionRequest>,
390    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
391        self.env.idle_manager().record_activity();
392
393        let req = request.into_inner();
394
395        let subscription = req.get_subscription()?.clone();
396        let command = DdlCommand::CreateSubscription(subscription);
397
398        let version = self.ddl_controller.run_command(command).await?;
399
400        Ok(Response::new(CreateSubscriptionResponse {
401            status: None,
402            version,
403        }))
404    }
405
406    async fn drop_subscription(
407        &self,
408        request: Request<DropSubscriptionRequest>,
409    ) -> Result<Response<DropSubscriptionResponse>, Status> {
410        let request = request.into_inner();
411        let subscription_id = request.subscription_id;
412        let drop_mode = DropMode::from_request_setting(request.cascade);
413
414        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);
415
416        let version = self.ddl_controller.run_command(command).await?;
417
418        Ok(Response::new(DropSubscriptionResponse {
419            status: None,
420            version,
421        }))
422    }
423
424    async fn create_materialized_view(
425        &self,
426        request: Request<CreateMaterializedViewRequest>,
427    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
428        self.env.idle_manager().record_activity();
429
430        let req = request.into_inner();
431        let mview = req.get_materialized_view()?.clone();
432        let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
433        let specific_resource_group = req.specific_resource_group.clone();
434        let fragment_graph = req.get_fragment_graph()?.clone();
435        let dependencies = req
436            .get_dependencies()
437            .iter()
438            .map(|id| *id as ObjectId)
439            .collect();
440
441        let stream_job = StreamingJob::MaterializedView(mview);
442        let version = self
443            .ddl_controller
444            .run_command(DdlCommand::CreateStreamingJob(
445                stream_job,
446                fragment_graph,
447                create_type,
448                None,
449                dependencies,
450                specific_resource_group,
451            ))
452            .await?;
453
454        Ok(Response::new(CreateMaterializedViewResponse {
455            status: None,
456            version,
457        }))
458    }
459
460    async fn drop_materialized_view(
461        &self,
462        request: Request<DropMaterializedViewRequest>,
463    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
464        self.env.idle_manager().record_activity();
465
466        let request = request.into_inner();
467        let table_id = request.table_id;
468        let drop_mode = DropMode::from_request_setting(request.cascade);
469
470        let version = self
471            .ddl_controller
472            .run_command(DdlCommand::DropStreamingJob(
473                StreamingJobId::MaterializedView(table_id as _),
474                drop_mode,
475                None,
476            ))
477            .await?;
478
479        Ok(Response::new(DropMaterializedViewResponse {
480            status: None,
481            version,
482        }))
483    }
484
485    async fn create_index(
486        &self,
487        request: Request<CreateIndexRequest>,
488    ) -> Result<Response<CreateIndexResponse>, Status> {
489        self.env.idle_manager().record_activity();
490
491        let req = request.into_inner();
492        let index = req.get_index()?.clone();
493        let index_table = req.get_index_table()?.clone();
494        let fragment_graph = req.get_fragment_graph()?.clone();
495
496        let stream_job = StreamingJob::Index(index, index_table);
497        let version = self
498            .ddl_controller
499            .run_command(DdlCommand::CreateStreamingJob(
500                stream_job,
501                fragment_graph,
502                CreateType::Foreground,
503                None,
504                HashSet::new(),
505                None,
506            ))
507            .await?;
508
509        Ok(Response::new(CreateIndexResponse {
510            status: None,
511            version,
512        }))
513    }
514
515    async fn drop_index(
516        &self,
517        request: Request<DropIndexRequest>,
518    ) -> Result<Response<DropIndexResponse>, Status> {
519        self.env.idle_manager().record_activity();
520
521        let request = request.into_inner();
522        let index_id = request.index_id;
523        let drop_mode = DropMode::from_request_setting(request.cascade);
524        let version = self
525            .ddl_controller
526            .run_command(DdlCommand::DropStreamingJob(
527                StreamingJobId::Index(index_id as _),
528                drop_mode,
529                None,
530            ))
531            .await?;
532
533        Ok(Response::new(DropIndexResponse {
534            status: None,
535            version,
536        }))
537    }
538
539    async fn create_function(
540        &self,
541        request: Request<CreateFunctionRequest>,
542    ) -> Result<Response<CreateFunctionResponse>, Status> {
543        let req = request.into_inner();
544        let function = req.get_function()?.clone();
545
546        let version = self
547            .ddl_controller
548            .run_command(DdlCommand::CreateFunction(function))
549            .await?;
550
551        Ok(Response::new(CreateFunctionResponse {
552            status: None,
553            version,
554        }))
555    }
556
557    async fn drop_function(
558        &self,
559        request: Request<DropFunctionRequest>,
560    ) -> Result<Response<DropFunctionResponse>, Status> {
561        let request = request.into_inner();
562
563        let version = self
564            .ddl_controller
565            .run_command(DdlCommand::DropFunction(request.function_id as _))
566            .await?;
567
568        Ok(Response::new(DropFunctionResponse {
569            status: None,
570            version,
571        }))
572    }
573
574    async fn create_table(
575        &self,
576        request: Request<CreateTableRequest>,
577    ) -> Result<Response<CreateTableResponse>, Status> {
578        let request = request.into_inner();
579        let job_type = request.get_job_type().unwrap_or_default();
580        let source = request.source;
581        let mview = request.materialized_view.unwrap();
582        let fragment_graph = request.fragment_graph.unwrap();
583
584        let stream_job = StreamingJob::Table(source, mview, job_type);
585        let version = self
586            .ddl_controller
587            .run_command(DdlCommand::CreateStreamingJob(
588                stream_job,
589                fragment_graph,
590                CreateType::Foreground,
591                None,
592                HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbTable`
593                None,
594            ))
595            .await?;
596
597        Ok(Response::new(CreateTableResponse {
598            status: None,
599            version,
600        }))
601    }
602
603    async fn drop_table(
604        &self,
605        request: Request<DropTableRequest>,
606    ) -> Result<Response<DropTableResponse>, Status> {
607        let request = request.into_inner();
608        let source_id = request.source_id;
609        let table_id = request.table_id;
610
611        let drop_mode = DropMode::from_request_setting(request.cascade);
612        let version = self
613            .ddl_controller
614            .run_command(DdlCommand::DropStreamingJob(
615                StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id as _), table_id as _),
616                drop_mode,
617                None,
618            ))
619            .await?;
620
621        Ok(Response::new(DropTableResponse {
622            status: None,
623            version,
624        }))
625    }
626
627    async fn create_view(
628        &self,
629        request: Request<CreateViewRequest>,
630    ) -> Result<Response<CreateViewResponse>, Status> {
631        let req = request.into_inner();
632        let view = req.get_view()?.clone();
633
634        let version = self
635            .ddl_controller
636            .run_command(DdlCommand::CreateView(view))
637            .await?;
638
639        Ok(Response::new(CreateViewResponse {
640            status: None,
641            version,
642        }))
643    }
644
645    async fn drop_view(
646        &self,
647        request: Request<DropViewRequest>,
648    ) -> Result<Response<DropViewResponse>, Status> {
649        let request = request.into_inner();
650        let view_id = request.get_view_id();
651        let drop_mode = DropMode::from_request_setting(request.cascade);
652        let version = self
653            .ddl_controller
654            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
655            .await?;
656        Ok(Response::new(DropViewResponse {
657            status: None,
658            version,
659        }))
660    }
661
    /// Lists all internal state tables from the catalog (used by `risectl`).
    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }
673
674    async fn replace_job_plan(
675        &self,
676        request: Request<ReplaceJobPlanRequest>,
677    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
678        let req = request.into_inner().get_plan().cloned()?;
679
680        let version = self
681            .ddl_controller
682            .run_command(DdlCommand::ReplaceStreamJob(
683                Self::extract_replace_table_info(req),
684            ))
685            .await?;
686
687        Ok(Response::new(ReplaceJobPlanResponse {
688            status: None,
689            version,
690        }))
691    }
692
693    async fn get_table(
694        &self,
695        request: Request<GetTableRequest>,
696    ) -> Result<Response<GetTableResponse>, Status> {
697        let req = request.into_inner();
698        let table = self
699            .metadata_manager
700            .catalog_controller
701            .get_table_by_name(&req.database_name, &req.table_name)
702            .await?;
703
704        Ok(Response::new(GetTableResponse { table }))
705    }
706
707    async fn alter_name(
708        &self,
709        request: Request<AlterNameRequest>,
710    ) -> Result<Response<AlterNameResponse>, Status> {
711        let AlterNameRequest { object, new_name } = request.into_inner();
712        let version = self
713            .ddl_controller
714            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
715            .await?;
716        Ok(Response::new(AlterNameResponse {
717            status: None,
718            version,
719        }))
720    }
721
722    /// Only support add column for now.
723    async fn alter_source(
724        &self,
725        request: Request<AlterSourceRequest>,
726    ) -> Result<Response<AlterSourceResponse>, Status> {
727        let AlterSourceRequest { source } = request.into_inner();
728        let version = self
729            .ddl_controller
730            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
731            .await?;
732        Ok(Response::new(AlterSourceResponse {
733            status: None,
734            version,
735        }))
736    }
737
738    async fn alter_owner(
739        &self,
740        request: Request<AlterOwnerRequest>,
741    ) -> Result<Response<AlterOwnerResponse>, Status> {
742        let AlterOwnerRequest { object, owner_id } = request.into_inner();
743        let version = self
744            .ddl_controller
745            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
746            .await?;
747        Ok(Response::new(AlterOwnerResponse {
748            status: None,
749            version,
750        }))
751    }
752
753    async fn alter_set_schema(
754        &self,
755        request: Request<AlterSetSchemaRequest>,
756    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
757        let AlterSetSchemaRequest {
758            object,
759            new_schema_id,
760        } = request.into_inner();
761        let version = self
762            .ddl_controller
763            .run_command(DdlCommand::AlterSetSchema(
764                object.unwrap(),
765                new_schema_id as _,
766            ))
767            .await?;
768        Ok(Response::new(AlterSetSchemaResponse {
769            status: None,
770            version,
771        }))
772    }
773
774    async fn get_ddl_progress(
775        &self,
776        _request: Request<GetDdlProgressRequest>,
777    ) -> Result<Response<GetDdlProgressResponse>, Status> {
778        Ok(Response::new(GetDdlProgressResponse {
779            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
780        }))
781    }
782
783    async fn create_connection(
784        &self,
785        request: Request<CreateConnectionRequest>,
786    ) -> Result<Response<CreateConnectionResponse>, Status> {
787        let req = request.into_inner();
788        if req.payload.is_none() {
789            return Err(Status::invalid_argument("request is empty"));
790        }
791
792        match req.payload.unwrap() {
793            create_connection_request::Payload::PrivateLink(_) => {
794                panic!("Private Link Connection has been deprecated")
795            }
796            create_connection_request::Payload::ConnectionParams(params) => {
797                let pb_connection = Connection {
798                    id: 0,
799                    schema_id: req.schema_id,
800                    database_id: req.database_id,
801                    name: req.name,
802                    info: Some(ConnectionInfo::ConnectionParams(params)),
803                    owner: req.owner_id,
804                };
805                let version = self
806                    .ddl_controller
807                    .run_command(DdlCommand::CreateConnection(pb_connection))
808                    .await?;
809                Ok(Response::new(CreateConnectionResponse { version }))
810            }
811        }
812    }
813
    /// Lists all connection catalogs.
    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }
828
829    async fn drop_connection(
830        &self,
831        request: Request<DropConnectionRequest>,
832    ) -> Result<Response<DropConnectionResponse>, Status> {
833        let req = request.into_inner();
834
835        let version = self
836            .ddl_controller
837            .run_command(DdlCommand::DropConnection(req.connection_id as _))
838            .await?;
839
840        Ok(Response::new(DropConnectionResponse {
841            status: None,
842            version,
843        }))
844    }
845
846    async fn comment_on(
847        &self,
848        request: Request<CommentOnRequest>,
849    ) -> Result<Response<CommentOnResponse>, Status> {
850        let req = request.into_inner();
851        let comment = req.get_comment()?.clone();
852
853        let version = self
854            .ddl_controller
855            .run_command(DdlCommand::CommentOn(Comment {
856                table_id: comment.table_id,
857                schema_id: comment.schema_id,
858                database_id: comment.database_id,
859                column_index: comment.column_index,
860                description: comment.description,
861            }))
862            .await?;
863
864        Ok(Response::new(CommentOnResponse {
865            status: None,
866            version,
867        }))
868    }
869
870    #[cfg_attr(coverage, coverage(off))]
871    async fn get_tables(
872        &self,
873        request: Request<GetTablesRequest>,
874    ) -> Result<Response<GetTablesResponse>, Status> {
875        let GetTablesRequest {
876            table_ids,
877            include_dropped_tables,
878        } = request.into_inner();
879        let ret = self
880            .metadata_manager
881            .catalog_controller
882            .get_table_by_ids(
883                table_ids.into_iter().map(|id| id as _).collect(),
884                include_dropped_tables,
885            )
886            .await?;
887
888        let mut tables = HashMap::default();
889        for table in ret {
890            tables.insert(table.id, table);
891        }
892        Ok(Response::new(GetTablesResponse { tables }))
893    }
894
    /// Blocks until the DDL controller reports all pending DDL operations complete.
    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }
899
900    async fn alter_parallelism(
901        &self,
902        request: Request<AlterParallelismRequest>,
903    ) -> Result<Response<AlterParallelismResponse>, Status> {
904        let req = request.into_inner();
905
906        let job_id = req.get_table_id();
907        let parallelism = *req.get_parallelism()?;
908        let deferred = req.get_deferred();
909        self.ddl_controller
910            .reschedule_streaming_job(
911                job_id,
912                JobRescheduleTarget {
913                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
914                    resource_group: JobResourceGroupTarget::Keep,
915                },
916                deferred,
917            )
918            .await?;
919
920        Ok(Response::new(AlterParallelismResponse {}))
921    }
922
923    /// Auto schema change for cdc sources,
924    /// called by the source parser when a schema change is detected.
925    async fn auto_schema_change(
926        &self,
927        request: Request<AutoSchemaChangeRequest>,
928    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
929        let req = request.into_inner();
930
931        // randomly select a frontend worker to get the replace table plan
932        let workers = self
933            .metadata_manager
934            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
935            .await?;
936        let worker = workers
937            .choose(&mut thread_rng())
938            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
939
940        let client = self
941            .env
942            .frontend_client_pool()
943            .get(worker)
944            .await
945            .map_err(MetaError::from)?;
946
947        let Some(schema_change) = req.schema_change else {
948            return Err(Status::invalid_argument(
949                "schema change message is required",
950            ));
951        };
952
953        for table_change in schema_change.table_changes {
954            for c in &table_change.columns {
955                let c = ColumnCatalog::from(c.clone());
956
957                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
958                    tracing::warn!(target: "auto_schema_change",
959                      cdc_table_id = table_change.cdc_table_id,
960                    upstraem_ddl = table_change.upstream_ddl,
961                        "invalid column type from cdc table change");
962                    Err(Status::invalid_argument(format!(
963                        "invalid column type: {} from cdc table change, column: {:?}",
964                        column_type, c
965                    )))
966                };
967                if c.is_generated() {
968                    return invalid_col_type("generated column", &c);
969                }
970                if c.is_rw_sys_column() {
971                    return invalid_col_type("rw system column", &c);
972                }
973                if c.is_hidden {
974                    return invalid_col_type("hidden column", &c);
975                }
976            }
977
978            // get the table catalog corresponding to the cdc table
979            let tables: Vec<Table> = self
980                .metadata_manager
981                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
982                .await?;
983
984            for table in tables {
985                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
986                // is a subset of the other.
987                let original_columns: HashSet<(String, DataType)> =
988                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
989                        let col = ColumnCatalog::from(col.clone());
990                        let data_type = col.data_type().clone();
991                        if col.is_generated() {
992                            None
993                        } else {
994                            Some((col.column_desc.name, data_type))
995                        }
996                    }));
997                let new_columns: HashSet<(String, DataType)> =
998                    HashSet::from_iter(table_change.columns.iter().map(|col| {
999                        let col = ColumnCatalog::from(col.clone());
1000                        let data_type = col.data_type().clone();
1001                        (col.column_desc.name, data_type)
1002                    }));
1003
1004                if !(original_columns.is_subset(&new_columns)
1005                    || original_columns.is_superset(&new_columns))
1006                {
1007                    tracing::warn!(target: "auto_schema_change",
1008                                    table_id = table.id,
1009                                    cdc_table_id = table.cdc_table_id,
1010                                    upstraem_ddl = table_change.upstream_ddl,
1011                                    original_columns = ?original_columns,
1012                                    new_columns = ?new_columns,
1013                                    "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
1014                    return Err(Status::invalid_argument(
1015                        "New columns should be a subset or superset of the original columns",
1016                    ));
1017                }
1018                // skip the schema change if there is no change to original columns
1019                if original_columns == new_columns {
1020                    tracing::warn!(target: "auto_schema_change",
1021                                   table_id = table.id,
1022                                   cdc_table_id = table.cdc_table_id,
1023                                   upstraem_ddl = table_change.upstream_ddl,
1024                                    original_columns = ?original_columns,
1025                                    new_columns = ?new_columns,
1026                                   "No change to columns, skipping the schema change");
1027                    continue;
1028                }
1029
1030                let latency_timer = self
1031                    .meta_metrics
1032                    .auto_schema_change_latency
1033                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1034                    .start_timer();
1035                // send a request to the frontend to get the ReplaceJobPlan
1036                // will retry with exponential backoff if the request fails
1037                let resp = client
1038                    .get_table_replace_plan(GetTableReplacePlanRequest {
1039                        database_id: table.database_id,
1040                        owner: table.owner,
1041                        table_name: table.name.clone(),
1042                        table_change: Some(table_change.clone()),
1043                    })
1044                    .await;
1045
1046                match resp {
1047                    Ok(resp) => {
1048                        let resp = resp.into_inner();
1049                        if let Some(plan) = resp.replace_plan {
1050                            let plan = Self::extract_replace_table_info(plan);
1051                            plan.streaming_job.table().inspect(|t| {
1052                                tracing::info!(
1053                                    target: "auto_schema_change",
1054                                    table_id = t.id,
1055                                    cdc_table_id = t.cdc_table_id,
1056                                    upstraem_ddl = table_change.upstream_ddl,
1057                                    "Start the replace config change")
1058                            });
1059                            // start the schema change procedure
1060                            let replace_res = self
1061                                .ddl_controller
1062                                .run_command(DdlCommand::ReplaceStreamJob(plan))
1063                                .await;
1064
1065                            match replace_res {
1066                                Ok(_) => {
1067                                    tracing::info!(
1068                                        target: "auto_schema_change",
1069                                        table_id = table.id,
1070                                        cdc_table_id = table.cdc_table_id,
1071                                        "Table replaced success");
1072
1073                                    self.meta_metrics
1074                                        .auto_schema_change_success_cnt
1075                                        .with_guarded_label_values(&[
1076                                            &table.id.to_string(),
1077                                            &table.name,
1078                                        ])
1079                                        .inc();
1080                                    latency_timer.observe_duration();
1081                                }
1082                                Err(e) => {
1083                                    tracing::error!(
1084                                        target: "auto_schema_change",
1085                                        error = %e.as_report(),
1086                                        table_id = table.id,
1087                                        cdc_table_id = table.cdc_table_id,
1088                                        upstraem_ddl = table_change.upstream_ddl,
1089                                        "failed to replace the table",
1090                                    );
1091                                    add_auto_schema_change_fail_event_log(
1092                                        &self.meta_metrics,
1093                                        table.id,
1094                                        table.name.clone(),
1095                                        table_change.cdc_table_id.clone(),
1096                                        table_change.upstream_ddl.clone(),
1097                                        &self.env.event_log_manager_ref(),
1098                                    );
1099                                }
1100                            };
1101                        }
1102                    }
1103                    Err(e) => {
1104                        tracing::error!(
1105                            target: "auto_schema_change",
1106                            error = %e.as_report(),
1107                            table_id = table.id,
1108                            cdc_table_id = table.cdc_table_id,
1109                            "failed to get replace table plan",
1110                        );
1111                        add_auto_schema_change_fail_event_log(
1112                            &self.meta_metrics,
1113                            table.id,
1114                            table.name.clone(),
1115                            table_change.cdc_table_id.clone(),
1116                            table_change.upstream_ddl.clone(),
1117                            &self.env.event_log_manager_ref(),
1118                        );
1119                    }
1120                };
1121            }
1122        }
1123
1124        Ok(Response::new(AutoSchemaChangeResponse {}))
1125    }
1126
1127    async fn alter_swap_rename(
1128        &self,
1129        request: Request<AlterSwapRenameRequest>,
1130    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1131        let req = request.into_inner();
1132
1133        let version = self
1134            .ddl_controller
1135            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1136            .await?;
1137
1138        Ok(Response::new(AlterSwapRenameResponse {
1139            status: None,
1140            version,
1141        }))
1142    }
1143
1144    async fn alter_resource_group(
1145        &self,
1146        request: Request<AlterResourceGroupRequest>,
1147    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1148        let req = request.into_inner();
1149
1150        let table_id = req.get_table_id();
1151        let deferred = req.get_deferred();
1152        let resource_group = req.resource_group;
1153
1154        self.ddl_controller
1155            .reschedule_streaming_job(
1156                table_id,
1157                JobRescheduleTarget {
1158                    parallelism: JobParallelismTarget::Refresh,
1159                    resource_group: JobResourceGroupTarget::Update(resource_group),
1160                },
1161                deferred,
1162            )
1163            .await?;
1164
1165        Ok(Response::new(AlterResourceGroupResponse {}))
1166    }
1167}
1168
1169fn add_auto_schema_change_fail_event_log(
1170    meta_metrics: &Arc<MetaMetrics>,
1171    table_id: u32,
1172    table_name: String,
1173    cdc_table_id: String,
1174    upstream_ddl: String,
1175    event_log_manager: &EventLogManagerRef,
1176) {
1177    meta_metrics
1178        .auto_schema_change_failure_cnt
1179        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1180        .inc();
1181    let event = event_log::EventAutoSchemaChangeFail {
1182        table_id,
1183        table_name,
1184        cdc_table_id,
1185        upstream_ddl,
1186    };
1187    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1188}