risingwave_meta_service/
ddl_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use rand::rng as thread_rng;
20use rand::seq::IndexedRandom;
21use replace_job_plan::{ReplaceSource, ReplaceTable};
22use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
23use risingwave_common::types::DataType;
24use risingwave_connector::sink::catalog::SinkId;
25use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
26use risingwave_meta::model::TableParallelism;
27use risingwave_meta::rpc::metrics::MetaMetrics;
28use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
29use risingwave_meta_model::ObjectId;
30use risingwave_pb::catalog::connection::Info as ConnectionInfo;
31use risingwave_pb::catalog::{Comment, Connection, Secret, Table};
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::common::worker_node::State;
34use risingwave_pb::ddl_service::ddl_service_server::DdlService;
35use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
36use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
37use risingwave_pb::ddl_service::*;
38use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
39use risingwave_pb::meta::event_log;
40use thiserror_ext::AsReport;
41use tonic::{Request, Response, Status};
42
43use crate::MetaError;
44use crate::barrier::BarrierManagerRef;
45use crate::manager::sink_coordination::SinkCoordinatorManager;
46use crate::manager::{MetaSrvEnv, StreamingJob};
47use crate::rpc::ddl_controller::{
48    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
49};
50use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
51
/// Implementation of the DDL gRPC service exposed by the meta node.
///
/// Cloned per-request by tonic; all fields are cheap-to-clone handles.
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    // Shared access to catalog/metadata state.
    metadata_manager: MetadataManager,
    // Sink coordination; the coordinator for a sink is stopped when the sink is dropped.
    sink_manager: SinkCoordinatorManager,
    // Executes all DDL commands (see the `run_command` calls in the handlers).
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
}
62
63impl DdlServiceImpl {
64    #[allow(clippy::too_many_arguments)]
65    pub async fn new(
66        env: MetaSrvEnv,
67        metadata_manager: MetadataManager,
68        stream_manager: GlobalStreamManagerRef,
69        source_manager: SourceManagerRef,
70        barrier_manager: BarrierManagerRef,
71        sink_manager: SinkCoordinatorManager,
72        meta_metrics: Arc<MetaMetrics>,
73        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
74    ) -> Self {
75        let ddl_controller = DdlController::new(
76            env.clone(),
77            metadata_manager.clone(),
78            stream_manager,
79            source_manager,
80            barrier_manager,
81        )
82        .await;
83        Self {
84            env,
85            metadata_manager,
86            sink_manager,
87            ddl_controller,
88            meta_metrics,
89            iceberg_compaction_manager,
90        }
91    }
92
93    fn extract_replace_table_info(
94        ReplaceJobPlan {
95            fragment_graph,
96            replace_job,
97        }: ReplaceJobPlan,
98    ) -> ReplaceStreamJobInfo {
99        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
100            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
101                table,
102                source,
103                job_type,
104            }) => StreamingJob::Table(
105                source,
106                table.unwrap(),
107                TableJobType::try_from(job_type).unwrap(),
108            ),
109            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
110                StreamingJob::Source(source.unwrap())
111            }
112            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
113                table,
114            }) => StreamingJob::MaterializedView(table.unwrap()),
115        };
116
117        ReplaceStreamJobInfo {
118            streaming_job: replace_streaming_job,
119            fragment_graph: fragment_graph.unwrap(),
120        }
121    }
122}
123
124#[async_trait::async_trait]
125impl DdlService for DdlServiceImpl {
126    async fn create_database(
127        &self,
128        request: Request<CreateDatabaseRequest>,
129    ) -> Result<Response<CreateDatabaseResponse>, Status> {
130        let req = request.into_inner();
131        let database = req.get_db()?.clone();
132        let version = self
133            .ddl_controller
134            .run_command(DdlCommand::CreateDatabase(database))
135            .await?;
136
137        Ok(Response::new(CreateDatabaseResponse {
138            status: None,
139            version,
140        }))
141    }
142
143    async fn drop_database(
144        &self,
145        request: Request<DropDatabaseRequest>,
146    ) -> Result<Response<DropDatabaseResponse>, Status> {
147        let req = request.into_inner();
148        let database_id = req.get_database_id();
149
150        let version = self
151            .ddl_controller
152            .run_command(DdlCommand::DropDatabase(database_id as _))
153            .await?;
154
155        Ok(Response::new(DropDatabaseResponse {
156            status: None,
157            version,
158        }))
159    }
160
161    async fn create_secret(
162        &self,
163        request: Request<CreateSecretRequest>,
164    ) -> Result<Response<CreateSecretResponse>, Status> {
165        let req = request.into_inner();
166        let pb_secret = Secret {
167            id: 0,
168            name: req.get_name().clone(),
169            database_id: req.get_database_id(),
170            value: req.get_value().clone(),
171            owner: req.get_owner_id(),
172            schema_id: req.get_schema_id(),
173        };
174        let version = self
175            .ddl_controller
176            .run_command(DdlCommand::CreateSecret(pb_secret))
177            .await?;
178
179        Ok(Response::new(CreateSecretResponse { version }))
180    }
181
182    async fn drop_secret(
183        &self,
184        request: Request<DropSecretRequest>,
185    ) -> Result<Response<DropSecretResponse>, Status> {
186        let req = request.into_inner();
187        let secret_id = req.get_secret_id();
188        let version = self
189            .ddl_controller
190            .run_command(DdlCommand::DropSecret(secret_id as _))
191            .await?;
192
193        Ok(Response::new(DropSecretResponse { version }))
194    }
195
196    async fn alter_secret(
197        &self,
198        request: Request<AlterSecretRequest>,
199    ) -> Result<Response<AlterSecretResponse>, Status> {
200        let req = request.into_inner();
201        let pb_secret = Secret {
202            id: req.get_secret_id(),
203            name: req.get_name().clone(),
204            database_id: req.get_database_id(),
205            value: req.get_value().clone(),
206            owner: req.get_owner_id(),
207            schema_id: req.get_schema_id(),
208        };
209        let version = self
210            .ddl_controller
211            .run_command(DdlCommand::AlterSecret(pb_secret))
212            .await?;
213
214        Ok(Response::new(AlterSecretResponse { version }))
215    }
216
217    async fn create_schema(
218        &self,
219        request: Request<CreateSchemaRequest>,
220    ) -> Result<Response<CreateSchemaResponse>, Status> {
221        let req = request.into_inner();
222        let schema = req.get_schema()?.clone();
223        let version = self
224            .ddl_controller
225            .run_command(DdlCommand::CreateSchema(schema))
226            .await?;
227
228        Ok(Response::new(CreateSchemaResponse {
229            status: None,
230            version,
231        }))
232    }
233
234    async fn drop_schema(
235        &self,
236        request: Request<DropSchemaRequest>,
237    ) -> Result<Response<DropSchemaResponse>, Status> {
238        let req = request.into_inner();
239        let schema_id = req.get_schema_id();
240        let drop_mode = DropMode::from_request_setting(req.cascade);
241        let version = self
242            .ddl_controller
243            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
244            .await?;
245        Ok(Response::new(DropSchemaResponse {
246            status: None,
247            version,
248        }))
249    }
250
251    async fn create_source(
252        &self,
253        request: Request<CreateSourceRequest>,
254    ) -> Result<Response<CreateSourceResponse>, Status> {
255        let req = request.into_inner();
256        let source = req.get_source()?.clone();
257
258        match req.fragment_graph {
259            None => {
260                let version = self
261                    .ddl_controller
262                    .run_command(DdlCommand::CreateNonSharedSource(source))
263                    .await?;
264                Ok(Response::new(CreateSourceResponse {
265                    status: None,
266                    version,
267                }))
268            }
269            Some(fragment_graph) => {
270                // The id of stream job has been set above
271                let stream_job = StreamingJob::Source(source);
272                let version = self
273                    .ddl_controller
274                    .run_command(DdlCommand::CreateStreamingJob {
275                        stream_job,
276                        fragment_graph,
277                        dependencies: HashSet::new(),
278                        specific_resource_group: None,
279                        if_not_exists: req.if_not_exists,
280                    })
281                    .await?;
282                Ok(Response::new(CreateSourceResponse {
283                    status: None,
284                    version,
285                }))
286            }
287        }
288    }
289
290    async fn drop_source(
291        &self,
292        request: Request<DropSourceRequest>,
293    ) -> Result<Response<DropSourceResponse>, Status> {
294        let request = request.into_inner();
295        let source_id = request.source_id;
296        let drop_mode = DropMode::from_request_setting(request.cascade);
297        let version = self
298            .ddl_controller
299            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
300            .await?;
301
302        Ok(Response::new(DropSourceResponse {
303            status: None,
304            version,
305        }))
306    }
307
308    async fn create_sink(
309        &self,
310        request: Request<CreateSinkRequest>,
311    ) -> Result<Response<CreateSinkResponse>, Status> {
312        self.env.idle_manager().record_activity();
313
314        let req = request.into_inner();
315
316        let sink = req.get_sink()?.clone();
317        let fragment_graph = req.get_fragment_graph()?.clone();
318        let dependencies = req
319            .get_dependencies()
320            .iter()
321            .map(|id| *id as ObjectId)
322            .collect();
323
324        let stream_job = StreamingJob::Sink(sink);
325
326        let command = DdlCommand::CreateStreamingJob {
327            stream_job,
328            fragment_graph,
329            dependencies,
330            specific_resource_group: None,
331            if_not_exists: req.if_not_exists,
332        };
333
334        let version = self.ddl_controller.run_command(command).await?;
335
336        Ok(Response::new(CreateSinkResponse {
337            status: None,
338            version,
339        }))
340    }
341
342    async fn drop_sink(
343        &self,
344        request: Request<DropSinkRequest>,
345    ) -> Result<Response<DropSinkResponse>, Status> {
346        let request = request.into_inner();
347        let sink_id = request.sink_id;
348        let drop_mode = DropMode::from_request_setting(request.cascade);
349
350        let command = DdlCommand::DropStreamingJob {
351            job_id: StreamingJobId::Sink(sink_id as _),
352            drop_mode,
353        };
354
355        let version = self.ddl_controller.run_command(command).await?;
356
357        self.sink_manager
358            .stop_sink_coordinator(SinkId::from(sink_id))
359            .await;
360
361        Ok(Response::new(DropSinkResponse {
362            status: None,
363            version,
364        }))
365    }
366
367    async fn create_subscription(
368        &self,
369        request: Request<CreateSubscriptionRequest>,
370    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
371        self.env.idle_manager().record_activity();
372
373        let req = request.into_inner();
374
375        let subscription = req.get_subscription()?.clone();
376        let command = DdlCommand::CreateSubscription(subscription);
377
378        let version = self.ddl_controller.run_command(command).await?;
379
380        Ok(Response::new(CreateSubscriptionResponse {
381            status: None,
382            version,
383        }))
384    }
385
386    async fn drop_subscription(
387        &self,
388        request: Request<DropSubscriptionRequest>,
389    ) -> Result<Response<DropSubscriptionResponse>, Status> {
390        let request = request.into_inner();
391        let subscription_id = request.subscription_id;
392        let drop_mode = DropMode::from_request_setting(request.cascade);
393
394        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);
395
396        let version = self.ddl_controller.run_command(command).await?;
397
398        Ok(Response::new(DropSubscriptionResponse {
399            status: None,
400            version,
401        }))
402    }
403
404    async fn create_materialized_view(
405        &self,
406        request: Request<CreateMaterializedViewRequest>,
407    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
408        self.env.idle_manager().record_activity();
409
410        let req = request.into_inner();
411        let mview = req.get_materialized_view()?.clone();
412        let specific_resource_group = req.specific_resource_group.clone();
413        let fragment_graph = req.get_fragment_graph()?.clone();
414        let dependencies = req
415            .get_dependencies()
416            .iter()
417            .map(|id| *id as ObjectId)
418            .collect();
419
420        let stream_job = StreamingJob::MaterializedView(mview);
421        let version = self
422            .ddl_controller
423            .run_command(DdlCommand::CreateStreamingJob {
424                stream_job,
425                fragment_graph,
426                dependencies,
427                specific_resource_group,
428                if_not_exists: req.if_not_exists,
429            })
430            .await?;
431
432        Ok(Response::new(CreateMaterializedViewResponse {
433            status: None,
434            version,
435        }))
436    }
437
438    async fn drop_materialized_view(
439        &self,
440        request: Request<DropMaterializedViewRequest>,
441    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
442        self.env.idle_manager().record_activity();
443
444        let request = request.into_inner();
445        let table_id = request.table_id;
446        let drop_mode = DropMode::from_request_setting(request.cascade);
447
448        let version = self
449            .ddl_controller
450            .run_command(DdlCommand::DropStreamingJob {
451                job_id: StreamingJobId::MaterializedView(table_id as _),
452                drop_mode,
453            })
454            .await?;
455
456        Ok(Response::new(DropMaterializedViewResponse {
457            status: None,
458            version,
459        }))
460    }
461
462    async fn create_index(
463        &self,
464        request: Request<CreateIndexRequest>,
465    ) -> Result<Response<CreateIndexResponse>, Status> {
466        self.env.idle_manager().record_activity();
467
468        let req = request.into_inner();
469        let index = req.get_index()?.clone();
470        let index_table = req.get_index_table()?.clone();
471        let fragment_graph = req.get_fragment_graph()?.clone();
472
473        let stream_job = StreamingJob::Index(index, index_table);
474        let version = self
475            .ddl_controller
476            .run_command(DdlCommand::CreateStreamingJob {
477                stream_job,
478                fragment_graph,
479                dependencies: HashSet::new(),
480                specific_resource_group: None,
481                if_not_exists: req.if_not_exists,
482            })
483            .await?;
484
485        Ok(Response::new(CreateIndexResponse {
486            status: None,
487            version,
488        }))
489    }
490
491    async fn drop_index(
492        &self,
493        request: Request<DropIndexRequest>,
494    ) -> Result<Response<DropIndexResponse>, Status> {
495        self.env.idle_manager().record_activity();
496
497        let request = request.into_inner();
498        let index_id = request.index_id;
499        let drop_mode = DropMode::from_request_setting(request.cascade);
500        let version = self
501            .ddl_controller
502            .run_command(DdlCommand::DropStreamingJob {
503                job_id: StreamingJobId::Index(index_id as _),
504                drop_mode,
505            })
506            .await?;
507
508        Ok(Response::new(DropIndexResponse {
509            status: None,
510            version,
511        }))
512    }
513
514    async fn create_function(
515        &self,
516        request: Request<CreateFunctionRequest>,
517    ) -> Result<Response<CreateFunctionResponse>, Status> {
518        let req = request.into_inner();
519        let function = req.get_function()?.clone();
520
521        let version = self
522            .ddl_controller
523            .run_command(DdlCommand::CreateFunction(function))
524            .await?;
525
526        Ok(Response::new(CreateFunctionResponse {
527            status: None,
528            version,
529        }))
530    }
531
532    async fn drop_function(
533        &self,
534        request: Request<DropFunctionRequest>,
535    ) -> Result<Response<DropFunctionResponse>, Status> {
536        let request = request.into_inner();
537
538        let version = self
539            .ddl_controller
540            .run_command(DdlCommand::DropFunction(
541                request.function_id as _,
542                DropMode::from_request_setting(request.cascade),
543            ))
544            .await?;
545
546        Ok(Response::new(DropFunctionResponse {
547            status: None,
548            version,
549        }))
550    }
551
552    async fn create_table(
553        &self,
554        request: Request<CreateTableRequest>,
555    ) -> Result<Response<CreateTableResponse>, Status> {
556        let request = request.into_inner();
557        let job_type = request.get_job_type().unwrap_or_default();
558        let dependencies = request
559            .get_dependencies()
560            .iter()
561            .map(|id| *id as ObjectId)
562            .collect();
563        let source = request.source;
564        let mview = request.materialized_view.unwrap();
565        let fragment_graph = request.fragment_graph.unwrap();
566
567        let stream_job = StreamingJob::Table(source, mview, job_type);
568        let version = self
569            .ddl_controller
570            .run_command(DdlCommand::CreateStreamingJob {
571                stream_job,
572                fragment_graph,
573                dependencies,
574                specific_resource_group: None,
575                if_not_exists: request.if_not_exists,
576            })
577            .await?;
578
579        Ok(Response::new(CreateTableResponse {
580            status: None,
581            version,
582        }))
583    }
584
585    async fn drop_table(
586        &self,
587        request: Request<DropTableRequest>,
588    ) -> Result<Response<DropTableResponse>, Status> {
589        let request = request.into_inner();
590        let source_id = request.source_id;
591        let table_id = request.table_id;
592
593        let drop_mode = DropMode::from_request_setting(request.cascade);
594        let version = self
595            .ddl_controller
596            .run_command(DdlCommand::DropStreamingJob {
597                job_id: StreamingJobId::Table(
598                    source_id.map(|PbSourceId::Id(id)| id as _),
599                    table_id as _,
600                ),
601                drop_mode,
602            })
603            .await?;
604
605        Ok(Response::new(DropTableResponse {
606            status: None,
607            version,
608        }))
609    }
610
611    async fn create_view(
612        &self,
613        request: Request<CreateViewRequest>,
614    ) -> Result<Response<CreateViewResponse>, Status> {
615        let req = request.into_inner();
616        let view = req.get_view()?.clone();
617        let dependencies = req
618            .get_dependencies()
619            .iter()
620            .map(|id| *id as ObjectId)
621            .collect::<HashSet<_>>();
622
623        let version = self
624            .ddl_controller
625            .run_command(DdlCommand::CreateView(view, dependencies))
626            .await?;
627
628        Ok(Response::new(CreateViewResponse {
629            status: None,
630            version,
631        }))
632    }
633
634    async fn drop_view(
635        &self,
636        request: Request<DropViewRequest>,
637    ) -> Result<Response<DropViewResponse>, Status> {
638        let request = request.into_inner();
639        let view_id = request.get_view_id();
640        let drop_mode = DropMode::from_request_setting(request.cascade);
641        let version = self
642            .ddl_controller
643            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
644            .await?;
645        Ok(Response::new(DropViewResponse {
646            status: None,
647            version,
648        }))
649    }
650
651    async fn risectl_list_state_tables(
652        &self,
653        _request: Request<RisectlListStateTablesRequest>,
654    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
655        let tables = self
656            .metadata_manager
657            .catalog_controller
658            .list_all_state_tables()
659            .await?;
660        Ok(Response::new(RisectlListStateTablesResponse { tables }))
661    }
662
663    async fn replace_job_plan(
664        &self,
665        request: Request<ReplaceJobPlanRequest>,
666    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
667        let req = request.into_inner().get_plan().cloned()?;
668
669        let version = self
670            .ddl_controller
671            .run_command(DdlCommand::ReplaceStreamJob(
672                Self::extract_replace_table_info(req),
673            ))
674            .await?;
675
676        Ok(Response::new(ReplaceJobPlanResponse {
677            status: None,
678            version,
679        }))
680    }
681
682    async fn get_table(
683        &self,
684        request: Request<GetTableRequest>,
685    ) -> Result<Response<GetTableResponse>, Status> {
686        let req = request.into_inner();
687        let table = self
688            .metadata_manager
689            .catalog_controller
690            .get_table_by_name(&req.database_name, &req.table_name)
691            .await?;
692
693        Ok(Response::new(GetTableResponse { table }))
694    }
695
696    async fn alter_name(
697        &self,
698        request: Request<AlterNameRequest>,
699    ) -> Result<Response<AlterNameResponse>, Status> {
700        let AlterNameRequest { object, new_name } = request.into_inner();
701        let version = self
702            .ddl_controller
703            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
704            .await?;
705        Ok(Response::new(AlterNameResponse {
706            status: None,
707            version,
708        }))
709    }
710
711    /// Only support add column for now.
712    async fn alter_source(
713        &self,
714        request: Request<AlterSourceRequest>,
715    ) -> Result<Response<AlterSourceResponse>, Status> {
716        let AlterSourceRequest { source } = request.into_inner();
717        let version = self
718            .ddl_controller
719            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
720            .await?;
721        Ok(Response::new(AlterSourceResponse {
722            status: None,
723            version,
724        }))
725    }
726
727    async fn alter_owner(
728        &self,
729        request: Request<AlterOwnerRequest>,
730    ) -> Result<Response<AlterOwnerResponse>, Status> {
731        let AlterOwnerRequest { object, owner_id } = request.into_inner();
732        let version = self
733            .ddl_controller
734            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
735            .await?;
736        Ok(Response::new(AlterOwnerResponse {
737            status: None,
738            version,
739        }))
740    }
741
742    async fn alter_set_schema(
743        &self,
744        request: Request<AlterSetSchemaRequest>,
745    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
746        let AlterSetSchemaRequest {
747            object,
748            new_schema_id,
749        } = request.into_inner();
750        let version = self
751            .ddl_controller
752            .run_command(DdlCommand::AlterSetSchema(
753                object.unwrap(),
754                new_schema_id as _,
755            ))
756            .await?;
757        Ok(Response::new(AlterSetSchemaResponse {
758            status: None,
759            version,
760        }))
761    }
762
763    async fn get_ddl_progress(
764        &self,
765        _request: Request<GetDdlProgressRequest>,
766    ) -> Result<Response<GetDdlProgressResponse>, Status> {
767        Ok(Response::new(GetDdlProgressResponse {
768            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
769        }))
770    }
771
772    async fn create_connection(
773        &self,
774        request: Request<CreateConnectionRequest>,
775    ) -> Result<Response<CreateConnectionResponse>, Status> {
776        let req = request.into_inner();
777        if req.payload.is_none() {
778            return Err(Status::invalid_argument("request is empty"));
779        }
780
781        match req.payload.unwrap() {
782            create_connection_request::Payload::PrivateLink(_) => {
783                panic!("Private Link Connection has been deprecated")
784            }
785            create_connection_request::Payload::ConnectionParams(params) => {
786                let pb_connection = Connection {
787                    id: 0,
788                    schema_id: req.schema_id,
789                    database_id: req.database_id,
790                    name: req.name,
791                    info: Some(ConnectionInfo::ConnectionParams(params)),
792                    owner: req.owner_id,
793                };
794                let version = self
795                    .ddl_controller
796                    .run_command(DdlCommand::CreateConnection(pb_connection))
797                    .await?;
798                Ok(Response::new(CreateConnectionResponse { version }))
799            }
800        }
801    }
802
803    async fn list_connections(
804        &self,
805        _request: Request<ListConnectionsRequest>,
806    ) -> Result<Response<ListConnectionsResponse>, Status> {
807        let conns = self
808            .metadata_manager
809            .catalog_controller
810            .list_connections()
811            .await?;
812
813        Ok(Response::new(ListConnectionsResponse {
814            connections: conns,
815        }))
816    }
817
818    async fn drop_connection(
819        &self,
820        request: Request<DropConnectionRequest>,
821    ) -> Result<Response<DropConnectionResponse>, Status> {
822        let req = request.into_inner();
823        let drop_mode = DropMode::from_request_setting(req.cascade);
824
825        let version = self
826            .ddl_controller
827            .run_command(DdlCommand::DropConnection(
828                req.connection_id as _,
829                drop_mode,
830            ))
831            .await?;
832
833        Ok(Response::new(DropConnectionResponse {
834            status: None,
835            version,
836        }))
837    }
838
839    async fn comment_on(
840        &self,
841        request: Request<CommentOnRequest>,
842    ) -> Result<Response<CommentOnResponse>, Status> {
843        let req = request.into_inner();
844        let comment = req.get_comment()?.clone();
845
846        let version = self
847            .ddl_controller
848            .run_command(DdlCommand::CommentOn(Comment {
849                table_id: comment.table_id,
850                schema_id: comment.schema_id,
851                database_id: comment.database_id,
852                column_index: comment.column_index,
853                description: comment.description,
854            }))
855            .await?;
856
857        Ok(Response::new(CommentOnResponse {
858            status: None,
859            version,
860        }))
861    }
862
863    async fn get_tables(
864        &self,
865        request: Request<GetTablesRequest>,
866    ) -> Result<Response<GetTablesResponse>, Status> {
867        let GetTablesRequest {
868            table_ids,
869            include_dropped_tables,
870        } = request.into_inner();
871        let ret = self
872            .metadata_manager
873            .catalog_controller
874            .get_table_by_ids(
875                table_ids.into_iter().map(|id| id as _).collect(),
876                include_dropped_tables,
877            )
878            .await?;
879
880        let mut tables = HashMap::default();
881        for table in ret {
882            tables.insert(table.id, table);
883        }
884        Ok(Response::new(GetTablesResponse { tables }))
885    }
886
    /// Blocks until the DDL controller reports its pending work as finished.
    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }
891
892    async fn alter_cdc_table_backfill_parallelism(
893        &self,
894        request: Request<AlterCdcTableBackfillParallelismRequest>,
895    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
896        let req = request.into_inner();
897        let job_id = req.get_table_id();
898        let parallelism = *req.get_parallelism()?;
899        self.ddl_controller
900            .reschedule_cdc_table_backfill(
901                job_id,
902                JobRescheduleTarget {
903                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
904                    resource_group: JobResourceGroupTarget::Keep,
905                },
906            )
907            .await?;
908        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
909    }
910
911    async fn alter_parallelism(
912        &self,
913        request: Request<AlterParallelismRequest>,
914    ) -> Result<Response<AlterParallelismResponse>, Status> {
915        let req = request.into_inner();
916
917        let job_id = req.get_table_id();
918        let parallelism = *req.get_parallelism()?;
919        let deferred = req.get_deferred();
920        self.ddl_controller
921            .reschedule_streaming_job(
922                job_id,
923                JobRescheduleTarget {
924                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
925                    resource_group: JobResourceGroupTarget::Keep,
926                },
927                deferred,
928            )
929            .await?;
930
931        Ok(Response::new(AlterParallelismResponse {}))
932    }
933
934    /// Auto schema change for cdc sources,
935    /// called by the source parser when a schema change is detected.
936    async fn auto_schema_change(
937        &self,
938        request: Request<AutoSchemaChangeRequest>,
939    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
940        let req = request.into_inner();
941
942        // randomly select a frontend worker to get the replace table plan
943        let workers = self
944            .metadata_manager
945            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
946            .await?;
947        let worker = workers
948            .choose(&mut thread_rng())
949            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
950
951        let client = self
952            .env
953            .frontend_client_pool()
954            .get(worker)
955            .await
956            .map_err(MetaError::from)?;
957
958        let Some(schema_change) = req.schema_change else {
959            return Err(Status::invalid_argument(
960                "schema change message is required",
961            ));
962        };
963
964        for table_change in schema_change.table_changes {
965            for c in &table_change.columns {
966                let c = ColumnCatalog::from(c.clone());
967
968                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
969                    tracing::warn!(target: "auto_schema_change",
970                      cdc_table_id = table_change.cdc_table_id,
971                    upstraem_ddl = table_change.upstream_ddl,
972                        "invalid column type from cdc table change");
973                    Err(Status::invalid_argument(format!(
974                        "invalid column type: {} from cdc table change, column: {:?}",
975                        column_type, c
976                    )))
977                };
978                if c.is_generated() {
979                    return invalid_col_type("generated column", &c);
980                }
981                if c.is_rw_sys_column() {
982                    return invalid_col_type("rw system column", &c);
983                }
984                if c.is_hidden {
985                    return invalid_col_type("hidden column", &c);
986                }
987            }
988
989            // get the table catalog corresponding to the cdc table
990            let tables: Vec<Table> = self
991                .metadata_manager
992                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
993                .await?;
994
995            for table in tables {
996                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
997                // is a subset of the other.
998                let original_columns: HashSet<(String, DataType)> =
999                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
1000                        let col = ColumnCatalog::from(col.clone());
1001                        if col.is_generated() || col.is_hidden() {
1002                            None
1003                        } else {
1004                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1005                        }
1006                    }));
1007
1008                let mut new_columns: HashSet<(String, DataType)> =
1009                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1010                        let col = ColumnCatalog::from(col.clone());
1011                        if col.is_generated() || col.is_hidden() {
1012                            None
1013                        } else {
1014                            Some((col.column_desc.name.clone(), col.data_type().clone()))
1015                        }
1016                    }));
1017
1018                // For subset/superset check, we need to add visible connector additional columns defined by INCLUDE in the original table to new_columns
1019                // This includes both _rw columns and user-defined INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx)
1020                for col in &table.columns {
1021                    let col = ColumnCatalog::from(col.clone());
1022                    if col.is_connector_additional_column()
1023                        && !col.is_hidden()
1024                        && !col.is_generated()
1025                    {
1026                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1027                    }
1028                }
1029
1030                if !(original_columns.is_subset(&new_columns)
1031                    || original_columns.is_superset(&new_columns))
1032                {
1033                    tracing::warn!(target: "auto_schema_change",
1034                                    table_id = table.id,
1035                                    cdc_table_id = table.cdc_table_id,
1036                                    upstraem_ddl = table_change.upstream_ddl,
1037                                    original_columns = ?original_columns,
1038                                    new_columns = ?new_columns,
1039                                    "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1040
1041                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1042                    add_auto_schema_change_fail_event_log(
1043                        &self.meta_metrics,
1044                        table.id,
1045                        table.name.clone(),
1046                        table_change.cdc_table_id.clone(),
1047                        table_change.upstream_ddl.clone(),
1048                        &self.env.event_log_manager_ref(),
1049                        fail_info,
1050                    );
1051
1052                    return Err(Status::invalid_argument(
1053                        "New columns should be a subset or superset of the original columns (including hidden columns)",
1054                    ));
1055                }
1056                // skip the schema change if there is no change to original columns
1057                if original_columns == new_columns {
1058                    tracing::warn!(target: "auto_schema_change",
1059                                   table_id = table.id,
1060                                   cdc_table_id = table.cdc_table_id,
1061                                   upstraem_ddl = table_change.upstream_ddl,
1062                                    original_columns = ?original_columns,
1063                                    new_columns = ?new_columns,
1064                                   "No change to columns, skipping the schema change");
1065                    continue;
1066                }
1067
1068                let latency_timer = self
1069                    .meta_metrics
1070                    .auto_schema_change_latency
1071                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1072                    .start_timer();
1073                // send a request to the frontend to get the ReplaceJobPlan
1074                // will retry with exponential backoff if the request fails
1075                let resp = client
1076                    .get_table_replace_plan(GetTableReplacePlanRequest {
1077                        database_id: table.database_id,
1078                        owner: table.owner,
1079                        table_name: table.name.clone(),
1080                        table_change: Some(table_change.clone()),
1081                    })
1082                    .await;
1083
1084                match resp {
1085                    Ok(resp) => {
1086                        let resp = resp.into_inner();
1087                        if let Some(plan) = resp.replace_plan {
1088                            let plan = Self::extract_replace_table_info(plan);
1089                            plan.streaming_job.table().inspect(|t| {
1090                                tracing::info!(
1091                                    target: "auto_schema_change",
1092                                    table_id = t.id,
1093                                    cdc_table_id = t.cdc_table_id,
1094                                    upstraem_ddl = table_change.upstream_ddl,
1095                                    "Start the replace config change")
1096                            });
1097                            // start the schema change procedure
1098                            let replace_res = self
1099                                .ddl_controller
1100                                .run_command(DdlCommand::ReplaceStreamJob(plan))
1101                                .await;
1102
1103                            match replace_res {
1104                                Ok(_) => {
1105                                    tracing::info!(
1106                                        target: "auto_schema_change",
1107                                        table_id = table.id,
1108                                        cdc_table_id = table.cdc_table_id,
1109                                        "Table replaced success");
1110
1111                                    self.meta_metrics
1112                                        .auto_schema_change_success_cnt
1113                                        .with_guarded_label_values(&[
1114                                            &table.id.to_string(),
1115                                            &table.name,
1116                                        ])
1117                                        .inc();
1118                                    latency_timer.observe_duration();
1119                                }
1120                                Err(e) => {
1121                                    tracing::error!(
1122                                        target: "auto_schema_change",
1123                                        error = %e.as_report(),
1124                                        table_id = table.id,
1125                                        cdc_table_id = table.cdc_table_id,
1126                                        upstraem_ddl = table_change.upstream_ddl,
1127                                        "failed to replace the table",
1128                                    );
1129                                    let fail_info =
1130                                        format!("failed to replace the table: {}", e.as_report());
1131                                    add_auto_schema_change_fail_event_log(
1132                                        &self.meta_metrics,
1133                                        table.id,
1134                                        table.name.clone(),
1135                                        table_change.cdc_table_id.clone(),
1136                                        table_change.upstream_ddl.clone(),
1137                                        &self.env.event_log_manager_ref(),
1138                                        fail_info,
1139                                    );
1140                                }
1141                            };
1142                        }
1143                    }
1144                    Err(e) => {
1145                        tracing::error!(
1146                            target: "auto_schema_change",
1147                            error = %e.as_report(),
1148                            table_id = table.id,
1149                            cdc_table_id = table.cdc_table_id,
1150                            "failed to get replace table plan",
1151                        );
1152                        let fail_info =
1153                            format!("failed to get replace table plan: {}", e.as_report());
1154                        add_auto_schema_change_fail_event_log(
1155                            &self.meta_metrics,
1156                            table.id,
1157                            table.name.clone(),
1158                            table_change.cdc_table_id.clone(),
1159                            table_change.upstream_ddl.clone(),
1160                            &self.env.event_log_manager_ref(),
1161                            fail_info,
1162                        );
1163                    }
1164                };
1165            }
1166        }
1167
1168        Ok(Response::new(AutoSchemaChangeResponse {}))
1169    }
1170
1171    async fn alter_swap_rename(
1172        &self,
1173        request: Request<AlterSwapRenameRequest>,
1174    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1175        let req = request.into_inner();
1176
1177        let version = self
1178            .ddl_controller
1179            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1180            .await?;
1181
1182        Ok(Response::new(AlterSwapRenameResponse {
1183            status: None,
1184            version,
1185        }))
1186    }
1187
1188    async fn alter_resource_group(
1189        &self,
1190        request: Request<AlterResourceGroupRequest>,
1191    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1192        let req = request.into_inner();
1193
1194        let table_id = req.get_table_id();
1195        let deferred = req.get_deferred();
1196        let resource_group = req.resource_group;
1197
1198        self.ddl_controller
1199            .reschedule_streaming_job(
1200                table_id,
1201                JobRescheduleTarget {
1202                    parallelism: JobParallelismTarget::Refresh,
1203                    resource_group: JobResourceGroupTarget::Update(resource_group),
1204                },
1205                deferred,
1206            )
1207            .await?;
1208
1209        Ok(Response::new(AlterResourceGroupResponse {}))
1210    }
1211
1212    async fn alter_database_param(
1213        &self,
1214        request: Request<AlterDatabaseParamRequest>,
1215    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1216        let req = request.into_inner();
1217        let database_id = req.database_id;
1218
1219        let param = match req.param.unwrap() {
1220            alter_database_param_request::Param::BarrierIntervalMs(value) => {
1221                AlterDatabaseParam::BarrierIntervalMs(value.value)
1222            }
1223            alter_database_param_request::Param::CheckpointFrequency(value) => {
1224                AlterDatabaseParam::CheckpointFrequency(value.value)
1225            }
1226        };
1227        let version = self
1228            .ddl_controller
1229            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
1230            .await?;
1231
1232        return Ok(Response::new(AlterDatabaseParamResponse {
1233            status: None,
1234            version,
1235        }));
1236    }
1237
1238    async fn compact_iceberg_table(
1239        &self,
1240        request: Request<CompactIcebergTableRequest>,
1241    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1242        let req = request.into_inner();
1243        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);
1244
1245        // Trigger manual compaction directly using the sink ID
1246        let task_id = self
1247            .iceberg_compaction_manager
1248            .trigger_manual_compaction(sink_id)
1249            .await
1250            .map_err(|e| {
1251                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1252            })?;
1253
1254        Ok(Response::new(CompactIcebergTableResponse {
1255            status: None,
1256            task_id,
1257        }))
1258    }
1259
1260    async fn expire_iceberg_table_snapshots(
1261        &self,
1262        request: Request<ExpireIcebergTableSnapshotsRequest>,
1263    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1264        let req = request.into_inner();
1265        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);
1266
1267        // Trigger manual snapshot expiration directly using the sink ID
1268        self.iceberg_compaction_manager
1269            .check_and_expire_snapshots(&sink_id)
1270            .await
1271            .map_err(|e| {
1272                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1273            })?;
1274
1275        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1276            status: None,
1277        }))
1278    }
1279}
1280
1281fn add_auto_schema_change_fail_event_log(
1282    meta_metrics: &Arc<MetaMetrics>,
1283    table_id: u32,
1284    table_name: String,
1285    cdc_table_id: String,
1286    upstream_ddl: String,
1287    event_log_manager: &EventLogManagerRef,
1288    fail_info: String,
1289) {
1290    meta_metrics
1291        .auto_schema_change_failure_cnt
1292        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1293        .inc();
1294    let event = event_log::EventAutoSchemaChangeFail {
1295        table_id,
1296        table_name,
1297        cdc_table_id,
1298        upstream_ddl,
1299        fail_info,
1300    };
1301    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1302}