use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::TableId;
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::{ObjectId, StreamingParallelism};
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::{Comment, Connection, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::*;
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

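/// gRPC implementation of the meta node's DDL service. Most handlers translate the
/// incoming request into a [`DdlCommand`] and run it through the shared [`DdlController`].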
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
        }
    }

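    /// Converts a [`ReplaceJobPlan`] received from the frontend into the internal
    /// [`ReplaceStreamJobInfo`] consumed by the DDL controller. Panics if mandatory plan
    /// fields are missing (they are expected to be populated by the frontend).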
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

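    /// Spawns a background task that migrates legacy table fragments: it asks a running
    /// frontend for a replace plan per table and replays it as a `ReplaceStreamJob` command.
    /// The task retries every 5 seconds on failure; the returned sender shuts it down.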
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("starting legacy table fragments migration task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_name: table.name.clone(),
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

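// Handlers below follow a common shape: unpack the request, build the corresponding
// `DdlCommand`, run it through the `DdlController`, and wrap the returned catalog version
// into the response.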
#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id as _))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0,
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id as _))
            .await?;

        Ok(Response::new(DropSecretResponse { version }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        specific_resource_group: None,
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id as _, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            specific_resource_group: None,
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id as _),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(SinkId::from(sink_id))
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id as _, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let specific_resource_group = req.specific_resource_group.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(TableId::new(table_id as _)),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id as _),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id as _,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group: None,
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(
                    source_id.map(|PbSourceId::Id(id)| id as _),
                    TableId::new(table_id as _),
                ),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .map(|id| *id as ObjectId)
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id as _, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

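    /// Replaces an existing streaming job (table, source, or materialized view) with a new
    /// plan; see [`Self::extract_replace_table_info`] for how the plan is unpacked.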
    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(
                object.unwrap(),
                new_schema_id as _,
            ))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0,
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(
                req.connection_id as _,
                drop_mode,
            ))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(
                table_ids
                    .into_iter()
                    .map(|id| TableId::new(id as _))
                    .collect(),
                include_dropped_tables,
            )
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {}))
    }

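    /// Alters the backfill parallelism of a CDC table. Only a fixed parallelism is accepted;
    /// anything else is rejected as an invalid parameter.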
    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

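    /// Alters the parallelism of a streaming job by issuing a reschedule, optionally
    /// deferring it as requested by the `deferred` flag.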
    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id.into(),
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

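    /// Handles an auto schema change reported for a CDC source: validates the changed
    /// columns, compares them against each affected table's current columns, asks a frontend
    /// for a replace plan, and replaces the table's streaming job. Failures are recorded as
    /// `AutoSchemaChangeFail` event logs and failure metrics.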
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        owner: table.owner,
                        table_name: table.name.clone(),
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Starting the replace config change")
                            });
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.into(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id as _, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

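    /// Triggers a manual compaction for the Iceberg table written by the given sink and
    /// returns the compaction task id.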
    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);

        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = risingwave_connector::sink::catalog::SinkId::new(req.sink_id);

        self.iceberg_compaction_manager
            .check_and_expire_snapshots(&sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

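    /// Creates an Iceberg table engine table in three steps: the backing table job, the
    /// Iceberg sink wired to that table (with placeholder table ids patched in the sink's
    /// fragment graph), and the Iceberg source. If a later step fails, the already-created
    /// table is dropped in a best-effort cleanup.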
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let table = table.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph: fragment_graph.unwrap(),
                dependencies: HashSet::new(),
                specific_resource_group: None,
                if_not_exists,
            })
            .await?;

        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id as _, schema_id as _, &table_name)
            .await?
            .ok_or(Status::not_found("Internal error: table not found"))?;

        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let sink = sink.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();

        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id as ObjectId;
        let dependencies = HashSet::from_iter([table_id]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                specific_resource_group: None,
                if_not_exists,
            })
            .await;

        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, TableId::new(table_id as _)),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, TableId::new(table_id as _)),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

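/// Bumps the auto schema change failure metric and records an `AutoSchemaChangeFail` event
/// log entry for the given table and upstream DDL.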
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: u32,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}