risingwave_meta_service/
ddl_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use rand::rng as thread_rng;
20use rand::seq::IndexedRandom;
21use replace_job_plan::{ReplaceSource, ReplaceTable};
22use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
23use risingwave_common::types::DataType;
24use risingwave_connector::sink::catalog::SinkId;
25use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
26use risingwave_meta::model::TableParallelism;
27use risingwave_meta::rpc::metrics::MetaMetrics;
28use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
29use risingwave_meta_model::ObjectId;
30use risingwave_pb::catalog::connection::Info as ConnectionInfo;
31use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table};
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::common::worker_node::State;
34use risingwave_pb::ddl_service::ddl_service_server::DdlService;
35use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
36use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
37use risingwave_pb::ddl_service::*;
38use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
39use risingwave_pb::meta::event_log;
40use thiserror_ext::AsReport;
41use tonic::{Request, Response, Status};
42
43use crate::MetaError;
44use crate::barrier::BarrierManagerRef;
45use crate::manager::sink_coordination::SinkCoordinatorManager;
46use crate::manager::{MetaSrvEnv, StreamingJob};
47use crate::rpc::ddl_controller::{
48    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
49};
50use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
51
52#[derive(Clone)]
53pub struct DdlServiceImpl {
54    env: MetaSrvEnv,
55
56    metadata_manager: MetadataManager,
57    sink_manager: SinkCoordinatorManager,
58    ddl_controller: DdlController,
59    meta_metrics: Arc<MetaMetrics>,
60}
61
62impl DdlServiceImpl {
63    #[allow(clippy::too_many_arguments)]
64    pub async fn new(
65        env: MetaSrvEnv,
66        metadata_manager: MetadataManager,
67        stream_manager: GlobalStreamManagerRef,
68        source_manager: SourceManagerRef,
69        barrier_manager: BarrierManagerRef,
70        sink_manager: SinkCoordinatorManager,
71        meta_metrics: Arc<MetaMetrics>,
72    ) -> Self {
73        let ddl_controller = DdlController::new(
74            env.clone(),
75            metadata_manager.clone(),
76            stream_manager,
77            source_manager,
78            barrier_manager,
79        )
80        .await;
81        Self {
82            env,
83            metadata_manager,
84            sink_manager,
85            ddl_controller,
86            meta_metrics,
87        }
88    }
89
90    fn extract_replace_table_info(
91        ReplaceJobPlan {
92            fragment_graph,
93            replace_job,
94        }: ReplaceJobPlan,
95    ) -> ReplaceStreamJobInfo {
96        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
97            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
98                table,
99                source,
100                job_type,
101            }) => StreamingJob::Table(
102                source,
103                table.unwrap(),
104                TableJobType::try_from(job_type).unwrap(),
105            ),
106            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
107                StreamingJob::Source(source.unwrap())
108            }
109            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
110                table,
111            }) => StreamingJob::MaterializedView(table.unwrap()),
112        };
113
114        ReplaceStreamJobInfo {
115            streaming_job: replace_streaming_job,
116            fragment_graph: fragment_graph.unwrap(),
117        }
118    }
119}
120
121#[async_trait::async_trait]
122impl DdlService for DdlServiceImpl {
123    async fn create_database(
124        &self,
125        request: Request<CreateDatabaseRequest>,
126    ) -> Result<Response<CreateDatabaseResponse>, Status> {
127        let req = request.into_inner();
128        let database = req.get_db()?.clone();
129        let version = self
130            .ddl_controller
131            .run_command(DdlCommand::CreateDatabase(database))
132            .await?;
133
134        Ok(Response::new(CreateDatabaseResponse {
135            status: None,
136            version,
137        }))
138    }
139
140    async fn drop_database(
141        &self,
142        request: Request<DropDatabaseRequest>,
143    ) -> Result<Response<DropDatabaseResponse>, Status> {
144        let req = request.into_inner();
145        let database_id = req.get_database_id();
146
147        let version = self
148            .ddl_controller
149            .run_command(DdlCommand::DropDatabase(database_id as _))
150            .await?;
151
152        Ok(Response::new(DropDatabaseResponse {
153            status: None,
154            version,
155        }))
156    }
157
158    async fn create_secret(
159        &self,
160        request: Request<CreateSecretRequest>,
161    ) -> Result<Response<CreateSecretResponse>, Status> {
162        let req = request.into_inner();
163        let pb_secret = Secret {
164            id: 0,
165            name: req.get_name().clone(),
166            database_id: req.get_database_id(),
167            value: req.get_value().clone(),
168            owner: req.get_owner_id(),
169            schema_id: req.get_schema_id(),
170        };
171        let version = self
172            .ddl_controller
173            .run_command(DdlCommand::CreateSecret(pb_secret))
174            .await?;
175
176        Ok(Response::new(CreateSecretResponse { version }))
177    }
178
179    async fn drop_secret(
180        &self,
181        request: Request<DropSecretRequest>,
182    ) -> Result<Response<DropSecretResponse>, Status> {
183        let req = request.into_inner();
184        let secret_id = req.get_secret_id();
185        let version = self
186            .ddl_controller
187            .run_command(DdlCommand::DropSecret(secret_id as _))
188            .await?;
189
190        Ok(Response::new(DropSecretResponse { version }))
191    }
192
193    async fn alter_secret(
194        &self,
195        request: Request<AlterSecretRequest>,
196    ) -> Result<Response<AlterSecretResponse>, Status> {
197        let req = request.into_inner();
198        let pb_secret = Secret {
199            id: req.get_secret_id(),
200            name: req.get_name().clone(),
201            database_id: req.get_database_id(),
202            value: req.get_value().clone(),
203            owner: req.get_owner_id(),
204            schema_id: req.get_schema_id(),
205        };
206        let version = self
207            .ddl_controller
208            .run_command(DdlCommand::AlterSecret(pb_secret))
209            .await?;
210
211        Ok(Response::new(AlterSecretResponse { version }))
212    }
213
214    async fn create_schema(
215        &self,
216        request: Request<CreateSchemaRequest>,
217    ) -> Result<Response<CreateSchemaResponse>, Status> {
218        let req = request.into_inner();
219        let schema = req.get_schema()?.clone();
220        let version = self
221            .ddl_controller
222            .run_command(DdlCommand::CreateSchema(schema))
223            .await?;
224
225        Ok(Response::new(CreateSchemaResponse {
226            status: None,
227            version,
228        }))
229    }
230
231    async fn drop_schema(
232        &self,
233        request: Request<DropSchemaRequest>,
234    ) -> Result<Response<DropSchemaResponse>, Status> {
235        let req = request.into_inner();
236        let schema_id = req.get_schema_id();
237        let drop_mode = DropMode::from_request_setting(req.cascade);
238        let version = self
239            .ddl_controller
240            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
241            .await?;
242        Ok(Response::new(DropSchemaResponse {
243            status: None,
244            version,
245        }))
246    }
247
248    async fn create_source(
249        &self,
250        request: Request<CreateSourceRequest>,
251    ) -> Result<Response<CreateSourceResponse>, Status> {
252        let req = request.into_inner();
253        let source = req.get_source()?.clone();
254
255        match req.fragment_graph {
256            None => {
257                let version = self
258                    .ddl_controller
259                    .run_command(DdlCommand::CreateNonSharedSource(source))
260                    .await?;
261                Ok(Response::new(CreateSourceResponse {
262                    status: None,
263                    version,
264                }))
265            }
266            Some(fragment_graph) => {
267                // The id of stream job has been set above
268                let stream_job = StreamingJob::Source(source);
269                let version = self
270                    .ddl_controller
271                    .run_command(DdlCommand::CreateStreamingJob {
272                        stream_job,
273                        fragment_graph,
274                        create_type: CreateType::Foreground,
275                        affected_table_replace_info: None,
276                        dependencies: HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSource`
277                        specific_resource_group: None,
278                        if_not_exists: req.if_not_exists,
279                    })
280                    .await?;
281                Ok(Response::new(CreateSourceResponse {
282                    status: None,
283                    version,
284                }))
285            }
286        }
287    }
288
289    async fn drop_source(
290        &self,
291        request: Request<DropSourceRequest>,
292    ) -> Result<Response<DropSourceResponse>, Status> {
293        let request = request.into_inner();
294        let source_id = request.source_id;
295        let drop_mode = DropMode::from_request_setting(request.cascade);
296        let version = self
297            .ddl_controller
298            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
299            .await?;
300
301        Ok(Response::new(DropSourceResponse {
302            status: None,
303            version,
304        }))
305    }
306
307    async fn create_sink(
308        &self,
309        request: Request<CreateSinkRequest>,
310    ) -> Result<Response<CreateSinkResponse>, Status> {
311        self.env.idle_manager().record_activity();
312
313        let req = request.into_inner();
314
315        let sink = req.get_sink()?.clone();
316        let fragment_graph = req.get_fragment_graph()?.clone();
317        let affected_table_change = req
318            .get_affected_table_change()
319            .cloned()
320            .ok()
321            .map(Self::extract_replace_table_info);
322        let dependencies = req
323            .get_dependencies()
324            .iter()
325            .map(|id| *id as ObjectId)
326            .collect();
327
328        let stream_job = match &affected_table_change {
329            None => StreamingJob::Sink(sink, None),
330            Some(change) => {
331                let (source, table, _) = change
332                    .streaming_job
333                    .clone()
334                    .try_as_table()
335                    .expect("must be replace table");
336                StreamingJob::Sink(sink, Some((table, source)))
337            }
338        };
339
340        let command = DdlCommand::CreateStreamingJob {
341            stream_job,
342            fragment_graph,
343            create_type: CreateType::Foreground,
344            affected_table_replace_info: affected_table_change,
345            dependencies,
346            specific_resource_group: None,
347            if_not_exists: req.if_not_exists,
348        };
349
350        let version = self.ddl_controller.run_command(command).await?;
351
352        Ok(Response::new(CreateSinkResponse {
353            status: None,
354            version,
355        }))
356    }
357
358    async fn drop_sink(
359        &self,
360        request: Request<DropSinkRequest>,
361    ) -> Result<Response<DropSinkResponse>, Status> {
362        let request = request.into_inner();
363        let sink_id = request.sink_id;
364        let drop_mode = DropMode::from_request_setting(request.cascade);
365
366        let command = DdlCommand::DropStreamingJob {
367            job_id: StreamingJobId::Sink(sink_id as _),
368            drop_mode,
369            target_replace_info: request
370                .affected_table_change
371                .map(Self::extract_replace_table_info),
372        };
373
374        let version = self.ddl_controller.run_command(command).await?;
375
376        self.sink_manager
377            .stop_sink_coordinator(SinkId::from(sink_id))
378            .await;
379
380        Ok(Response::new(DropSinkResponse {
381            status: None,
382            version,
383        }))
384    }
385
386    async fn create_subscription(
387        &self,
388        request: Request<CreateSubscriptionRequest>,
389    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
390        self.env.idle_manager().record_activity();
391
392        let req = request.into_inner();
393
394        let subscription = req.get_subscription()?.clone();
395        let command = DdlCommand::CreateSubscription(subscription);
396
397        let version = self.ddl_controller.run_command(command).await?;
398
399        Ok(Response::new(CreateSubscriptionResponse {
400            status: None,
401            version,
402        }))
403    }
404
405    async fn drop_subscription(
406        &self,
407        request: Request<DropSubscriptionRequest>,
408    ) -> Result<Response<DropSubscriptionResponse>, Status> {
409        let request = request.into_inner();
410        let subscription_id = request.subscription_id;
411        let drop_mode = DropMode::from_request_setting(request.cascade);
412
413        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);
414
415        let version = self.ddl_controller.run_command(command).await?;
416
417        Ok(Response::new(DropSubscriptionResponse {
418            status: None,
419            version,
420        }))
421    }
422
423    async fn create_materialized_view(
424        &self,
425        request: Request<CreateMaterializedViewRequest>,
426    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
427        self.env.idle_manager().record_activity();
428
429        let req = request.into_inner();
430        let mview = req.get_materialized_view()?.clone();
431        let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
432        let specific_resource_group = req.specific_resource_group.clone();
433        let fragment_graph = req.get_fragment_graph()?.clone();
434        let dependencies = req
435            .get_dependencies()
436            .iter()
437            .map(|id| *id as ObjectId)
438            .collect();
439
440        let stream_job = StreamingJob::MaterializedView(mview);
441        let version = self
442            .ddl_controller
443            .run_command(DdlCommand::CreateStreamingJob {
444                stream_job,
445                fragment_graph,
446                create_type,
447                affected_table_replace_info: None,
448                dependencies,
449                specific_resource_group,
450                if_not_exists: req.if_not_exists,
451            })
452            .await?;
453
454        Ok(Response::new(CreateMaterializedViewResponse {
455            status: None,
456            version,
457        }))
458    }
459
460    async fn drop_materialized_view(
461        &self,
462        request: Request<DropMaterializedViewRequest>,
463    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
464        self.env.idle_manager().record_activity();
465
466        let request = request.into_inner();
467        let table_id = request.table_id;
468        let drop_mode = DropMode::from_request_setting(request.cascade);
469
470        let version = self
471            .ddl_controller
472            .run_command(DdlCommand::DropStreamingJob {
473                job_id: StreamingJobId::MaterializedView(table_id as _),
474                drop_mode,
475                target_replace_info: None,
476            })
477            .await?;
478
479        Ok(Response::new(DropMaterializedViewResponse {
480            status: None,
481            version,
482        }))
483    }
484
485    async fn create_index(
486        &self,
487        request: Request<CreateIndexRequest>,
488    ) -> Result<Response<CreateIndexResponse>, Status> {
489        self.env.idle_manager().record_activity();
490
491        let req = request.into_inner();
492        let index = req.get_index()?.clone();
493        let index_table = req.get_index_table()?.clone();
494        let fragment_graph = req.get_fragment_graph()?.clone();
495
496        let stream_job = StreamingJob::Index(index, index_table);
497        let version = self
498            .ddl_controller
499            .run_command(DdlCommand::CreateStreamingJob {
500                stream_job,
501                fragment_graph,
502                create_type: CreateType::Foreground,
503                affected_table_replace_info: None,
504                dependencies: HashSet::new(),
505                specific_resource_group: None,
506                if_not_exists: req.if_not_exists,
507            })
508            .await?;
509
510        Ok(Response::new(CreateIndexResponse {
511            status: None,
512            version,
513        }))
514    }
515
516    async fn drop_index(
517        &self,
518        request: Request<DropIndexRequest>,
519    ) -> Result<Response<DropIndexResponse>, Status> {
520        self.env.idle_manager().record_activity();
521
522        let request = request.into_inner();
523        let index_id = request.index_id;
524        let drop_mode = DropMode::from_request_setting(request.cascade);
525        let version = self
526            .ddl_controller
527            .run_command(DdlCommand::DropStreamingJob {
528                job_id: StreamingJobId::Index(index_id as _),
529                drop_mode,
530                target_replace_info: None,
531            })
532            .await?;
533
534        Ok(Response::new(DropIndexResponse {
535            status: None,
536            version,
537        }))
538    }
539
540    async fn create_function(
541        &self,
542        request: Request<CreateFunctionRequest>,
543    ) -> Result<Response<CreateFunctionResponse>, Status> {
544        let req = request.into_inner();
545        let function = req.get_function()?.clone();
546
547        let version = self
548            .ddl_controller
549            .run_command(DdlCommand::CreateFunction(function))
550            .await?;
551
552        Ok(Response::new(CreateFunctionResponse {
553            status: None,
554            version,
555        }))
556    }
557
558    async fn drop_function(
559        &self,
560        request: Request<DropFunctionRequest>,
561    ) -> Result<Response<DropFunctionResponse>, Status> {
562        let request = request.into_inner();
563
564        let version = self
565            .ddl_controller
566            .run_command(DdlCommand::DropFunction(request.function_id as _))
567            .await?;
568
569        Ok(Response::new(DropFunctionResponse {
570            status: None,
571            version,
572        }))
573    }
574
575    async fn create_table(
576        &self,
577        request: Request<CreateTableRequest>,
578    ) -> Result<Response<CreateTableResponse>, Status> {
579        let request = request.into_inner();
580        let job_type = request.get_job_type().unwrap_or_default();
581        let source = request.source;
582        let mview = request.materialized_view.unwrap();
583        let fragment_graph = request.fragment_graph.unwrap();
584
585        let stream_job = StreamingJob::Table(source, mview, job_type);
586        let version = self
587            .ddl_controller
588            .run_command(DdlCommand::CreateStreamingJob {
589                stream_job,
590                fragment_graph,
591                create_type: CreateType::Foreground,
592                affected_table_replace_info: None,
593                dependencies: HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbTable`
594                specific_resource_group: None,
595                if_not_exists: request.if_not_exists,
596            })
597            .await?;
598
599        Ok(Response::new(CreateTableResponse {
600            status: None,
601            version,
602        }))
603    }
604
605    async fn drop_table(
606        &self,
607        request: Request<DropTableRequest>,
608    ) -> Result<Response<DropTableResponse>, Status> {
609        let request = request.into_inner();
610        let source_id = request.source_id;
611        let table_id = request.table_id;
612
613        let drop_mode = DropMode::from_request_setting(request.cascade);
614        let version = self
615            .ddl_controller
616            .run_command(DdlCommand::DropStreamingJob {
617                job_id: StreamingJobId::Table(
618                    source_id.map(|PbSourceId::Id(id)| id as _),
619                    table_id as _,
620                ),
621                drop_mode,
622                target_replace_info: None,
623            })
624            .await?;
625
626        Ok(Response::new(DropTableResponse {
627            status: None,
628            version,
629        }))
630    }
631
632    async fn create_view(
633        &self,
634        request: Request<CreateViewRequest>,
635    ) -> Result<Response<CreateViewResponse>, Status> {
636        let req = request.into_inner();
637        let view = req.get_view()?.clone();
638
639        let version = self
640            .ddl_controller
641            .run_command(DdlCommand::CreateView(view))
642            .await?;
643
644        Ok(Response::new(CreateViewResponse {
645            status: None,
646            version,
647        }))
648    }
649
650    async fn drop_view(
651        &self,
652        request: Request<DropViewRequest>,
653    ) -> Result<Response<DropViewResponse>, Status> {
654        let request = request.into_inner();
655        let view_id = request.get_view_id();
656        let drop_mode = DropMode::from_request_setting(request.cascade);
657        let version = self
658            .ddl_controller
659            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
660            .await?;
661        Ok(Response::new(DropViewResponse {
662            status: None,
663            version,
664        }))
665    }
666
667    async fn risectl_list_state_tables(
668        &self,
669        _request: Request<RisectlListStateTablesRequest>,
670    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
671        let tables = self
672            .metadata_manager
673            .catalog_controller
674            .list_all_state_tables()
675            .await?;
676        Ok(Response::new(RisectlListStateTablesResponse { tables }))
677    }
678
679    async fn replace_job_plan(
680        &self,
681        request: Request<ReplaceJobPlanRequest>,
682    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
683        let req = request.into_inner().get_plan().cloned()?;
684
685        let version = self
686            .ddl_controller
687            .run_command(DdlCommand::ReplaceStreamJob(
688                Self::extract_replace_table_info(req),
689            ))
690            .await?;
691
692        Ok(Response::new(ReplaceJobPlanResponse {
693            status: None,
694            version,
695        }))
696    }
697
698    async fn get_table(
699        &self,
700        request: Request<GetTableRequest>,
701    ) -> Result<Response<GetTableResponse>, Status> {
702        let req = request.into_inner();
703        let table = self
704            .metadata_manager
705            .catalog_controller
706            .get_table_by_name(&req.database_name, &req.table_name)
707            .await?;
708
709        Ok(Response::new(GetTableResponse { table }))
710    }
711
712    async fn alter_name(
713        &self,
714        request: Request<AlterNameRequest>,
715    ) -> Result<Response<AlterNameResponse>, Status> {
716        let AlterNameRequest { object, new_name } = request.into_inner();
717        let version = self
718            .ddl_controller
719            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
720            .await?;
721        Ok(Response::new(AlterNameResponse {
722            status: None,
723            version,
724        }))
725    }
726
727    /// Only support add column for now.
728    async fn alter_source(
729        &self,
730        request: Request<AlterSourceRequest>,
731    ) -> Result<Response<AlterSourceResponse>, Status> {
732        let AlterSourceRequest { source } = request.into_inner();
733        let version = self
734            .ddl_controller
735            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
736            .await?;
737        Ok(Response::new(AlterSourceResponse {
738            status: None,
739            version,
740        }))
741    }
742
743    async fn alter_owner(
744        &self,
745        request: Request<AlterOwnerRequest>,
746    ) -> Result<Response<AlterOwnerResponse>, Status> {
747        let AlterOwnerRequest { object, owner_id } = request.into_inner();
748        let version = self
749            .ddl_controller
750            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
751            .await?;
752        Ok(Response::new(AlterOwnerResponse {
753            status: None,
754            version,
755        }))
756    }
757
758    async fn alter_set_schema(
759        &self,
760        request: Request<AlterSetSchemaRequest>,
761    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
762        let AlterSetSchemaRequest {
763            object,
764            new_schema_id,
765        } = request.into_inner();
766        let version = self
767            .ddl_controller
768            .run_command(DdlCommand::AlterSetSchema(
769                object.unwrap(),
770                new_schema_id as _,
771            ))
772            .await?;
773        Ok(Response::new(AlterSetSchemaResponse {
774            status: None,
775            version,
776        }))
777    }
778
779    async fn get_ddl_progress(
780        &self,
781        _request: Request<GetDdlProgressRequest>,
782    ) -> Result<Response<GetDdlProgressResponse>, Status> {
783        Ok(Response::new(GetDdlProgressResponse {
784            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
785        }))
786    }
787
788    async fn create_connection(
789        &self,
790        request: Request<CreateConnectionRequest>,
791    ) -> Result<Response<CreateConnectionResponse>, Status> {
792        let req = request.into_inner();
793        if req.payload.is_none() {
794            return Err(Status::invalid_argument("request is empty"));
795        }
796
797        match req.payload.unwrap() {
798            create_connection_request::Payload::PrivateLink(_) => {
799                panic!("Private Link Connection has been deprecated")
800            }
801            create_connection_request::Payload::ConnectionParams(params) => {
802                let pb_connection = Connection {
803                    id: 0,
804                    schema_id: req.schema_id,
805                    database_id: req.database_id,
806                    name: req.name,
807                    info: Some(ConnectionInfo::ConnectionParams(params)),
808                    owner: req.owner_id,
809                };
810                let version = self
811                    .ddl_controller
812                    .run_command(DdlCommand::CreateConnection(pb_connection))
813                    .await?;
814                Ok(Response::new(CreateConnectionResponse { version }))
815            }
816        }
817    }
818
819    async fn list_connections(
820        &self,
821        _request: Request<ListConnectionsRequest>,
822    ) -> Result<Response<ListConnectionsResponse>, Status> {
823        let conns = self
824            .metadata_manager
825            .catalog_controller
826            .list_connections()
827            .await?;
828
829        Ok(Response::new(ListConnectionsResponse {
830            connections: conns,
831        }))
832    }
833
834    async fn drop_connection(
835        &self,
836        request: Request<DropConnectionRequest>,
837    ) -> Result<Response<DropConnectionResponse>, Status> {
838        let req = request.into_inner();
839
840        let version = self
841            .ddl_controller
842            .run_command(DdlCommand::DropConnection(req.connection_id as _))
843            .await?;
844
845        Ok(Response::new(DropConnectionResponse {
846            status: None,
847            version,
848        }))
849    }
850
851    async fn comment_on(
852        &self,
853        request: Request<CommentOnRequest>,
854    ) -> Result<Response<CommentOnResponse>, Status> {
855        let req = request.into_inner();
856        let comment = req.get_comment()?.clone();
857
858        let version = self
859            .ddl_controller
860            .run_command(DdlCommand::CommentOn(Comment {
861                table_id: comment.table_id,
862                schema_id: comment.schema_id,
863                database_id: comment.database_id,
864                column_index: comment.column_index,
865                description: comment.description,
866            }))
867            .await?;
868
869        Ok(Response::new(CommentOnResponse {
870            status: None,
871            version,
872        }))
873    }
874
875    async fn get_tables(
876        &self,
877        request: Request<GetTablesRequest>,
878    ) -> Result<Response<GetTablesResponse>, Status> {
879        let GetTablesRequest {
880            table_ids,
881            include_dropped_tables,
882        } = request.into_inner();
883        let ret = self
884            .metadata_manager
885            .catalog_controller
886            .get_table_by_ids(
887                table_ids.into_iter().map(|id| id as _).collect(),
888                include_dropped_tables,
889            )
890            .await?;
891
892        let mut tables = HashMap::default();
893        for table in ret {
894            tables.insert(table.id, table);
895        }
896        Ok(Response::new(GetTablesResponse { tables }))
897    }
898
899    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
900        self.ddl_controller.wait().await?;
901        Ok(Response::new(WaitResponse {}))
902    }
903
904    async fn alter_parallelism(
905        &self,
906        request: Request<AlterParallelismRequest>,
907    ) -> Result<Response<AlterParallelismResponse>, Status> {
908        let req = request.into_inner();
909
910        let job_id = req.get_table_id();
911        let parallelism = *req.get_parallelism()?;
912        let deferred = req.get_deferred();
913        self.ddl_controller
914            .reschedule_streaming_job(
915                job_id,
916                JobRescheduleTarget {
917                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
918                    resource_group: JobResourceGroupTarget::Keep,
919                },
920                deferred,
921            )
922            .await?;
923
924        Ok(Response::new(AlterParallelismResponse {}))
925    }
926
927    /// Auto schema change for cdc sources,
928    /// called by the source parser when a schema change is detected.
929    async fn auto_schema_change(
930        &self,
931        request: Request<AutoSchemaChangeRequest>,
932    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
933        let req = request.into_inner();
934
935        // randomly select a frontend worker to get the replace table plan
936        let workers = self
937            .metadata_manager
938            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
939            .await?;
940        let worker = workers
941            .choose(&mut thread_rng())
942            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
943
944        let client = self
945            .env
946            .frontend_client_pool()
947            .get(worker)
948            .await
949            .map_err(MetaError::from)?;
950
951        let Some(schema_change) = req.schema_change else {
952            return Err(Status::invalid_argument(
953                "schema change message is required",
954            ));
955        };
956
957        for table_change in schema_change.table_changes {
958            for c in &table_change.columns {
959                let c = ColumnCatalog::from(c.clone());
960
961                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
962                    tracing::warn!(target: "auto_schema_change",
963                      cdc_table_id = table_change.cdc_table_id,
964                    upstraem_ddl = table_change.upstream_ddl,
965                        "invalid column type from cdc table change");
966                    Err(Status::invalid_argument(format!(
967                        "invalid column type: {} from cdc table change, column: {:?}",
968                        column_type, c
969                    )))
970                };
971                if c.is_generated() {
972                    return invalid_col_type("generated column", &c);
973                }
974                if c.is_rw_sys_column() {
975                    return invalid_col_type("rw system column", &c);
976                }
977                if c.is_hidden {
978                    return invalid_col_type("hidden column", &c);
979                }
980            }
981
982            // get the table catalog corresponding to the cdc table
983            let tables: Vec<Table> = self
984                .metadata_manager
985                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
986                .await?;
987
988            for table in tables {
989                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
990                // is a subset of the other.
991                let original_columns: HashSet<(String, DataType)> =
992                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
993                        let col = ColumnCatalog::from(col.clone());
994                        let data_type = col.data_type().clone();
995                        if col.is_generated() {
996                            None
997                        } else {
998                            Some((col.column_desc.name, data_type))
999                        }
1000                    }));
1001                let new_columns: HashSet<(String, DataType)> =
1002                    HashSet::from_iter(table_change.columns.iter().map(|col| {
1003                        let col = ColumnCatalog::from(col.clone());
1004                        let data_type = col.data_type().clone();
1005                        (col.column_desc.name, data_type)
1006                    }));
1007
1008                if !(original_columns.is_subset(&new_columns)
1009                    || original_columns.is_superset(&new_columns))
1010                {
1011                    tracing::warn!(target: "auto_schema_change",
1012                                    table_id = table.id,
1013                                    cdc_table_id = table.cdc_table_id,
1014                                    upstraem_ddl = table_change.upstream_ddl,
1015                                    original_columns = ?original_columns,
1016                                    new_columns = ?new_columns,
1017                                    "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
1018                    return Err(Status::invalid_argument(
1019                        "New columns should be a subset or superset of the original columns",
1020                    ));
1021                }
1022                // skip the schema change if there is no change to original columns
1023                if original_columns == new_columns {
1024                    tracing::warn!(target: "auto_schema_change",
1025                                   table_id = table.id,
1026                                   cdc_table_id = table.cdc_table_id,
1027                                   upstraem_ddl = table_change.upstream_ddl,
1028                                    original_columns = ?original_columns,
1029                                    new_columns = ?new_columns,
1030                                   "No change to columns, skipping the schema change");
1031                    continue;
1032                }
1033
1034                let latency_timer = self
1035                    .meta_metrics
1036                    .auto_schema_change_latency
1037                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1038                    .start_timer();
1039                // send a request to the frontend to get the ReplaceJobPlan
1040                // will retry with exponential backoff if the request fails
1041                let resp = client
1042                    .get_table_replace_plan(GetTableReplacePlanRequest {
1043                        database_id: table.database_id,
1044                        owner: table.owner,
1045                        table_name: table.name.clone(),
1046                        table_change: Some(table_change.clone()),
1047                    })
1048                    .await;
1049
1050                match resp {
1051                    Ok(resp) => {
1052                        let resp = resp.into_inner();
1053                        if let Some(plan) = resp.replace_plan {
1054                            let plan = Self::extract_replace_table_info(plan);
1055                            plan.streaming_job.table().inspect(|t| {
1056                                tracing::info!(
1057                                    target: "auto_schema_change",
1058                                    table_id = t.id,
1059                                    cdc_table_id = t.cdc_table_id,
1060                                    upstraem_ddl = table_change.upstream_ddl,
1061                                    "Start the replace config change")
1062                            });
1063                            // start the schema change procedure
1064                            let replace_res = self
1065                                .ddl_controller
1066                                .run_command(DdlCommand::ReplaceStreamJob(plan))
1067                                .await;
1068
1069                            match replace_res {
1070                                Ok(_) => {
1071                                    tracing::info!(
1072                                        target: "auto_schema_change",
1073                                        table_id = table.id,
1074                                        cdc_table_id = table.cdc_table_id,
1075                                        "Table replaced success");
1076
1077                                    self.meta_metrics
1078                                        .auto_schema_change_success_cnt
1079                                        .with_guarded_label_values(&[
1080                                            &table.id.to_string(),
1081                                            &table.name,
1082                                        ])
1083                                        .inc();
1084                                    latency_timer.observe_duration();
1085                                }
1086                                Err(e) => {
1087                                    tracing::error!(
1088                                        target: "auto_schema_change",
1089                                        error = %e.as_report(),
1090                                        table_id = table.id,
1091                                        cdc_table_id = table.cdc_table_id,
1092                                        upstraem_ddl = table_change.upstream_ddl,
1093                                        "failed to replace the table",
1094                                    );
1095                                    add_auto_schema_change_fail_event_log(
1096                                        &self.meta_metrics,
1097                                        table.id,
1098                                        table.name.clone(),
1099                                        table_change.cdc_table_id.clone(),
1100                                        table_change.upstream_ddl.clone(),
1101                                        &self.env.event_log_manager_ref(),
1102                                    );
1103                                }
1104                            };
1105                        }
1106                    }
1107                    Err(e) => {
1108                        tracing::error!(
1109                            target: "auto_schema_change",
1110                            error = %e.as_report(),
1111                            table_id = table.id,
1112                            cdc_table_id = table.cdc_table_id,
1113                            "failed to get replace table plan",
1114                        );
1115                        add_auto_schema_change_fail_event_log(
1116                            &self.meta_metrics,
1117                            table.id,
1118                            table.name.clone(),
1119                            table_change.cdc_table_id.clone(),
1120                            table_change.upstream_ddl.clone(),
1121                            &self.env.event_log_manager_ref(),
1122                        );
1123                    }
1124                };
1125            }
1126        }
1127
1128        Ok(Response::new(AutoSchemaChangeResponse {}))
1129    }
1130
1131    async fn alter_swap_rename(
1132        &self,
1133        request: Request<AlterSwapRenameRequest>,
1134    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1135        let req = request.into_inner();
1136
1137        let version = self
1138            .ddl_controller
1139            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1140            .await?;
1141
1142        Ok(Response::new(AlterSwapRenameResponse {
1143            status: None,
1144            version,
1145        }))
1146    }
1147
1148    async fn alter_resource_group(
1149        &self,
1150        request: Request<AlterResourceGroupRequest>,
1151    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1152        let req = request.into_inner();
1153
1154        let table_id = req.get_table_id();
1155        let deferred = req.get_deferred();
1156        let resource_group = req.resource_group;
1157
1158        self.ddl_controller
1159            .reschedule_streaming_job(
1160                table_id,
1161                JobRescheduleTarget {
1162                    parallelism: JobParallelismTarget::Refresh,
1163                    resource_group: JobResourceGroupTarget::Update(resource_group),
1164                },
1165                deferred,
1166            )
1167            .await?;
1168
1169        Ok(Response::new(AlterResourceGroupResponse {}))
1170    }
1171
1172    async fn alter_database_param(
1173        &self,
1174        request: Request<AlterDatabaseParamRequest>,
1175    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1176        let req = request.into_inner();
1177        let database_id = req.database_id;
1178
1179        let param = match req.param.unwrap() {
1180            alter_database_param_request::Param::BarrierIntervalMs(value) => {
1181                AlterDatabaseParam::BarrierIntervalMs(value.value)
1182            }
1183            alter_database_param_request::Param::CheckpointFrequency(value) => {
1184                AlterDatabaseParam::CheckpointFrequency(value.value)
1185            }
1186        };
1187        let version = self
1188            .ddl_controller
1189            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
1190            .await?;
1191
1192        return Ok(Response::new(AlterDatabaseParamResponse {
1193            status: None,
1194            version,
1195        }));
1196    }
1197}
1198
1199fn add_auto_schema_change_fail_event_log(
1200    meta_metrics: &Arc<MetaMetrics>,
1201    table_id: u32,
1202    table_name: String,
1203    cdc_table_id: String,
1204    upstream_ddl: String,
1205    event_log_manager: &EventLogManagerRef,
1206) {
1207    meta_metrics
1208        .auto_schema_change_failure_cnt
1209        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1210        .inc();
1211    let event = event_log::EventAutoSchemaChangeFail {
1212        table_id,
1213        table_name,
1214        cdc_table_id,
1215        upstream_ddl,
1216    };
1217    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1218}