1use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::{ObjectId, TableId};
26use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
27use risingwave_common::types::DataType;
28use risingwave_common::util::stream_graph_visitor;
29use risingwave_connector::sink::catalog::SinkId;
30use risingwave_connector::sink::iceberg::ENABLE_PK_INDEX;
31use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
32use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
33use risingwave_meta::model::TableParallelism as ModelTableParallelism;
34use risingwave_meta::rpc::metrics::MetaMetrics;
35use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
36use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
37use risingwave_meta_model::StreamingParallelism;
38use risingwave_pb::catalog::connection::Info as ConnectionInfo;
39use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
40use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
41use risingwave_pb::common::WorkerType;
42use risingwave_pb::common::worker_node::State;
43use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
44use risingwave_pb::ddl_service::ddl_service_server::DdlService;
45use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
46use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
47use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
48use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
49use risingwave_pb::meta::event_log;
50use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
51use risingwave_pb::stream_plan::stream_node::NodeBody;
52use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
53use thiserror_ext::AsReport;
54use tokio::sync::oneshot::Sender;
55use tokio::task::JoinHandle;
56use tonic::{Request, Response, Status};
57
58use crate::MetaError;
59use crate::barrier::BarrierManagerRef;
60use crate::manager::sink_coordination::SinkCoordinatorManager;
61use crate::manager::{MetaSrvEnv, StreamingJob};
62use crate::rpc::ddl_controller::{
63 DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
64};
65use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
66
/// gRPC implementation of the meta node's `DdlService`.
///
/// Most RPCs translate the protobuf request into a [`DdlCommand`] and forward it to the
/// [`DdlController`]; a few read catalog state directly through the [`MetadataManager`]
/// or schedule barrier commands.
#[derive(Clone)]
pub struct DdlServiceImpl {
    /// Meta server environment (idle-activity tracking, frontend client pool, ...).
    env: MetaSrvEnv,

    /// Catalog/metadata access used by the read-only catalog RPCs and id lookups.
    metadata_manager: MetadataManager,
    /// Used to stop sink coordinators after a sink has been dropped.
    sink_manager: SinkCoordinatorManager,
    /// Executes every DDL command issued by this service.
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    /// Used to clear iceberg maintenance state for dropped sinks.
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    /// Schedules barrier commands directly (e.g. resuming backfill from risectl).
    barrier_scheduler: BarrierScheduler,
}
78
79impl DdlServiceImpl {
80 pub async fn new(
81 env: MetaSrvEnv,
82 metadata_manager: MetadataManager,
83 stream_manager: GlobalStreamManagerRef,
84 source_manager: SourceManagerRef,
85 barrier_manager: BarrierManagerRef,
86 sink_manager: SinkCoordinatorManager,
87 meta_metrics: Arc<MetaMetrics>,
88 iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
89 barrier_scheduler: BarrierScheduler,
90 ) -> Self {
91 let ddl_controller = DdlController::new(
92 env.clone(),
93 metadata_manager.clone(),
94 stream_manager,
95 source_manager,
96 barrier_manager,
97 sink_manager.clone(),
98 iceberg_compaction_manager.clone(),
99 )
100 .await;
101 Self {
102 env,
103 metadata_manager,
104 sink_manager,
105 ddl_controller,
106 meta_metrics,
107 iceberg_compaction_manager,
108 barrier_scheduler,
109 }
110 }
111
112 fn extract_replace_table_info(
113 ReplaceJobPlan {
114 fragment_graph,
115 replace_job,
116 }: ReplaceJobPlan,
117 ) -> ReplaceStreamJobInfo {
118 let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
119 replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
120 table,
121 source,
122 job_type,
123 }) => StreamingJob::Table(
124 source,
125 table.unwrap(),
126 TableJobType::try_from(job_type).unwrap(),
127 ),
128 replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
129 StreamingJob::Source(source.unwrap())
130 }
131 replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
132 table,
133 }) => StreamingJob::MaterializedView(table.unwrap()),
134 };
135
136 ReplaceStreamJobInfo {
137 streaming_job: replace_streaming_job,
138 fragment_graph: fragment_graph.unwrap(),
139 }
140 }
141
142 fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
143 streaming_job_resource_type::ResourceType::Regular(true)
144 }
145
146 pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
147 tracing::info!("start migrate legacy table fragments task");
148 let env = self.env.clone();
149 let metadata_manager = self.metadata_manager.clone();
150 let ddl_controller = self.ddl_controller.clone();
151
152 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
153 let join_handle = tokio::spawn(async move {
154 async fn migrate_inner(
155 env: &MetaSrvEnv,
156 metadata_manager: &MetadataManager,
157 ddl_controller: &DdlController,
158 ) -> MetaResult<()> {
159 let tables = metadata_manager
160 .catalog_controller
161 .list_unmigrated_tables()
162 .await?;
163
164 if tables.is_empty() {
165 tracing::info!("no legacy table fragments need migration");
166 return Ok(());
167 }
168
169 let client = {
170 let workers = metadata_manager
171 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
172 .await?;
173 if workers.is_empty() {
174 return Err(anyhow::anyhow!("no active frontend nodes found").into());
175 }
176 let worker = workers.choose(&mut thread_rng()).unwrap();
177 env.frontend_client_pool().get(worker).await?
178 };
179
180 for table in tables {
181 let start = tokio::time::Instant::now();
182 let req = GetTableReplacePlanRequest {
183 database_id: table.database_id,
184 table_id: table.id,
185 cdc_table_change: None,
186 };
187 let resp = client
188 .get_table_replace_plan(req)
189 .await
190 .context("failed to get table replace plan from frontend")?;
191
192 let plan = resp.into_inner().replace_plan.unwrap();
193 let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
194 ddl_controller
195 .run_command(DdlCommand::ReplaceStreamJob(replace_info))
196 .await?;
197 tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
198 }
199 tracing::info!("successfully migrated all legacy table fragments");
200
201 Ok(())
202 }
203
204 let migrate_future = async move {
205 let mut attempt = 0;
206 loop {
207 match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
208 Ok(_) => break,
209 Err(e) => {
210 attempt += 1;
211 tracing::error!(
212 "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
213 e.as_report(),
214 attempt
215 );
216 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
217 }
218 }
219 }
220 };
221
222 select(pin!(migrate_future), shutdown_rx).await;
223 });
224
225 (join_handle, shutdown_tx)
226 }
227}
228
229#[async_trait::async_trait]
230impl DdlService for DdlServiceImpl {
231 async fn create_database(
232 &self,
233 request: Request<CreateDatabaseRequest>,
234 ) -> Result<Response<CreateDatabaseResponse>, Status> {
235 let req = request.into_inner();
236 let database = req.get_db()?.clone();
237 let version = self
238 .ddl_controller
239 .run_command(DdlCommand::CreateDatabase(database))
240 .await?;
241
242 Ok(Response::new(CreateDatabaseResponse {
243 status: None,
244 version,
245 }))
246 }
247
248 async fn drop_database(
249 &self,
250 request: Request<DropDatabaseRequest>,
251 ) -> Result<Response<DropDatabaseResponse>, Status> {
252 let req = request.into_inner();
253 let database_id = req.get_database_id();
254
255 let version = self
256 .ddl_controller
257 .run_command(DdlCommand::DropDatabase(database_id))
258 .await?;
259
260 Ok(Response::new(DropDatabaseResponse {
261 status: None,
262 version,
263 }))
264 }
265
266 async fn create_secret(
267 &self,
268 request: Request<CreateSecretRequest>,
269 ) -> Result<Response<CreateSecretResponse>, Status> {
270 let req = request.into_inner();
271 let pb_secret = Secret {
272 id: 0.into(),
273 name: req.get_name().clone(),
274 database_id: req.get_database_id(),
275 value: req.get_value().clone(),
276 owner: req.get_owner_id(),
277 schema_id: req.get_schema_id(),
278 };
279 let version = self
280 .ddl_controller
281 .run_command(DdlCommand::CreateSecret(pb_secret))
282 .await?;
283
284 Ok(Response::new(CreateSecretResponse { version }))
285 }
286
287 async fn drop_secret(
288 &self,
289 request: Request<DropSecretRequest>,
290 ) -> Result<Response<DropSecretResponse>, Status> {
291 let req = request.into_inner();
292 let secret_id = req.get_secret_id();
293 let drop_mode = DropMode::from_request_setting(req.cascade);
294 let version = self
295 .ddl_controller
296 .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
297 .await?;
298
299 Ok(Response::new(DropSecretResponse {
300 status: None,
301 version,
302 }))
303 }
304
305 async fn alter_secret(
306 &self,
307 request: Request<AlterSecretRequest>,
308 ) -> Result<Response<AlterSecretResponse>, Status> {
309 let req = request.into_inner();
310 let pb_secret = Secret {
311 id: req.get_secret_id(),
312 name: req.get_name().clone(),
313 database_id: req.get_database_id(),
314 value: req.get_value().clone(),
315 owner: req.get_owner_id(),
316 schema_id: req.get_schema_id(),
317 };
318 let version = self
319 .ddl_controller
320 .run_command(DdlCommand::AlterSecret(pb_secret))
321 .await?;
322
323 Ok(Response::new(AlterSecretResponse { version }))
324 }
325
326 async fn create_schema(
327 &self,
328 request: Request<CreateSchemaRequest>,
329 ) -> Result<Response<CreateSchemaResponse>, Status> {
330 let req = request.into_inner();
331 let schema = req.get_schema()?.clone();
332 let version = self
333 .ddl_controller
334 .run_command(DdlCommand::CreateSchema(schema))
335 .await?;
336
337 Ok(Response::new(CreateSchemaResponse {
338 status: None,
339 version,
340 }))
341 }
342
343 async fn drop_schema(
344 &self,
345 request: Request<DropSchemaRequest>,
346 ) -> Result<Response<DropSchemaResponse>, Status> {
347 let req = request.into_inner();
348 let schema_id = req.get_schema_id();
349 let drop_mode = DropMode::from_request_setting(req.cascade);
350 let version = self
351 .ddl_controller
352 .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
353 .await?;
354 Ok(Response::new(DropSchemaResponse {
355 status: None,
356 version,
357 }))
358 }
359
360 async fn create_source(
361 &self,
362 request: Request<CreateSourceRequest>,
363 ) -> Result<Response<CreateSourceResponse>, Status> {
364 let req = request.into_inner();
365 let source = req.get_source()?.clone();
366
367 match req.fragment_graph {
368 None => {
369 let version = self
370 .ddl_controller
371 .run_command(DdlCommand::CreateNonSharedSource(source))
372 .await?;
373 Ok(Response::new(CreateSourceResponse {
374 status: None,
375 version,
376 }))
377 }
378 Some(fragment_graph) => {
379 let stream_job = StreamingJob::Source(source);
381 let version = self
382 .ddl_controller
383 .run_command(DdlCommand::CreateStreamingJob {
384 stream_job,
385 fragment_graph,
386 dependencies: HashSet::new(),
387 resource_type: Self::default_streaming_job_resource_type(),
388 if_not_exists: req.if_not_exists,
389 refresh_interval_sec: None,
390 })
391 .await?;
392 Ok(Response::new(CreateSourceResponse {
393 status: None,
394 version,
395 }))
396 }
397 }
398 }
399
400 async fn drop_source(
401 &self,
402 request: Request<DropSourceRequest>,
403 ) -> Result<Response<DropSourceResponse>, Status> {
404 let request = request.into_inner();
405 let source_id = request.source_id;
406 let drop_mode = DropMode::from_request_setting(request.cascade);
407 let version = self
408 .ddl_controller
409 .run_command(DdlCommand::DropSource(source_id, drop_mode))
410 .await?;
411
412 Ok(Response::new(DropSourceResponse {
413 status: None,
414 version,
415 }))
416 }
417
418 async fn reset_source(
419 &self,
420 request: Request<ResetSourceRequest>,
421 ) -> Result<Response<ResetSourceResponse>, Status> {
422 let request = request.into_inner();
423 let source_id = request.source_id;
424
425 tracing::info!(
426 source_id = %source_id,
427 "Received RESET SOURCE request, routing to DDL controller"
428 );
429
430 let version = self
432 .ddl_controller
433 .run_command(DdlCommand::ResetSource(source_id))
434 .await?;
435
436 Ok(Response::new(ResetSourceResponse {
437 status: None,
438 version,
439 }))
440 }
441
442 async fn create_sink(
443 &self,
444 request: Request<CreateSinkRequest>,
445 ) -> Result<Response<CreateSinkResponse>, Status> {
446 self.env.idle_manager().record_activity();
447
448 let req = request.into_inner();
449
450 let sink = req.get_sink()?.clone();
451 let fragment_graph = req.get_fragment_graph()?.clone();
452 let dependencies = req.get_dependencies().iter().copied().collect();
453 let resource_type = req
454 .resource_type
455 .and_then(|resource_type| resource_type.resource_type)
456 .unwrap_or_else(Self::default_streaming_job_resource_type);
457
458 let stream_job = StreamingJob::Sink(sink);
459
460 let command = DdlCommand::CreateStreamingJob {
461 stream_job,
462 fragment_graph,
463 dependencies,
464 resource_type,
465 if_not_exists: req.if_not_exists,
466 refresh_interval_sec: None,
467 };
468
469 let version = self.ddl_controller.run_command(command).await?;
470
471 Ok(Response::new(CreateSinkResponse {
472 status: None,
473 version,
474 }))
475 }
476
477 async fn drop_sink(
478 &self,
479 request: Request<DropSinkRequest>,
480 ) -> Result<Response<DropSinkResponse>, Status> {
481 let request = request.into_inner();
482 let sink_id = request.sink_id;
483 let drop_mode = DropMode::from_request_setting(request.cascade);
484
485 let command = DdlCommand::DropStreamingJob {
486 job_id: StreamingJobId::Sink(sink_id),
487 drop_mode,
488 };
489
490 let version = self.ddl_controller.run_command(command).await?;
491
492 self.sink_manager
493 .stop_sink_coordinator(vec![SinkId::from(sink_id)])
494 .await;
495 self.iceberg_compaction_manager
496 .clear_iceberg_maintenance_by_sink_id(SinkId::from(sink_id));
497
498 Ok(Response::new(DropSinkResponse {
499 status: None,
500 version,
501 }))
502 }
503
504 async fn create_subscription(
505 &self,
506 request: Request<CreateSubscriptionRequest>,
507 ) -> Result<Response<CreateSubscriptionResponse>, Status> {
508 self.env.idle_manager().record_activity();
509
510 let req = request.into_inner();
511
512 let subscription = req.get_subscription()?.clone();
513 let command = DdlCommand::CreateSubscription(subscription);
514
515 let version = self.ddl_controller.run_command(command).await?;
516
517 Ok(Response::new(CreateSubscriptionResponse {
518 status: None,
519 version,
520 }))
521 }
522
523 async fn drop_subscription(
524 &self,
525 request: Request<DropSubscriptionRequest>,
526 ) -> Result<Response<DropSubscriptionResponse>, Status> {
527 let request = request.into_inner();
528 let subscription_id = request.subscription_id;
529 let drop_mode = DropMode::from_request_setting(request.cascade);
530
531 let command = DdlCommand::DropSubscription(subscription_id, drop_mode);
532
533 let version = self.ddl_controller.run_command(command).await?;
534
535 Ok(Response::new(DropSubscriptionResponse {
536 status: None,
537 version,
538 }))
539 }
540
541 async fn create_materialized_view(
542 &self,
543 request: Request<CreateMaterializedViewRequest>,
544 ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
545 self.env.idle_manager().record_activity();
546
547 let req = request.into_inner();
548 let mview = req.get_materialized_view()?.clone();
549 let fragment_graph = req.get_fragment_graph()?.clone();
550 let dependencies = req.get_dependencies().iter().copied().collect();
551 let resource_type = req.resource_type.unwrap().resource_type.unwrap();
552
553 let stream_job = StreamingJob::MaterializedView(mview);
554 let version = self
555 .ddl_controller
556 .run_command(DdlCommand::CreateStreamingJob {
557 stream_job,
558 fragment_graph,
559 dependencies,
560 resource_type,
561 if_not_exists: req.if_not_exists,
562 refresh_interval_sec: req.refresh_interval_sec,
563 })
564 .await?;
565
566 Ok(Response::new(CreateMaterializedViewResponse {
567 status: None,
568 version,
569 }))
570 }
571
572 async fn drop_materialized_view(
573 &self,
574 request: Request<DropMaterializedViewRequest>,
575 ) -> Result<Response<DropMaterializedViewResponse>, Status> {
576 self.env.idle_manager().record_activity();
577
578 let request = request.into_inner();
579 let table_id = request.table_id;
580 let drop_mode = DropMode::from_request_setting(request.cascade);
581
582 let version = self
583 .ddl_controller
584 .run_command(DdlCommand::DropStreamingJob {
585 job_id: StreamingJobId::MaterializedView(table_id),
586 drop_mode,
587 })
588 .await?;
589
590 Ok(Response::new(DropMaterializedViewResponse {
591 status: None,
592 version,
593 }))
594 }
595
596 async fn create_index(
597 &self,
598 request: Request<CreateIndexRequest>,
599 ) -> Result<Response<CreateIndexResponse>, Status> {
600 self.env.idle_manager().record_activity();
601
602 let req = request.into_inner();
603 let index = req.get_index()?.clone();
604 let index_table = req.get_index_table()?.clone();
605 let fragment_graph = req.get_fragment_graph()?.clone();
606 let resource_type = req
607 .resource_type
608 .and_then(|resource_type| resource_type.resource_type)
609 .unwrap_or_else(Self::default_streaming_job_resource_type);
610
611 let stream_job = StreamingJob::Index(index, index_table);
612 let version = self
613 .ddl_controller
614 .run_command(DdlCommand::CreateStreamingJob {
615 stream_job,
616 fragment_graph,
617 dependencies: HashSet::new(),
618 resource_type,
619 if_not_exists: req.if_not_exists,
620 refresh_interval_sec: None,
621 })
622 .await?;
623
624 Ok(Response::new(CreateIndexResponse {
625 status: None,
626 version,
627 }))
628 }
629
630 async fn drop_index(
631 &self,
632 request: Request<DropIndexRequest>,
633 ) -> Result<Response<DropIndexResponse>, Status> {
634 self.env.idle_manager().record_activity();
635
636 let request = request.into_inner();
637 let index_id = request.index_id;
638 let drop_mode = DropMode::from_request_setting(request.cascade);
639 let version = self
640 .ddl_controller
641 .run_command(DdlCommand::DropStreamingJob {
642 job_id: StreamingJobId::Index(index_id),
643 drop_mode,
644 })
645 .await?;
646
647 Ok(Response::new(DropIndexResponse {
648 status: None,
649 version,
650 }))
651 }
652
653 async fn create_function(
654 &self,
655 request: Request<CreateFunctionRequest>,
656 ) -> Result<Response<CreateFunctionResponse>, Status> {
657 let req = request.into_inner();
658 let function = req.get_function()?.clone();
659
660 let version = self
661 .ddl_controller
662 .run_command(DdlCommand::CreateFunction(function))
663 .await?;
664
665 Ok(Response::new(CreateFunctionResponse {
666 status: None,
667 version,
668 }))
669 }
670
671 async fn drop_function(
672 &self,
673 request: Request<DropFunctionRequest>,
674 ) -> Result<Response<DropFunctionResponse>, Status> {
675 let request = request.into_inner();
676
677 let version = self
678 .ddl_controller
679 .run_command(DdlCommand::DropFunction(
680 request.function_id,
681 DropMode::from_request_setting(request.cascade),
682 ))
683 .await?;
684
685 Ok(Response::new(DropFunctionResponse {
686 status: None,
687 version,
688 }))
689 }
690
691 async fn create_table(
692 &self,
693 request: Request<CreateTableRequest>,
694 ) -> Result<Response<CreateTableResponse>, Status> {
695 let request = request.into_inner();
696 let job_type = request.get_job_type().unwrap_or_default();
697 let dependencies = request.get_dependencies().iter().copied().collect();
698 let source = request.source;
699 let mview = request.materialized_view.unwrap();
700 let fragment_graph = request.fragment_graph.unwrap();
701
702 let stream_job = StreamingJob::Table(source, mview, job_type);
703 let version = self
704 .ddl_controller
705 .run_command(DdlCommand::CreateStreamingJob {
706 stream_job,
707 fragment_graph,
708 dependencies,
709 resource_type: Self::default_streaming_job_resource_type(),
710 if_not_exists: request.if_not_exists,
711 refresh_interval_sec: None,
712 })
713 .await?;
714
715 Ok(Response::new(CreateTableResponse {
716 status: None,
717 version,
718 }))
719 }
720
721 async fn drop_table(
722 &self,
723 request: Request<DropTableRequest>,
724 ) -> Result<Response<DropTableResponse>, Status> {
725 let request = request.into_inner();
726 let source_id = request.source_id;
727 let table_id = request.table_id;
728
729 let drop_mode = DropMode::from_request_setting(request.cascade);
730 let version = self
731 .ddl_controller
732 .run_command(DdlCommand::DropStreamingJob {
733 job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
734 drop_mode,
735 })
736 .await?;
737
738 Ok(Response::new(DropTableResponse {
739 status: None,
740 version,
741 }))
742 }
743
744 async fn create_view(
745 &self,
746 request: Request<CreateViewRequest>,
747 ) -> Result<Response<CreateViewResponse>, Status> {
748 let req = request.into_inner();
749 let view = req.get_view()?.clone();
750 let dependencies = req
751 .get_dependencies()
752 .iter()
753 .copied()
754 .collect::<HashSet<_>>();
755
756 let version = self
757 .ddl_controller
758 .run_command(DdlCommand::CreateView(view, dependencies))
759 .await?;
760
761 Ok(Response::new(CreateViewResponse {
762 status: None,
763 version,
764 }))
765 }
766
767 async fn drop_view(
768 &self,
769 request: Request<DropViewRequest>,
770 ) -> Result<Response<DropViewResponse>, Status> {
771 let request = request.into_inner();
772 let view_id = request.get_view_id();
773 let drop_mode = DropMode::from_request_setting(request.cascade);
774 let version = self
775 .ddl_controller
776 .run_command(DdlCommand::DropView(view_id, drop_mode))
777 .await?;
778 Ok(Response::new(DropViewResponse {
779 status: None,
780 version,
781 }))
782 }
783
784 async fn risectl_list_state_tables(
785 &self,
786 _request: Request<RisectlListStateTablesRequest>,
787 ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
788 let tables = self
789 .metadata_manager
790 .catalog_controller
791 .list_all_state_tables()
792 .await?;
793 Ok(Response::new(RisectlListStateTablesResponse { tables }))
794 }
795
796 async fn risectl_resume_backfill(
797 &self,
798 request: Request<RisectlResumeBackfillRequest>,
799 ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
800 let request = request.into_inner();
801 let target = request
802 .target
803 .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;
804
805 match target {
806 risectl_resume_backfill_request::Target::JobId(job_id) => {
807 let database_id = self
808 .metadata_manager
809 .catalog_controller
810 .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
811 .await?;
812 self.barrier_scheduler
813 .run_command(
814 database_id,
815 Command::ResumeBackfill {
816 target: ResumeBackfillTarget::Job(job_id),
817 },
818 )
819 .await?;
820 }
821 risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
822 let mut job_ids = self
823 .metadata_manager
824 .catalog_controller
825 .get_fragment_job_id(vec![fragment_id])
826 .await?;
827 let job_id = job_ids
828 .pop()
829 .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
830 let database_id = self
831 .metadata_manager
832 .catalog_controller
833 .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
834 .await?;
835 self.barrier_scheduler
836 .run_command(
837 database_id,
838 Command::ResumeBackfill {
839 target: ResumeBackfillTarget::Fragment(fragment_id),
840 },
841 )
842 .await?;
843 }
844 }
845
846 Ok(Response::new(RisectlResumeBackfillResponse {}))
847 }
848
849 async fn replace_job_plan(
850 &self,
851 request: Request<ReplaceJobPlanRequest>,
852 ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
853 let req = request.into_inner().get_plan().cloned()?;
854
855 let version = self
856 .ddl_controller
857 .run_command(DdlCommand::ReplaceStreamJob(
858 Self::extract_replace_table_info(req),
859 ))
860 .await?;
861
862 Ok(Response::new(ReplaceJobPlanResponse {
863 status: None,
864 version,
865 }))
866 }
867
868 async fn get_table(
869 &self,
870 request: Request<GetTableRequest>,
871 ) -> Result<Response<GetTableResponse>, Status> {
872 let req = request.into_inner();
873 let table = self
874 .metadata_manager
875 .catalog_controller
876 .get_table_by_name(&req.database_name, &req.table_name)
877 .await?;
878
879 Ok(Response::new(GetTableResponse { table }))
880 }
881
882 async fn alter_name(
883 &self,
884 request: Request<AlterNameRequest>,
885 ) -> Result<Response<AlterNameResponse>, Status> {
886 let AlterNameRequest { object, new_name } = request.into_inner();
887 let version = self
888 .ddl_controller
889 .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
890 .await?;
891 Ok(Response::new(AlterNameResponse {
892 status: None,
893 version,
894 }))
895 }
896
897 async fn alter_source(
899 &self,
900 request: Request<AlterSourceRequest>,
901 ) -> Result<Response<AlterSourceResponse>, Status> {
902 let AlterSourceRequest { source } = request.into_inner();
903 let version = self
904 .ddl_controller
905 .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
906 .await?;
907 Ok(Response::new(AlterSourceResponse {
908 status: None,
909 version,
910 }))
911 }
912
913 async fn alter_owner(
914 &self,
915 request: Request<AlterOwnerRequest>,
916 ) -> Result<Response<AlterOwnerResponse>, Status> {
917 let AlterOwnerRequest { object, owner_id } = request.into_inner();
918 let version = self
919 .ddl_controller
920 .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
921 .await?;
922 Ok(Response::new(AlterOwnerResponse {
923 status: None,
924 version,
925 }))
926 }
927
928 async fn alter_subscription_retention(
929 &self,
930 request: Request<AlterSubscriptionRetentionRequest>,
931 ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
932 let AlterSubscriptionRetentionRequest {
933 subscription_id,
934 retention_seconds,
935 definition,
936 } = request.into_inner();
937 let version = self
938 .ddl_controller
939 .run_command(DdlCommand::AlterSubscriptionRetention {
940 subscription_id,
941 retention_seconds,
942 definition,
943 })
944 .await?;
945 Ok(Response::new(AlterSubscriptionRetentionResponse {
946 status: None,
947 version,
948 }))
949 }
950
951 async fn alter_set_schema(
952 &self,
953 request: Request<AlterSetSchemaRequest>,
954 ) -> Result<Response<AlterSetSchemaResponse>, Status> {
955 let AlterSetSchemaRequest {
956 object,
957 new_schema_id,
958 } = request.into_inner();
959 let version = self
960 .ddl_controller
961 .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
962 .await?;
963 Ok(Response::new(AlterSetSchemaResponse {
964 status: None,
965 version,
966 }))
967 }
968
969 async fn get_ddl_progress(
970 &self,
971 _request: Request<GetDdlProgressRequest>,
972 ) -> Result<Response<GetDdlProgressResponse>, Status> {
973 Ok(Response::new(GetDdlProgressResponse {
974 ddl_progress: self.ddl_controller.get_ddl_progress().await?,
975 }))
976 }
977
978 async fn create_connection(
979 &self,
980 request: Request<CreateConnectionRequest>,
981 ) -> Result<Response<CreateConnectionResponse>, Status> {
982 let req = request.into_inner();
983 if req.payload.is_none() {
984 return Err(Status::invalid_argument("request is empty"));
985 }
986
987 match req.payload.unwrap() {
988 #[expect(deprecated)]
989 create_connection_request::Payload::PrivateLink(_) => {
990 panic!("Private Link Connection has been deprecated")
991 }
992 create_connection_request::Payload::ConnectionParams(params) => {
993 let pb_connection = Connection {
994 id: 0.into(),
995 schema_id: req.schema_id,
996 database_id: req.database_id,
997 name: req.name,
998 info: Some(ConnectionInfo::ConnectionParams(params)),
999 owner: req.owner_id,
1000 };
1001 let version = self
1002 .ddl_controller
1003 .run_command(DdlCommand::CreateConnection(pb_connection))
1004 .await?;
1005 Ok(Response::new(CreateConnectionResponse { version }))
1006 }
1007 }
1008 }
1009
1010 async fn list_connections(
1011 &self,
1012 _request: Request<ListConnectionsRequest>,
1013 ) -> Result<Response<ListConnectionsResponse>, Status> {
1014 let conns = self
1015 .metadata_manager
1016 .catalog_controller
1017 .list_connections()
1018 .await?;
1019
1020 Ok(Response::new(ListConnectionsResponse {
1021 connections: conns,
1022 }))
1023 }
1024
1025 async fn drop_connection(
1026 &self,
1027 request: Request<DropConnectionRequest>,
1028 ) -> Result<Response<DropConnectionResponse>, Status> {
1029 let req = request.into_inner();
1030 let drop_mode = DropMode::from_request_setting(req.cascade);
1031
1032 let version = self
1033 .ddl_controller
1034 .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
1035 .await?;
1036
1037 Ok(Response::new(DropConnectionResponse {
1038 status: None,
1039 version,
1040 }))
1041 }
1042
1043 async fn comment_on(
1044 &self,
1045 request: Request<CommentOnRequest>,
1046 ) -> Result<Response<CommentOnResponse>, Status> {
1047 let req = request.into_inner();
1048 let comment = req.get_comment()?.clone();
1049
1050 let version = self
1051 .ddl_controller
1052 .run_command(DdlCommand::CommentOn(Comment {
1053 table_id: comment.table_id,
1054 schema_id: comment.schema_id,
1055 database_id: comment.database_id,
1056 column_index: comment.column_index,
1057 description: comment.description,
1058 }))
1059 .await?;
1060
1061 Ok(Response::new(CommentOnResponse {
1062 status: None,
1063 version,
1064 }))
1065 }
1066
1067 async fn get_tables(
1068 &self,
1069 request: Request<GetTablesRequest>,
1070 ) -> Result<Response<GetTablesResponse>, Status> {
1071 let GetTablesRequest {
1072 table_ids,
1073 include_dropped_tables,
1074 } = request.into_inner();
1075 let ret = self
1076 .metadata_manager
1077 .catalog_controller
1078 .get_table_by_ids(table_ids, include_dropped_tables)
1079 .await?;
1080
1081 let mut tables = HashMap::default();
1082 for table in ret {
1083 tables.insert(table.id, table);
1084 }
1085 Ok(Response::new(GetTablesResponse { tables }))
1086 }
1087
1088 async fn wait(&self, request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
1089 let req = request.into_inner();
1090 let version = self.ddl_controller.wait(req.job_id).await?;
1091 Ok(Response::new(WaitResponse {
1092 version: Some(version),
1093 }))
1094 }
1095
1096 async fn alter_cdc_table_backfill_parallelism(
1097 &self,
1098 request: Request<AlterCdcTableBackfillParallelismRequest>,
1099 ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
1100 let req = request.into_inner();
1101 let job_id = req.get_table_id();
1102 let parallelism = *req.get_parallelism()?;
1103
1104 let table_parallelism = ModelTableParallelism::from(parallelism);
1105 let streaming_parallelism = match table_parallelism {
1106 ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
1107 _ => bail_invalid_parameter!(
1108 "CDC table backfill parallelism must be set to a fixed value"
1109 ),
1110 };
1111
1112 self.ddl_controller
1113 .reschedule_cdc_table_backfill(
1114 job_id,
1115 ReschedulePolicy::Parallelism(ParallelismPolicy {
1116 parallelism: streaming_parallelism,
1117 adaptive_parallelism_strategy: None,
1118 }),
1119 )
1120 .await?;
1121 Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
1122 }
1123
1124 async fn alter_parallelism(
1125 &self,
1126 request: Request<AlterParallelismRequest>,
1127 ) -> Result<Response<AlterParallelismResponse>, Status> {
1128 let req = request.into_inner();
1129
1130 let job_id = req.get_table_id();
1131 let parallelism = *req.get_parallelism()?;
1132 let deferred = req.get_deferred();
1133
1134 let adaptive_parallelism_strategy = req.adaptive_parallelism_strategy;
1135 let (parallelism, adaptive_parallelism_strategy) = match parallelism.get_parallelism()? {
1136 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1137 (StreamingParallelism::Fixed(*parallelism as _), None)
1138 }
1139 Parallelism::Auto(_) | Parallelism::Adaptive(_) => (
1140 StreamingParallelism::Adaptive,
1141 Some(adaptive_parallelism_strategy.unwrap_or_else(|| "AUTO".to_owned())),
1142 ),
1143 Parallelism::Custom(_) => (
1144 StreamingParallelism::Adaptive,
1145 Some(adaptive_parallelism_strategy.ok_or_else(|| {
1146 Status::invalid_argument(
1147 "adaptive_parallelism_strategy is required for custom parallelism",
1148 )
1149 })?),
1150 ),
1151 };
1152
1153 if let Some(strategy) = adaptive_parallelism_strategy.as_deref() {
1154 parse_strategy(strategy).map_err(|e| {
1155 Status::invalid_argument(format!(
1156 "invalid adaptive parallelism strategy: {}",
1157 e.as_report()
1158 ))
1159 })?;
1160 };
1161
1162 self.ddl_controller
1163 .reschedule_streaming_job(
1164 job_id,
1165 ReschedulePolicy::Parallelism(ParallelismPolicy {
1166 parallelism,
1167 adaptive_parallelism_strategy,
1168 }),
1169 deferred,
1170 )
1171 .await?;
1172
1173 Ok(Response::new(AlterParallelismResponse {}))
1174 }
1175
1176 async fn alter_backfill_parallelism(
1177 &self,
1178 request: Request<AlterBackfillParallelismRequest>,
1179 ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
1180 let req = request.into_inner();
1181
1182 let job_id = req.get_table_id();
1183 let deferred = req.get_deferred();
1184 let adaptive_parallelism_strategy = req.adaptive_parallelism_strategy;
1185
1186 let parallelism = match req.parallelism {
1187 None => None,
1188 Some(parallelism) => {
1189 let (parallelism, adaptive_parallelism_strategy) = match parallelism
1190 .get_parallelism()?
1191 {
1192 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1193 (StreamingParallelism::Fixed(*parallelism as _), None)
1194 }
1195 Parallelism::Auto(_) | Parallelism::Adaptive(_) => (
1196 StreamingParallelism::Adaptive,
1197 Some(adaptive_parallelism_strategy.unwrap_or_else(|| "AUTO".to_owned())),
1198 ),
1199 Parallelism::Custom(_) => (
1200 StreamingParallelism::Adaptive,
1201 Some(adaptive_parallelism_strategy.ok_or_else(|| {
1202 Status::invalid_argument(
1203 "adaptive_parallelism_strategy is required for custom parallelism",
1204 )
1205 })?),
1206 ),
1207 };
1208
1209 if let Some(strategy) = adaptive_parallelism_strategy.as_deref() {
1210 parse_strategy(strategy).map_err(|e| {
1211 Status::invalid_argument(format!(
1212 "invalid adaptive parallelism strategy: {}",
1213 e.as_report()
1214 ))
1215 })?;
1216 };
1217
1218 Some(ParallelismPolicy {
1219 parallelism,
1220 adaptive_parallelism_strategy,
1221 })
1222 }
1223 };
1224
1225 self.ddl_controller
1226 .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
1227 .await?;
1228
1229 Ok(Response::new(AlterBackfillParallelismResponse {}))
1230 }
1231
1232 async fn alter_fragment_parallelism(
1233 &self,
1234 request: Request<AlterFragmentParallelismRequest>,
1235 ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1236 let req = request.into_inner();
1237
1238 let fragment_ids = req.fragment_ids;
1239 if fragment_ids.is_empty() {
1240 return Err(Status::invalid_argument(
1241 "at least one fragment id must be provided",
1242 ));
1243 }
1244
1245 let parallelism = match req.parallelism {
1246 Some(parallelism) => {
1247 let streaming_parallelism = match parallelism.get_parallelism()? {
1248 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1249 StreamingParallelism::Fixed(*parallelism as _)
1250 }
1251 Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1252 StreamingParallelism::Adaptive
1253 }
1254 _ => bail_unavailable!(),
1255 };
1256 Some(streaming_parallelism)
1257 }
1258 None => None,
1259 };
1260
1261 let fragment_targets = fragment_ids
1262 .into_iter()
1263 .map(|fragment_id| (fragment_id, parallelism.clone()))
1264 .collect();
1265
1266 self.ddl_controller
1267 .reschedule_fragments(fragment_targets)
1268 .await?;
1269
1270 Ok(Response::new(AlterFragmentParallelismResponse {}))
1271 }
1272
1273 async fn alter_streaming_job_config(
1274 &self,
1275 request: Request<AlterStreamingJobConfigRequest>,
1276 ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
1277 let AlterStreamingJobConfigRequest {
1278 job_id,
1279 entries_to_add,
1280 keys_to_remove,
1281 } = request.into_inner();
1282
1283 self.ddl_controller
1284 .run_command(DdlCommand::AlterStreamingJobConfig(
1285 job_id,
1286 entries_to_add,
1287 keys_to_remove,
1288 ))
1289 .await?;
1290
1291 Ok(Response::new(AlterStreamingJobConfigResponse {}))
1292 }
1293
1294 async fn auto_schema_change(
1297 &self,
1298 request: Request<AutoSchemaChangeRequest>,
1299 ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
1300 let req = request.into_inner();
1301
1302 let workers = self
1304 .metadata_manager
1305 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
1306 .await?;
1307 let worker = workers
1308 .choose(&mut thread_rng())
1309 .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
1310
1311 let client = self
1312 .env
1313 .frontend_client_pool()
1314 .get(worker)
1315 .await
1316 .map_err(MetaError::from)?;
1317
1318 let Some(schema_change) = req.schema_change else {
1319 return Err(Status::invalid_argument(
1320 "schema change message is required",
1321 ));
1322 };
1323
1324 for table_change in schema_change.table_changes {
1325 for c in &table_change.columns {
1326 let c = ColumnCatalog::from(c.clone());
1327
1328 let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
1329 tracing::warn!(target: "auto_schema_change",
1330 cdc_table_id = table_change.cdc_table_id,
1331 upstraem_ddl = table_change.upstream_ddl,
1332 "invalid column type from cdc table change");
1333 Err(Status::invalid_argument(format!(
1334 "invalid column type: {} from cdc table change, column: {:?}",
1335 column_type, c
1336 )))
1337 };
1338 if c.is_generated() {
1339 return invalid_col_type("generated column", &c);
1340 }
1341 if c.is_rw_sys_column() {
1342 return invalid_col_type("rw system column", &c);
1343 }
1344 if c.is_hidden {
1345 return invalid_col_type("hidden column", &c);
1346 }
1347 }
1348
1349 let tables: Vec<Table> = self
1351 .metadata_manager
1352 .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
1353 .await?;
1354
1355 for table in tables {
1356 let original_columns: HashSet<(String, DataType)> =
1359 HashSet::from_iter(table.columns.iter().filter_map(|col| {
1360 let col = ColumnCatalog::from(col.clone());
1361 if col.is_generated() || col.is_hidden() {
1362 None
1363 } else {
1364 Some((col.column_desc.name.clone(), col.data_type().clone()))
1365 }
1366 }));
1367
1368 let mut new_columns: HashSet<(String, DataType)> =
1369 HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1370 let col = ColumnCatalog::from(col.clone());
1371 if col.is_generated() || col.is_hidden() {
1372 None
1373 } else {
1374 Some((col.column_desc.name.clone(), col.data_type().clone()))
1375 }
1376 }));
1377
1378 for col in &table.columns {
1381 let col = ColumnCatalog::from(col.clone());
1382 if col.is_connector_additional_column()
1383 && !col.is_hidden()
1384 && !col.is_generated()
1385 {
1386 new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1387 }
1388 }
1389
1390 if !(original_columns.is_subset(&new_columns)
1391 || original_columns.is_superset(&new_columns))
1392 {
1393 tracing::warn!(target: "auto_schema_change",
1394 table_id = %table.id,
1395 cdc_table_id = table.cdc_table_id,
1396 upstraem_ddl = table_change.upstream_ddl,
1397 original_columns = ?original_columns,
1398 new_columns = ?new_columns,
1399 "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1400
1401 let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1402 add_auto_schema_change_fail_event_log(
1403 &self.meta_metrics,
1404 table.id,
1405 table.name.clone(),
1406 table_change.cdc_table_id.clone(),
1407 table_change.upstream_ddl.clone(),
1408 &self.env.event_log_manager_ref(),
1409 fail_info,
1410 );
1411
1412 return Err(Status::invalid_argument(
1413 "New columns should be a subset or superset of the original columns (including hidden columns)",
1414 ));
1415 }
1416 if original_columns == new_columns {
1418 tracing::warn!(target: "auto_schema_change",
1419 table_id = %table.id,
1420 cdc_table_id = table.cdc_table_id,
1421 upstraem_ddl = table_change.upstream_ddl,
1422 original_columns = ?original_columns,
1423 new_columns = ?new_columns,
1424 "No change to columns, skipping the schema change");
1425 continue;
1426 }
1427
1428 let latency_timer = self
1429 .meta_metrics
1430 .auto_schema_change_latency
1431 .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1432 .start_timer();
1433 let resp = client
1436 .get_table_replace_plan(GetTableReplacePlanRequest {
1437 database_id: table.database_id,
1438 table_id: table.id,
1439 cdc_table_change: Some(table_change.clone()),
1440 })
1441 .await;
1442
1443 match resp {
1444 Ok(resp) => {
1445 let resp = resp.into_inner();
1446 if let Some(plan) = resp.replace_plan {
1447 let plan = Self::extract_replace_table_info(plan);
1448 plan.streaming_job.table().inspect(|t| {
1449 tracing::info!(
1450 target: "auto_schema_change",
1451 table_id = %t.id,
1452 cdc_table_id = t.cdc_table_id,
1453 upstraem_ddl = table_change.upstream_ddl,
1454 "Start the replace config change")
1455 });
1456 let replace_res = self
1458 .ddl_controller
1459 .run_command(DdlCommand::ReplaceStreamJob(plan))
1460 .await;
1461
1462 match replace_res {
1463 Ok(_) => {
1464 tracing::info!(
1465 target: "auto_schema_change",
1466 table_id = %table.id,
1467 cdc_table_id = table.cdc_table_id,
1468 "Table replaced success");
1469
1470 self.meta_metrics
1471 .auto_schema_change_success_cnt
1472 .with_guarded_label_values(&[
1473 &table.id.to_string(),
1474 &table.name,
1475 ])
1476 .inc();
1477 latency_timer.observe_duration();
1478 }
1479 Err(e) => {
1480 tracing::error!(
1481 target: "auto_schema_change",
1482 error = %e.as_report(),
1483 table_id = %table.id,
1484 cdc_table_id = table.cdc_table_id,
1485 upstraem_ddl = table_change.upstream_ddl,
1486 "failed to replace the table",
1487 );
1488 let fail_info =
1489 format!("failed to replace the table: {}", e.as_report());
1490 add_auto_schema_change_fail_event_log(
1491 &self.meta_metrics,
1492 table.id,
1493 table.name.clone(),
1494 table_change.cdc_table_id.clone(),
1495 table_change.upstream_ddl.clone(),
1496 &self.env.event_log_manager_ref(),
1497 fail_info,
1498 );
1499 }
1500 };
1501 }
1502 }
1503 Err(e) => {
1504 tracing::error!(
1505 target: "auto_schema_change",
1506 error = %e.as_report(),
1507 table_id = %table.id,
1508 cdc_table_id = table.cdc_table_id,
1509 "failed to get replace table plan",
1510 );
1511 let fail_info =
1512 format!("failed to get replace table plan: {}", e.as_report());
1513 add_auto_schema_change_fail_event_log(
1514 &self.meta_metrics,
1515 table.id,
1516 table.name.clone(),
1517 table_change.cdc_table_id.clone(),
1518 table_change.upstream_ddl.clone(),
1519 &self.env.event_log_manager_ref(),
1520 fail_info,
1521 );
1522 }
1523 };
1524 }
1525 }
1526
1527 Ok(Response::new(AutoSchemaChangeResponse {}))
1528 }
1529
1530 async fn alter_swap_rename(
1531 &self,
1532 request: Request<AlterSwapRenameRequest>,
1533 ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1534 let req = request.into_inner();
1535
1536 let version = self
1537 .ddl_controller
1538 .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1539 .await?;
1540
1541 Ok(Response::new(AlterSwapRenameResponse {
1542 status: None,
1543 version,
1544 }))
1545 }
1546
1547 async fn alter_resource_group(
1548 &self,
1549 request: Request<AlterResourceGroupRequest>,
1550 ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1551 let req = request.into_inner();
1552
1553 let job_id = req.get_job_id();
1554 let deferred = req.get_deferred();
1555 let resource_group = req.resource_group;
1556
1557 self.ddl_controller
1558 .reschedule_streaming_job(
1559 job_id,
1560 ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1561 deferred,
1562 )
1563 .await?;
1564
1565 Ok(Response::new(AlterResourceGroupResponse {}))
1566 }
1567
1568 async fn alter_database_resource_group(
1569 &self,
1570 request: Request<AlterDatabaseResourceGroupRequest>,
1571 ) -> Result<Response<AlterDatabaseResourceGroupResponse>, Status> {
1572 let req = request.into_inner();
1573
1574 let version = self
1575 .ddl_controller
1576 .run_command(DdlCommand::AlterDatabaseResourceGroup(
1577 req.database_id,
1578 req.resource_group,
1579 req.deferred,
1580 ))
1581 .await?;
1582
1583 Ok(Response::new(AlterDatabaseResourceGroupResponse {
1584 status: None,
1585 version,
1586 }))
1587 }
1588
1589 async fn alter_database_param(
1590 &self,
1591 request: Request<AlterDatabaseParamRequest>,
1592 ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1593 let req = request.into_inner();
1594 let database_id = req.database_id;
1595
1596 let param = match req.param.unwrap() {
1597 alter_database_param_request::Param::BarrierIntervalMs(value) => {
1598 AlterDatabaseParam::BarrierIntervalMs(value.value)
1599 }
1600 alter_database_param_request::Param::CheckpointFrequency(value) => {
1601 AlterDatabaseParam::CheckpointFrequency(value.value)
1602 }
1603 };
1604 let version = self
1605 .ddl_controller
1606 .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
1607 .await?;
1608
1609 return Ok(Response::new(AlterDatabaseParamResponse {
1610 status: None,
1611 version,
1612 }));
1613 }
1614
1615 async fn compact_iceberg_table(
1616 &self,
1617 request: Request<CompactIcebergTableRequest>,
1618 ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1619 let req = request.into_inner();
1620 let sink_id = req.sink_id;
1621
1622 let task_id = self
1624 .iceberg_compaction_manager
1625 .trigger_manual_compaction(sink_id)
1626 .await
1627 .map_err(|e| {
1628 Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1629 })?;
1630
1631 Ok(Response::new(CompactIcebergTableResponse {
1632 status: None,
1633 task_id,
1634 }))
1635 }
1636
1637 async fn expire_iceberg_table_snapshots(
1638 &self,
1639 request: Request<ExpireIcebergTableSnapshotsRequest>,
1640 ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1641 let req = request.into_inner();
1642 let sink_id = req.sink_id;
1643
1644 self.iceberg_compaction_manager
1646 .check_and_expire_snapshots(sink_id)
1647 .await
1648 .map_err(|e| {
1649 Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1650 })?;
1651
1652 Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1653 status: None,
1654 }))
1655 }
1656
1657 async fn create_iceberg_table(
1658 &self,
1659 request: Request<CreateIcebergTableRequest>,
1660 ) -> Result<Response<CreateIcebergTableResponse>, Status> {
1661 let req = request.into_inner();
1662 let CreateIcebergTableRequest {
1663 table_info,
1664 sink_info,
1665 iceberg_source,
1666 if_not_exists,
1667 } = req;
1668
1669 let PbTableJobInfo {
1671 source,
1672 table,
1673 fragment_graph,
1674 job_type,
1675 } = table_info.unwrap();
1676 let mut table = table.unwrap();
1677 let mut fragment_graph = fragment_graph.unwrap();
1678 let database_id = table.get_database_id();
1679 let schema_id = table.get_schema_id();
1680 let table_name = table.get_name().to_owned();
1681
1682 table.create_type = PbCreateType::Background as _;
1684
1685 let source_rate_limit = if let Some(source) = &source {
1687 for fragment in fragment_graph.fragments.values_mut() {
1688 stream_graph_visitor::visit_fragment_mut(fragment, |node| {
1689 if let NodeBody::Source(source_node) = node
1690 && let Some(inner) = &mut source_node.source_inner
1691 {
1692 inner.rate_limit = Some(0);
1693 }
1694 });
1695 }
1696 Some(source.rate_limit)
1697 } else {
1698 None
1699 };
1700
1701 let stream_job =
1702 StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
1703 let _ = self
1704 .ddl_controller
1705 .run_command(DdlCommand::CreateStreamingJob {
1706 stream_job,
1707 fragment_graph,
1708 dependencies: HashSet::new(),
1709 resource_type: Self::default_streaming_job_resource_type(),
1710 if_not_exists,
1711 refresh_interval_sec: None,
1712 })
1713 .await?;
1714
1715 let table_catalog = self
1716 .metadata_manager
1717 .catalog_controller
1718 .get_table_catalog_by_name(database_id, schema_id, &table_name)
1719 .await?
1720 .ok_or(Status::not_found("Internal error: table not found"))?;
1721
1722 let PbSinkJobInfo {
1724 sink,
1725 fragment_graph,
1726 } = sink_info.unwrap();
1727 let mut sink = sink.unwrap();
1728
1729 sink.create_type = PbCreateType::Background as _;
1731
1732 let enable_pk_index = sink
1733 .properties
1734 .get(ENABLE_PK_INDEX)
1735 .is_some_and(|v| v.eq_ignore_ascii_case("true"));
1736 sink.auto_refresh_schema_from_table = if enable_pk_index {
1739 None
1740 } else {
1741 Some(table_catalog.id)
1742 };
1743
1744 let mut fragment_graph = fragment_graph.unwrap();
1745
1746 assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
1747 assert!(
1748 risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
1749 .is_placeholder()
1750 );
1751 fragment_graph.dependent_table_ids[0] = table_catalog.id;
1752 for fragment in fragment_graph.fragments.values_mut() {
1753 stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
1754 NodeBody::StreamScan(scan) => {
1755 scan.table_id = table_catalog.id;
1756 if let Some(table_desc) = &mut scan.table_desc {
1757 assert!(
1758 risingwave_common::catalog::TableId::from(table_desc.table_id)
1759 .is_placeholder()
1760 );
1761 table_desc.table_id = table_catalog.id;
1762 table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1763 }
1764 if let Some(table) = &mut scan.arrangement_table {
1765 assert!(
1766 risingwave_common::catalog::TableId::from(table.id).is_placeholder()
1767 );
1768 *table = table_catalog.clone();
1769 }
1770 }
1771 NodeBody::BatchPlan(plan) => {
1772 if let Some(table_desc) = &mut plan.table_desc {
1773 assert!(
1774 risingwave_common::catalog::TableId::from(table_desc.table_id)
1775 .is_placeholder()
1776 );
1777 table_desc.table_id = table_catalog.id;
1778 table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1779 }
1780 }
1781 _ => {}
1782 });
1783 }
1784
1785 let table_id = table_catalog.id;
1786 let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
1787 let stream_job = StreamingJob::Sink(sink);
1788 let res = self
1789 .ddl_controller
1790 .run_command(DdlCommand::CreateStreamingJob {
1791 stream_job,
1792 fragment_graph,
1793 dependencies,
1794 resource_type: Self::default_streaming_job_resource_type(),
1795 if_not_exists,
1796 refresh_interval_sec: None,
1797 })
1798 .await;
1799
1800 if res.is_err() {
1801 let _ = self
1802 .ddl_controller
1803 .run_command(DdlCommand::DropStreamingJob {
1804 job_id: StreamingJobId::Table(None, table_id),
1805 drop_mode: DropMode::Cascade,
1806 })
1807 .await
1808 .inspect_err(|err| {
1809 tracing::error!(error = %err.as_report(),
1810 "Failed to clean up table after iceberg sink creation failure",
1811 );
1812 });
1813 res?;
1814 }
1815
1816 if let Some(source_rate_limit) = source_rate_limit
1818 && source_rate_limit != Some(0)
1819 {
1820 let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
1821 table_catalog.optional_associated_source_id.unwrap();
1822 let (jobs, fragments) = self
1823 .metadata_manager
1824 .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
1825 .await?;
1826 let throttle_config = ThrottleConfig {
1827 throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
1828 rate_limit: source_rate_limit,
1829 };
1830 let _ = self
1831 .barrier_scheduler
1832 .run_command(
1833 database_id,
1834 Command::Throttle {
1835 jobs,
1836 config: fragments
1837 .into_iter()
1838 .map(|fragment_id| (fragment_id, throttle_config))
1839 .collect(),
1840 },
1841 )
1842 .await?;
1843 }
1844
1845 let iceberg_source = iceberg_source.unwrap();
1847 let res = self
1848 .ddl_controller
1849 .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
1850 .await;
1851 if res.is_err() {
1852 let _ = self
1853 .ddl_controller
1854 .run_command(DdlCommand::DropStreamingJob {
1855 job_id: StreamingJobId::Table(None, table_id),
1856 drop_mode: DropMode::Cascade,
1857 })
1858 .await
1859 .inspect_err(|err| {
1860 tracing::error!(
1861 error = %err.as_report(),
1862 "Failed to clean up table after iceberg source creation failure",
1863 );
1864 });
1865 }
1866
1867 Ok(Response::new(CreateIcebergTableResponse {
1868 status: None,
1869 version: res?,
1870 }))
1871 }
1872}
1873
1874fn add_auto_schema_change_fail_event_log(
1875 meta_metrics: &Arc<MetaMetrics>,
1876 table_id: TableId,
1877 table_name: String,
1878 cdc_table_id: String,
1879 upstream_ddl: String,
1880 event_log_manager: &EventLogManagerRef,
1881 fail_info: String,
1882) {
1883 meta_metrics
1884 .auto_schema_change_failure_cnt
1885 .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1886 .inc();
1887 let event = event_log::EventAutoSchemaChangeFail {
1888 table_id,
1889 table_name,
1890 cdc_table_id,
1891 upstream_ddl,
1892 fail_info,
1893 };
1894 event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1895}