1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use rand::rng as thread_rng;
20use rand::seq::IndexedRandom;
21use replace_job_plan::{ReplaceSource, ReplaceTable};
22use risingwave_common::catalog::ColumnCatalog;
23use risingwave_common::types::DataType;
24use risingwave_common::util::column_index_mapping::ColIndexMapping;
25use risingwave_connector::sink::catalog::SinkId;
26use risingwave_meta::manager::{EventLogManagerRef, MetadataManager};
27use risingwave_meta::model::TableParallelism;
28use risingwave_meta::rpc::metrics::MetaMetrics;
29use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
30use risingwave_meta_model::ObjectId;
31use risingwave_pb::catalog::connection::Info as ConnectionInfo;
32use risingwave_pb::catalog::{Comment, Connection, CreateType, Secret, Table};
33use risingwave_pb::common::WorkerType;
34use risingwave_pb::common::worker_node::State;
35use risingwave_pb::ddl_service::ddl_service_server::DdlService;
36use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
37use risingwave_pb::ddl_service::*;
38use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
39use risingwave_pb::meta::event_log;
40use thiserror_ext::AsReport;
41use tonic::{Request, Response, Status};
42
43use crate::MetaError;
44use crate::barrier::BarrierManagerRef;
45use crate::manager::sink_coordination::SinkCoordinatorManager;
46use crate::manager::{MetaSrvEnv, StreamingJob};
47use crate::rpc::ddl_controller::{
48 DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
49};
50use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
51
52#[derive(Clone)]
53pub struct DdlServiceImpl {
54 env: MetaSrvEnv,
55
56 metadata_manager: MetadataManager,
57 sink_manager: SinkCoordinatorManager,
58 ddl_controller: DdlController,
59 meta_metrics: Arc<MetaMetrics>,
60}
61
62impl DdlServiceImpl {
63 #[allow(clippy::too_many_arguments)]
64 pub async fn new(
65 env: MetaSrvEnv,
66 metadata_manager: MetadataManager,
67 stream_manager: GlobalStreamManagerRef,
68 source_manager: SourceManagerRef,
69 barrier_manager: BarrierManagerRef,
70 sink_manager: SinkCoordinatorManager,
71 meta_metrics: Arc<MetaMetrics>,
72 ) -> Self {
73 let ddl_controller = DdlController::new(
74 env.clone(),
75 metadata_manager.clone(),
76 stream_manager,
77 source_manager,
78 barrier_manager,
79 )
80 .await;
81 Self {
82 env,
83 metadata_manager,
84 sink_manager,
85 ddl_controller,
86 meta_metrics,
87 }
88 }
89
90 fn extract_replace_table_info(
91 ReplaceJobPlan {
92 fragment_graph,
93 table_col_index_mapping,
94 replace_job,
95 }: ReplaceJobPlan,
96 ) -> ReplaceStreamJobInfo {
97 let col_index_mapping = table_col_index_mapping
98 .as_ref()
99 .map(ColIndexMapping::from_protobuf);
100
101 let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
102 replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
103 table,
104 source,
105 job_type,
106 }) => StreamingJob::Table(
107 source,
108 table.unwrap(),
109 TableJobType::try_from(job_type).unwrap(),
110 ),
111 replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
112 StreamingJob::Source(source.unwrap())
113 }
114 };
115
116 ReplaceStreamJobInfo {
117 streaming_job: replace_streaming_job,
118 fragment_graph: fragment_graph.unwrap(),
119 col_index_mapping,
120 }
121 }
122}
123
124#[async_trait::async_trait]
125impl DdlService for DdlServiceImpl {
126 async fn create_database(
127 &self,
128 request: Request<CreateDatabaseRequest>,
129 ) -> Result<Response<CreateDatabaseResponse>, Status> {
130 let req = request.into_inner();
131 let database = req.get_db()?.clone();
132 let version = self
133 .ddl_controller
134 .run_command(DdlCommand::CreateDatabase(database))
135 .await?;
136
137 Ok(Response::new(CreateDatabaseResponse {
138 status: None,
139 version,
140 }))
141 }
142
143 async fn drop_database(
144 &self,
145 request: Request<DropDatabaseRequest>,
146 ) -> Result<Response<DropDatabaseResponse>, Status> {
147 let req = request.into_inner();
148 let database_id = req.get_database_id();
149
150 let version = self
151 .ddl_controller
152 .run_command(DdlCommand::DropDatabase(database_id as _))
153 .await?;
154
155 Ok(Response::new(DropDatabaseResponse {
156 status: None,
157 version,
158 }))
159 }
160
161 async fn create_secret(
162 &self,
163 request: Request<CreateSecretRequest>,
164 ) -> Result<Response<CreateSecretResponse>, Status> {
165 let req = request.into_inner();
166 let pb_secret = Secret {
167 id: 0,
168 name: req.get_name().clone(),
169 database_id: req.get_database_id(),
170 value: req.get_value().clone(),
171 owner: req.get_owner_id(),
172 schema_id: req.get_schema_id(),
173 };
174 let version = self
175 .ddl_controller
176 .run_command(DdlCommand::CreateSecret(pb_secret))
177 .await?;
178
179 Ok(Response::new(CreateSecretResponse { version }))
180 }
181
182 async fn drop_secret(
183 &self,
184 request: Request<DropSecretRequest>,
185 ) -> Result<Response<DropSecretResponse>, Status> {
186 let req = request.into_inner();
187 let secret_id = req.get_secret_id();
188 let version = self
189 .ddl_controller
190 .run_command(DdlCommand::DropSecret(secret_id as _))
191 .await?;
192
193 Ok(Response::new(DropSecretResponse { version }))
194 }
195
196 async fn alter_secret(
197 &self,
198 request: Request<AlterSecretRequest>,
199 ) -> Result<Response<AlterSecretResponse>, Status> {
200 let req = request.into_inner();
201 let pb_secret = Secret {
202 id: req.get_secret_id(),
203 name: req.get_name().clone(),
204 database_id: req.get_database_id(),
205 value: req.get_value().clone(),
206 owner: req.get_owner_id(),
207 schema_id: req.get_schema_id(),
208 };
209 let version = self
210 .ddl_controller
211 .run_command(DdlCommand::AlterSecret(pb_secret))
212 .await?;
213
214 Ok(Response::new(AlterSecretResponse { version }))
215 }
216
217 async fn create_schema(
218 &self,
219 request: Request<CreateSchemaRequest>,
220 ) -> Result<Response<CreateSchemaResponse>, Status> {
221 let req = request.into_inner();
222 let schema = req.get_schema()?.clone();
223 let version = self
224 .ddl_controller
225 .run_command(DdlCommand::CreateSchema(schema))
226 .await?;
227
228 Ok(Response::new(CreateSchemaResponse {
229 status: None,
230 version,
231 }))
232 }
233
234 async fn drop_schema(
235 &self,
236 request: Request<DropSchemaRequest>,
237 ) -> Result<Response<DropSchemaResponse>, Status> {
238 let req = request.into_inner();
239 let schema_id = req.get_schema_id();
240 let drop_mode = DropMode::from_request_setting(req.cascade);
241 let version = self
242 .ddl_controller
243 .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
244 .await?;
245 Ok(Response::new(DropSchemaResponse {
246 status: None,
247 version,
248 }))
249 }
250
251 async fn create_source(
252 &self,
253 request: Request<CreateSourceRequest>,
254 ) -> Result<Response<CreateSourceResponse>, Status> {
255 let req = request.into_inner();
256 let source = req.get_source()?.clone();
257
258 match req.fragment_graph {
259 None => {
260 let version = self
261 .ddl_controller
262 .run_command(DdlCommand::CreateNonSharedSource(source))
263 .await?;
264 Ok(Response::new(CreateSourceResponse {
265 status: None,
266 version,
267 }))
268 }
269 Some(fragment_graph) => {
270 let stream_job = StreamingJob::Source(source);
272 let version = self
273 .ddl_controller
274 .run_command(DdlCommand::CreateStreamingJob(
275 stream_job,
276 fragment_graph,
277 CreateType::Foreground,
278 None,
279 HashSet::new(), None,
281 ))
282 .await?;
283 Ok(Response::new(CreateSourceResponse {
284 status: None,
285 version,
286 }))
287 }
288 }
289 }
290
291 async fn drop_source(
292 &self,
293 request: Request<DropSourceRequest>,
294 ) -> Result<Response<DropSourceResponse>, Status> {
295 let request = request.into_inner();
296 let source_id = request.source_id;
297 let drop_mode = DropMode::from_request_setting(request.cascade);
298 let version = self
299 .ddl_controller
300 .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
301 .await?;
302
303 Ok(Response::new(DropSourceResponse {
304 status: None,
305 version,
306 }))
307 }
308
309 async fn create_sink(
310 &self,
311 request: Request<CreateSinkRequest>,
312 ) -> Result<Response<CreateSinkResponse>, Status> {
313 self.env.idle_manager().record_activity();
314
315 let req = request.into_inner();
316
317 let sink = req.get_sink()?.clone();
318 let fragment_graph = req.get_fragment_graph()?.clone();
319 let affected_table_change = req
320 .get_affected_table_change()
321 .cloned()
322 .ok()
323 .map(Self::extract_replace_table_info);
324 let dependencies = req
325 .get_dependencies()
326 .iter()
327 .map(|id| *id as ObjectId)
328 .collect();
329
330 let stream_job = match &affected_table_change {
331 None => StreamingJob::Sink(sink, None),
332 Some(change) => {
333 let (source, table, _) = change
334 .streaming_job
335 .clone()
336 .try_as_table()
337 .expect("must be replace table");
338 StreamingJob::Sink(sink, Some((table, source)))
339 }
340 };
341
342 let command = DdlCommand::CreateStreamingJob(
343 stream_job,
344 fragment_graph,
345 CreateType::Foreground,
346 affected_table_change,
347 dependencies,
348 None,
349 );
350
351 let version = self.ddl_controller.run_command(command).await?;
352
353 Ok(Response::new(CreateSinkResponse {
354 status: None,
355 version,
356 }))
357 }
358
359 async fn drop_sink(
360 &self,
361 request: Request<DropSinkRequest>,
362 ) -> Result<Response<DropSinkResponse>, Status> {
363 let request = request.into_inner();
364 let sink_id = request.sink_id;
365 let drop_mode = DropMode::from_request_setting(request.cascade);
366
367 let command = DdlCommand::DropStreamingJob(
368 StreamingJobId::Sink(sink_id as _),
369 drop_mode,
370 request
371 .affected_table_change
372 .map(Self::extract_replace_table_info),
373 );
374
375 let version = self.ddl_controller.run_command(command).await?;
376
377 self.sink_manager
378 .stop_sink_coordinator(SinkId::from(sink_id))
379 .await;
380
381 Ok(Response::new(DropSinkResponse {
382 status: None,
383 version,
384 }))
385 }
386
387 async fn create_subscription(
388 &self,
389 request: Request<CreateSubscriptionRequest>,
390 ) -> Result<Response<CreateSubscriptionResponse>, Status> {
391 self.env.idle_manager().record_activity();
392
393 let req = request.into_inner();
394
395 let subscription = req.get_subscription()?.clone();
396 let command = DdlCommand::CreateSubscription(subscription);
397
398 let version = self.ddl_controller.run_command(command).await?;
399
400 Ok(Response::new(CreateSubscriptionResponse {
401 status: None,
402 version,
403 }))
404 }
405
406 async fn drop_subscription(
407 &self,
408 request: Request<DropSubscriptionRequest>,
409 ) -> Result<Response<DropSubscriptionResponse>, Status> {
410 let request = request.into_inner();
411 let subscription_id = request.subscription_id;
412 let drop_mode = DropMode::from_request_setting(request.cascade);
413
414 let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);
415
416 let version = self.ddl_controller.run_command(command).await?;
417
418 Ok(Response::new(DropSubscriptionResponse {
419 status: None,
420 version,
421 }))
422 }
423
424 async fn create_materialized_view(
425 &self,
426 request: Request<CreateMaterializedViewRequest>,
427 ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
428 self.env.idle_manager().record_activity();
429
430 let req = request.into_inner();
431 let mview = req.get_materialized_view()?.clone();
432 let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
433 let specific_resource_group = req.specific_resource_group.clone();
434 let fragment_graph = req.get_fragment_graph()?.clone();
435 let dependencies = req
436 .get_dependencies()
437 .iter()
438 .map(|id| *id as ObjectId)
439 .collect();
440
441 let stream_job = StreamingJob::MaterializedView(mview);
442 let version = self
443 .ddl_controller
444 .run_command(DdlCommand::CreateStreamingJob(
445 stream_job,
446 fragment_graph,
447 create_type,
448 None,
449 dependencies,
450 specific_resource_group,
451 ))
452 .await?;
453
454 Ok(Response::new(CreateMaterializedViewResponse {
455 status: None,
456 version,
457 }))
458 }
459
460 async fn drop_materialized_view(
461 &self,
462 request: Request<DropMaterializedViewRequest>,
463 ) -> Result<Response<DropMaterializedViewResponse>, Status> {
464 self.env.idle_manager().record_activity();
465
466 let request = request.into_inner();
467 let table_id = request.table_id;
468 let drop_mode = DropMode::from_request_setting(request.cascade);
469
470 let version = self
471 .ddl_controller
472 .run_command(DdlCommand::DropStreamingJob(
473 StreamingJobId::MaterializedView(table_id as _),
474 drop_mode,
475 None,
476 ))
477 .await?;
478
479 Ok(Response::new(DropMaterializedViewResponse {
480 status: None,
481 version,
482 }))
483 }
484
485 async fn create_index(
486 &self,
487 request: Request<CreateIndexRequest>,
488 ) -> Result<Response<CreateIndexResponse>, Status> {
489 self.env.idle_manager().record_activity();
490
491 let req = request.into_inner();
492 let index = req.get_index()?.clone();
493 let index_table = req.get_index_table()?.clone();
494 let fragment_graph = req.get_fragment_graph()?.clone();
495
496 let stream_job = StreamingJob::Index(index, index_table);
497 let version = self
498 .ddl_controller
499 .run_command(DdlCommand::CreateStreamingJob(
500 stream_job,
501 fragment_graph,
502 CreateType::Foreground,
503 None,
504 HashSet::new(),
505 None,
506 ))
507 .await?;
508
509 Ok(Response::new(CreateIndexResponse {
510 status: None,
511 version,
512 }))
513 }
514
515 async fn drop_index(
516 &self,
517 request: Request<DropIndexRequest>,
518 ) -> Result<Response<DropIndexResponse>, Status> {
519 self.env.idle_manager().record_activity();
520
521 let request = request.into_inner();
522 let index_id = request.index_id;
523 let drop_mode = DropMode::from_request_setting(request.cascade);
524 let version = self
525 .ddl_controller
526 .run_command(DdlCommand::DropStreamingJob(
527 StreamingJobId::Index(index_id as _),
528 drop_mode,
529 None,
530 ))
531 .await?;
532
533 Ok(Response::new(DropIndexResponse {
534 status: None,
535 version,
536 }))
537 }
538
539 async fn create_function(
540 &self,
541 request: Request<CreateFunctionRequest>,
542 ) -> Result<Response<CreateFunctionResponse>, Status> {
543 let req = request.into_inner();
544 let function = req.get_function()?.clone();
545
546 let version = self
547 .ddl_controller
548 .run_command(DdlCommand::CreateFunction(function))
549 .await?;
550
551 Ok(Response::new(CreateFunctionResponse {
552 status: None,
553 version,
554 }))
555 }
556
557 async fn drop_function(
558 &self,
559 request: Request<DropFunctionRequest>,
560 ) -> Result<Response<DropFunctionResponse>, Status> {
561 let request = request.into_inner();
562
563 let version = self
564 .ddl_controller
565 .run_command(DdlCommand::DropFunction(request.function_id as _))
566 .await?;
567
568 Ok(Response::new(DropFunctionResponse {
569 status: None,
570 version,
571 }))
572 }
573
574 async fn create_table(
575 &self,
576 request: Request<CreateTableRequest>,
577 ) -> Result<Response<CreateTableResponse>, Status> {
578 let request = request.into_inner();
579 let job_type = request.get_job_type().unwrap_or_default();
580 let source = request.source;
581 let mview = request.materialized_view.unwrap();
582 let fragment_graph = request.fragment_graph.unwrap();
583
584 let stream_job = StreamingJob::Table(source, mview, job_type);
585 let version = self
586 .ddl_controller
587 .run_command(DdlCommand::CreateStreamingJob(
588 stream_job,
589 fragment_graph,
590 CreateType::Foreground,
591 None,
592 HashSet::new(), None,
594 ))
595 .await?;
596
597 Ok(Response::new(CreateTableResponse {
598 status: None,
599 version,
600 }))
601 }
602
603 async fn drop_table(
604 &self,
605 request: Request<DropTableRequest>,
606 ) -> Result<Response<DropTableResponse>, Status> {
607 let request = request.into_inner();
608 let source_id = request.source_id;
609 let table_id = request.table_id;
610
611 let drop_mode = DropMode::from_request_setting(request.cascade);
612 let version = self
613 .ddl_controller
614 .run_command(DdlCommand::DropStreamingJob(
615 StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id as _), table_id as _),
616 drop_mode,
617 None,
618 ))
619 .await?;
620
621 Ok(Response::new(DropTableResponse {
622 status: None,
623 version,
624 }))
625 }
626
627 async fn create_view(
628 &self,
629 request: Request<CreateViewRequest>,
630 ) -> Result<Response<CreateViewResponse>, Status> {
631 let req = request.into_inner();
632 let view = req.get_view()?.clone();
633
634 let version = self
635 .ddl_controller
636 .run_command(DdlCommand::CreateView(view))
637 .await?;
638
639 Ok(Response::new(CreateViewResponse {
640 status: None,
641 version,
642 }))
643 }
644
645 async fn drop_view(
646 &self,
647 request: Request<DropViewRequest>,
648 ) -> Result<Response<DropViewResponse>, Status> {
649 let request = request.into_inner();
650 let view_id = request.get_view_id();
651 let drop_mode = DropMode::from_request_setting(request.cascade);
652 let version = self
653 .ddl_controller
654 .run_command(DdlCommand::DropView(view_id as _, drop_mode))
655 .await?;
656 Ok(Response::new(DropViewResponse {
657 status: None,
658 version,
659 }))
660 }
661
662 async fn risectl_list_state_tables(
663 &self,
664 _request: Request<RisectlListStateTablesRequest>,
665 ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
666 let tables = self
667 .metadata_manager
668 .catalog_controller
669 .list_all_state_tables()
670 .await?;
671 Ok(Response::new(RisectlListStateTablesResponse { tables }))
672 }
673
674 async fn replace_job_plan(
675 &self,
676 request: Request<ReplaceJobPlanRequest>,
677 ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
678 let req = request.into_inner().get_plan().cloned()?;
679
680 let version = self
681 .ddl_controller
682 .run_command(DdlCommand::ReplaceStreamJob(
683 Self::extract_replace_table_info(req),
684 ))
685 .await?;
686
687 Ok(Response::new(ReplaceJobPlanResponse {
688 status: None,
689 version,
690 }))
691 }
692
693 async fn get_table(
694 &self,
695 request: Request<GetTableRequest>,
696 ) -> Result<Response<GetTableResponse>, Status> {
697 let req = request.into_inner();
698 let table = self
699 .metadata_manager
700 .catalog_controller
701 .get_table_by_name(&req.database_name, &req.table_name)
702 .await?;
703
704 Ok(Response::new(GetTableResponse { table }))
705 }
706
707 async fn alter_name(
708 &self,
709 request: Request<AlterNameRequest>,
710 ) -> Result<Response<AlterNameResponse>, Status> {
711 let AlterNameRequest { object, new_name } = request.into_inner();
712 let version = self
713 .ddl_controller
714 .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
715 .await?;
716 Ok(Response::new(AlterNameResponse {
717 status: None,
718 version,
719 }))
720 }
721
722 async fn alter_source(
724 &self,
725 request: Request<AlterSourceRequest>,
726 ) -> Result<Response<AlterSourceResponse>, Status> {
727 let AlterSourceRequest { source } = request.into_inner();
728 let version = self
729 .ddl_controller
730 .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
731 .await?;
732 Ok(Response::new(AlterSourceResponse {
733 status: None,
734 version,
735 }))
736 }
737
738 async fn alter_owner(
739 &self,
740 request: Request<AlterOwnerRequest>,
741 ) -> Result<Response<AlterOwnerResponse>, Status> {
742 let AlterOwnerRequest { object, owner_id } = request.into_inner();
743 let version = self
744 .ddl_controller
745 .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
746 .await?;
747 Ok(Response::new(AlterOwnerResponse {
748 status: None,
749 version,
750 }))
751 }
752
753 async fn alter_set_schema(
754 &self,
755 request: Request<AlterSetSchemaRequest>,
756 ) -> Result<Response<AlterSetSchemaResponse>, Status> {
757 let AlterSetSchemaRequest {
758 object,
759 new_schema_id,
760 } = request.into_inner();
761 let version = self
762 .ddl_controller
763 .run_command(DdlCommand::AlterSetSchema(
764 object.unwrap(),
765 new_schema_id as _,
766 ))
767 .await?;
768 Ok(Response::new(AlterSetSchemaResponse {
769 status: None,
770 version,
771 }))
772 }
773
774 async fn get_ddl_progress(
775 &self,
776 _request: Request<GetDdlProgressRequest>,
777 ) -> Result<Response<GetDdlProgressResponse>, Status> {
778 Ok(Response::new(GetDdlProgressResponse {
779 ddl_progress: self.ddl_controller.get_ddl_progress().await?,
780 }))
781 }
782
783 async fn create_connection(
784 &self,
785 request: Request<CreateConnectionRequest>,
786 ) -> Result<Response<CreateConnectionResponse>, Status> {
787 let req = request.into_inner();
788 if req.payload.is_none() {
789 return Err(Status::invalid_argument("request is empty"));
790 }
791
792 match req.payload.unwrap() {
793 create_connection_request::Payload::PrivateLink(_) => {
794 panic!("Private Link Connection has been deprecated")
795 }
796 create_connection_request::Payload::ConnectionParams(params) => {
797 let pb_connection = Connection {
798 id: 0,
799 schema_id: req.schema_id,
800 database_id: req.database_id,
801 name: req.name,
802 info: Some(ConnectionInfo::ConnectionParams(params)),
803 owner: req.owner_id,
804 };
805 let version = self
806 .ddl_controller
807 .run_command(DdlCommand::CreateConnection(pb_connection))
808 .await?;
809 Ok(Response::new(CreateConnectionResponse { version }))
810 }
811 }
812 }
813
814 async fn list_connections(
815 &self,
816 _request: Request<ListConnectionsRequest>,
817 ) -> Result<Response<ListConnectionsResponse>, Status> {
818 let conns = self
819 .metadata_manager
820 .catalog_controller
821 .list_connections()
822 .await?;
823
824 Ok(Response::new(ListConnectionsResponse {
825 connections: conns,
826 }))
827 }
828
829 async fn drop_connection(
830 &self,
831 request: Request<DropConnectionRequest>,
832 ) -> Result<Response<DropConnectionResponse>, Status> {
833 let req = request.into_inner();
834
835 let version = self
836 .ddl_controller
837 .run_command(DdlCommand::DropConnection(req.connection_id as _))
838 .await?;
839
840 Ok(Response::new(DropConnectionResponse {
841 status: None,
842 version,
843 }))
844 }
845
846 async fn comment_on(
847 &self,
848 request: Request<CommentOnRequest>,
849 ) -> Result<Response<CommentOnResponse>, Status> {
850 let req = request.into_inner();
851 let comment = req.get_comment()?.clone();
852
853 let version = self
854 .ddl_controller
855 .run_command(DdlCommand::CommentOn(Comment {
856 table_id: comment.table_id,
857 schema_id: comment.schema_id,
858 database_id: comment.database_id,
859 column_index: comment.column_index,
860 description: comment.description,
861 }))
862 .await?;
863
864 Ok(Response::new(CommentOnResponse {
865 status: None,
866 version,
867 }))
868 }
869
870 #[cfg_attr(coverage, coverage(off))]
871 async fn get_tables(
872 &self,
873 request: Request<GetTablesRequest>,
874 ) -> Result<Response<GetTablesResponse>, Status> {
875 let GetTablesRequest {
876 table_ids,
877 include_dropped_tables,
878 } = request.into_inner();
879 let ret = self
880 .metadata_manager
881 .catalog_controller
882 .get_table_by_ids(
883 table_ids.into_iter().map(|id| id as _).collect(),
884 include_dropped_tables,
885 )
886 .await?;
887
888 let mut tables = HashMap::default();
889 for table in ret {
890 tables.insert(table.id, table);
891 }
892 Ok(Response::new(GetTablesResponse { tables }))
893 }
894
895 async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
896 self.ddl_controller.wait().await?;
897 Ok(Response::new(WaitResponse {}))
898 }
899
900 async fn alter_parallelism(
901 &self,
902 request: Request<AlterParallelismRequest>,
903 ) -> Result<Response<AlterParallelismResponse>, Status> {
904 let req = request.into_inner();
905
906 let job_id = req.get_table_id();
907 let parallelism = *req.get_parallelism()?;
908 let deferred = req.get_deferred();
909 self.ddl_controller
910 .reschedule_streaming_job(
911 job_id,
912 JobRescheduleTarget {
913 parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
914 resource_group: JobResourceGroupTarget::Keep,
915 },
916 deferred,
917 )
918 .await?;
919
920 Ok(Response::new(AlterParallelismResponse {}))
921 }
922
923 async fn auto_schema_change(
926 &self,
927 request: Request<AutoSchemaChangeRequest>,
928 ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
929 let req = request.into_inner();
930
931 let workers = self
933 .metadata_manager
934 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
935 .await?;
936 let worker = workers
937 .choose(&mut thread_rng())
938 .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
939
940 let client = self
941 .env
942 .frontend_client_pool()
943 .get(worker)
944 .await
945 .map_err(MetaError::from)?;
946
947 let Some(schema_change) = req.schema_change else {
948 return Err(Status::invalid_argument(
949 "schema change message is required",
950 ));
951 };
952
953 for table_change in schema_change.table_changes {
954 for c in &table_change.columns {
955 let c = ColumnCatalog::from(c.clone());
956
957 let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
958 tracing::warn!(target: "auto_schema_change",
959 cdc_table_id = table_change.cdc_table_id,
960 upstraem_ddl = table_change.upstream_ddl,
961 "invalid column type from cdc table change");
962 Err(Status::invalid_argument(format!(
963 "invalid column type: {} from cdc table change, column: {:?}",
964 column_type, c
965 )))
966 };
967 if c.is_generated() {
968 return invalid_col_type("generated column", &c);
969 }
970 if c.is_rw_sys_column() {
971 return invalid_col_type("rw system column", &c);
972 }
973 if c.is_hidden {
974 return invalid_col_type("hidden column", &c);
975 }
976 }
977
978 let tables: Vec<Table> = self
980 .metadata_manager
981 .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
982 .await?;
983
984 for table in tables {
985 let original_columns: HashSet<(String, DataType)> =
988 HashSet::from_iter(table.columns.iter().filter_map(|col| {
989 let col = ColumnCatalog::from(col.clone());
990 let data_type = col.data_type().clone();
991 if col.is_generated() {
992 None
993 } else {
994 Some((col.column_desc.name, data_type))
995 }
996 }));
997 let new_columns: HashSet<(String, DataType)> =
998 HashSet::from_iter(table_change.columns.iter().map(|col| {
999 let col = ColumnCatalog::from(col.clone());
1000 let data_type = col.data_type().clone();
1001 (col.column_desc.name, data_type)
1002 }));
1003
1004 if !(original_columns.is_subset(&new_columns)
1005 || original_columns.is_superset(&new_columns))
1006 {
1007 tracing::warn!(target: "auto_schema_change",
1008 table_id = table.id,
1009 cdc_table_id = table.cdc_table_id,
1010 upstraem_ddl = table_change.upstream_ddl,
1011 original_columns = ?original_columns,
1012 new_columns = ?new_columns,
1013 "New columns should be a subset or superset of the original columns, since only `ADD COLUMN` and `DROP COLUMN` is supported");
1014 return Err(Status::invalid_argument(
1015 "New columns should be a subset or superset of the original columns",
1016 ));
1017 }
1018 if original_columns == new_columns {
1020 tracing::warn!(target: "auto_schema_change",
1021 table_id = table.id,
1022 cdc_table_id = table.cdc_table_id,
1023 upstraem_ddl = table_change.upstream_ddl,
1024 original_columns = ?original_columns,
1025 new_columns = ?new_columns,
1026 "No change to columns, skipping the schema change");
1027 continue;
1028 }
1029
1030 let latency_timer = self
1031 .meta_metrics
1032 .auto_schema_change_latency
1033 .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1034 .start_timer();
1035 let resp = client
1038 .get_table_replace_plan(GetTableReplacePlanRequest {
1039 database_id: table.database_id,
1040 owner: table.owner,
1041 table_name: table.name.clone(),
1042 table_change: Some(table_change.clone()),
1043 })
1044 .await;
1045
1046 match resp {
1047 Ok(resp) => {
1048 let resp = resp.into_inner();
1049 if let Some(plan) = resp.replace_plan {
1050 let plan = Self::extract_replace_table_info(plan);
1051 plan.streaming_job.table().inspect(|t| {
1052 tracing::info!(
1053 target: "auto_schema_change",
1054 table_id = t.id,
1055 cdc_table_id = t.cdc_table_id,
1056 upstraem_ddl = table_change.upstream_ddl,
1057 "Start the replace config change")
1058 });
1059 let replace_res = self
1061 .ddl_controller
1062 .run_command(DdlCommand::ReplaceStreamJob(plan))
1063 .await;
1064
1065 match replace_res {
1066 Ok(_) => {
1067 tracing::info!(
1068 target: "auto_schema_change",
1069 table_id = table.id,
1070 cdc_table_id = table.cdc_table_id,
1071 "Table replaced success");
1072
1073 self.meta_metrics
1074 .auto_schema_change_success_cnt
1075 .with_guarded_label_values(&[
1076 &table.id.to_string(),
1077 &table.name,
1078 ])
1079 .inc();
1080 latency_timer.observe_duration();
1081 }
1082 Err(e) => {
1083 tracing::error!(
1084 target: "auto_schema_change",
1085 error = %e.as_report(),
1086 table_id = table.id,
1087 cdc_table_id = table.cdc_table_id,
1088 upstraem_ddl = table_change.upstream_ddl,
1089 "failed to replace the table",
1090 );
1091 add_auto_schema_change_fail_event_log(
1092 &self.meta_metrics,
1093 table.id,
1094 table.name.clone(),
1095 table_change.cdc_table_id.clone(),
1096 table_change.upstream_ddl.clone(),
1097 &self.env.event_log_manager_ref(),
1098 );
1099 }
1100 };
1101 }
1102 }
1103 Err(e) => {
1104 tracing::error!(
1105 target: "auto_schema_change",
1106 error = %e.as_report(),
1107 table_id = table.id,
1108 cdc_table_id = table.cdc_table_id,
1109 "failed to get replace table plan",
1110 );
1111 add_auto_schema_change_fail_event_log(
1112 &self.meta_metrics,
1113 table.id,
1114 table.name.clone(),
1115 table_change.cdc_table_id.clone(),
1116 table_change.upstream_ddl.clone(),
1117 &self.env.event_log_manager_ref(),
1118 );
1119 }
1120 };
1121 }
1122 }
1123
1124 Ok(Response::new(AutoSchemaChangeResponse {}))
1125 }
1126
1127 async fn alter_swap_rename(
1128 &self,
1129 request: Request<AlterSwapRenameRequest>,
1130 ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1131 let req = request.into_inner();
1132
1133 let version = self
1134 .ddl_controller
1135 .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1136 .await?;
1137
1138 Ok(Response::new(AlterSwapRenameResponse {
1139 status: None,
1140 version,
1141 }))
1142 }
1143
1144 async fn alter_resource_group(
1145 &self,
1146 request: Request<AlterResourceGroupRequest>,
1147 ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1148 let req = request.into_inner();
1149
1150 let table_id = req.get_table_id();
1151 let deferred = req.get_deferred();
1152 let resource_group = req.resource_group;
1153
1154 self.ddl_controller
1155 .reschedule_streaming_job(
1156 table_id,
1157 JobRescheduleTarget {
1158 parallelism: JobParallelismTarget::Refresh,
1159 resource_group: JobResourceGroupTarget::Update(resource_group),
1160 },
1161 deferred,
1162 )
1163 .await?;
1164
1165 Ok(Response::new(AlterResourceGroupResponse {}))
1166 }
1167}
1168
1169fn add_auto_schema_change_fail_event_log(
1170 meta_metrics: &Arc<MetaMetrics>,
1171 table_id: u32,
1172 table_name: String,
1173 cdc_table_id: String,
1174 upstream_ddl: String,
1175 event_log_manager: &EventLogManagerRef,
1176) {
1177 meta_metrics
1178 .auto_schema_change_failure_cnt
1179 .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1180 .inc();
1181 let event = event_log::EventAutoSchemaChangeFail {
1182 table_id,
1183 table_name,
1184 cdc_table_id,
1185 upstream_ddl,
1186 };
1187 event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1188}