use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::anyhow;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::types::DataType;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{JobParallelismTarget, JobRescheduleTarget, JobResourceGroupTarget};
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::{Comment, Connection, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::*;
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

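/// gRPC handler for DDL requests from the frontend. Each RPC is translated
/// into a [`DdlCommand`] and executed through the [`DdlController`], which
/// returns the new catalog version on success.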
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
        }
    }

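    /// Converts a `ReplaceJobPlan` proto into the internal [`ReplaceStreamJobInfo`],
    /// dispatching on whether a table, a source, or a materialized view is being
    /// replaced. The proto fields are expected to be populated by the frontend,
    /// hence the `unwrap()`s.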
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }
}

#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id as _))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0,
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id as _))
            .await?;

        Ok(Response::new(DropSecretResponse { version }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

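        // A fragment graph is only attached for shared sources, which run as a
        // streaming job; non-shared sources are plain catalog entries.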
        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        specific_resource_group: None,
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            specific_resource_group: None,
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id as _),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

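        // The streaming job is gone; shut down the corresponding sink
        // coordinator as well.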
        self.sink_manager
            .stop_sink_coordinator(SinkId::from(sink_id))
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let specific_resource_group = req.specific_resource_group.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id as _),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id as _),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id as _,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group: None,
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(
                    source_id.map(|PbSourceId::Id(id)| id as _),
                    table_id as _,
                ),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(
                object.unwrap(),
                new_schema_id as _,
            ))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

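        // `payload` was checked for presence above, so the `unwrap()` is safe.
        // Private Link connections are no longer supported; only connection
        // parameters are accepted.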
        match req.payload.unwrap() {
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0,
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(
                req.connection_id as _,
                drop_mode,
            ))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(
                table_ids.into_iter().map(|id| id as _).collect(),
                include_dropped_tables,
            )
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }

    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                JobRescheduleTarget {
                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
                    resource_group: JobResourceGroupTarget::Keep,
                },
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

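    /// Reschedules a streaming job to a new parallelism, keeping its resource
    /// group unchanged. With `deferred` set, the change is recorded and applied
    /// later instead of immediately.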
    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();
        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                JobRescheduleTarget {
                    parallelism: JobParallelismTarget::Update(TableParallelism::from(parallelism)),
                    resource_group: JobResourceGroupTarget::Keep,
                },
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

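    /// Handles schema-change events reported for CDC tables: validates the
    /// changed columns, asks a randomly chosen running frontend node for a
    /// replace plan for each affected table, and executes that plan through
    /// the DDL controller. Failures are recorded as event-log entries and
    /// metrics.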
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

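        // Reject column types that cannot come from an upstream CDC table:
        // generated columns, RisingWave system columns, and hidden columns.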
        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

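        // Compare the visible (name, type) pairs of each existing table with
        // the columns reported by the upstream change. Only `ADD COLUMN` and
        // `DROP COLUMN` are supported, so one set must contain the other.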
            for table in tables {
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

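                // Columns reported by the upstream change. Connector additional
                // columns from the existing table are merged in below, since
                // they exist only on the RisingWave side and must not make the
                // subset/superset check fail.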
                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns
                            .insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

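                // Ask the chosen frontend to compute a replace plan for the
                // changed table, timing the end-to-end schema change.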
                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_name: table.name.clone(),
                        table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Starting table replacement for the schema change")
                            });
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id,
                JobRescheduleTarget {
                    parallelism: JobParallelismTarget::Refresh,
                    resource_group: JobResourceGroupTarget::Update(resource_group),
                },
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

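    /// Manually triggers a compaction run for the Iceberg table behind the
    /// given sink and returns the id of the spawned compaction task.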
    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = SinkId::new(req.sink_id);

        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = SinkId::new(req.sink_id);

        self.iceberg_compaction_manager
            .check_and_expire_snapshots(&sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }
}

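/// Bumps the auto-schema-change failure counter and records an
/// `EventAutoSchemaChangeFail` entry in the event log so the failure is
/// visible to operators.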
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: u32,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}