use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
use risingwave_meta::model::TableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::*;
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

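/// gRPC handler for the meta node's DDL service. Each request is translated
/// into a [`DdlCommand`] and executed by the shared [`DdlController`].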
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
        }
    }

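    /// Converts a [`ReplaceJobPlan`] from a request into the internal
    /// [`ReplaceStreamJobInfo`], resolving whether the plan targets a table,
    /// a source, or a materialized view.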
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }
}

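// Each handler below follows the same pattern: unwrap the request payload,
// build a `DdlCommand`, run it through the `DdlController`, and return the
// resulting catalog version to the frontend.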
#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id as _))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0,
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id as _))
            .await?;

        Ok(Response::new(DropSecretResponse { version }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        create_type: CreateType::Foreground,
                        affected_table_replace_info: None,
                        dependencies: HashSet::new(),
                        specific_resource_group: None,
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

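    /// Creates a sink. For sink-into-table, the request also carries a replace
    /// plan for the target table (`affected_table_change`), which is forwarded
    /// together with the sink creation command.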
    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let affected_table_change = req
            .get_affected_table_change()
            .cloned()
            .ok()
            .map(Self::extract_replace_table_info);
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = match &affected_table_change {
            None => StreamingJob::Sink(sink, None),
            Some(change) => {
                let (source, table, _) = change
                    .streaming_job
                    .clone()
                    .try_as_table()
                    .expect("must be replace table");
                StreamingJob::Sink(sink, Some((table, source)))
            }
        };

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            create_type: CreateType::Foreground,
            affected_table_replace_info: affected_table_change,
            dependencies,
            specific_resource_group: None,
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id as _),
            drop_mode,
            target_replace_info: request
                .affected_table_change
                .map(Self::extract_replace_table_info),
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(SinkId::from(sink_id))
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

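    /// Creates a materialized view. Unlike the other create handlers, the
    /// create type is read from the mview catalog (defaulting to `Foreground`)
    /// rather than hard-coded, so background creation is possible.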
    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
        let specific_resource_group = req.specific_resource_group.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                create_type,
                affected_table_replace_info: None,
                dependencies,
                specific_resource_group,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id as _),
                drop_mode,
                target_replace_info: None,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                create_type: CreateType::Foreground,
                affected_table_replace_info: None,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id as _),
                drop_mode,
                target_replace_info: None,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(request.function_id as _))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                create_type: CreateType::Foreground,
                affected_table_replace_info: None,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(
                    source_id.map(|PbSourceId::Id(id)| id as _),
                    table_id as _,
                ),
                drop_mode,
                target_replace_info: None,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(
                object.unwrap(),
                new_schema_id as _,
            ))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0,
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id as _))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(
                table_ids.into_iter().map(|id| id as _).collect(),
                include_dropped_tables,
            )
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }

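    /// Alters the parallelism of a streaming job by issuing a reschedule that
    /// updates the parallelism target while keeping the resource group
    /// unchanged.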
    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();
        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                JobRescheduleTarget {
                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
                    resource_group: JobResourceGroupTarget::Keep,
                },
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

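    /// Handles a CDC auto schema change. Validates that the changed columns
    /// are plain columns (not generated, hidden, or system columns), looks up
    /// all tables sharing the `cdc_table_id`, asks a running frontend node for
    /// a replace plan, and applies it via `ReplaceStreamJob`. Failures bump a
    /// counter and are recorded in the event log.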
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        let data_type = col.data_type().clone();
                        if col.is_generated() {
                            None
                        } else {
                            Some((col.column_desc.name, data_type))
                        }
                    }));
                let new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        let data_type = col.data_type().clone();
                        (col.column_desc.name, data_type)
                    }));

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` are supported");
                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns",
                    ));
                }
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_name: table.name.clone(),
                        table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Start the replace config change")
                            });
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id,
                JobRescheduleTarget {
                    parallelism: JobParallelismTarget::Refresh,
                    resource_group: JobResourceGroupTarget::Update(resource_group),
                },
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }
}

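/// Bumps the auto-schema-change failure counter and records an
/// `EventAutoSchemaChangeFail` entry in the event log.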
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: u32,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}