use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::{ObjectId, TableId};
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

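/// gRPC implementation of the meta node's DDL service. Most RPCs translate their
/// request into a [`DdlCommand`] and hand it to the wrapped [`DdlController`].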
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    barrier_scheduler: BarrierScheduler,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
            iceberg_compaction_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

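    /// Converts a `ReplaceJobPlan` protobuf into the internal [`ReplaceStreamJobInfo`],
    /// dispatching on whether a table, source, or materialized view is being replaced.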
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
        streaming_job_resource_type::ResourceType::Regular(true)
    }

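    /// Spawns a background task that migrates legacy table fragments: it asks a random
    /// running frontend node for a replace plan for each unmigrated table and replays
    /// the plan through the DDL controller. Returns the join handle together with a
    /// sender that shuts the task down.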
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("starting legacy table fragment migration task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

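            // Retry the whole migration every 5 seconds until it succeeds; the
            // `select` below stops the loop when a shutdown signal arrives.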
            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0.into(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
            .await?;

        Ok(Response::new(DropSecretResponse {
            status: None,
            version,
        }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        resource_type: Self::default_streaming_job_resource_type(),
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn reset_source(
        &self,
        request: Request<ResetSourceRequest>,
    ) -> Result<Response<ResetSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::info!(
            source_id = %source_id,
            "Received RESET SOURCE request, routing to DDL controller"
        );

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ResetSource(source_id))
            .await?;

        Ok(Response::new(ResetSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            resource_type: Self::default_streaming_job_resource_type(),
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();
        let resource_type = req.resource_type.unwrap().resource_type.unwrap();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request.get_dependencies().iter().copied().collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

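    /// Resumes a paused backfill, addressed either by job id or by fragment id, by
    /// resolving the owning database and scheduling a `ResumeBackfill` barrier command.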
    async fn risectl_resume_backfill(
        &self,
        request: Request<RisectlResumeBackfillRequest>,
    ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
        let request = request.into_inner();
        let target = request
            .target
            .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;

        match target {
            risectl_resume_backfill_request::Target::JobId(job_id) => {
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Job(job_id),
                        },
                    )
                    .await?;
            }
            risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
                let mut job_ids = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_job_id(vec![fragment_id])
                    .await?;
                let job_id = job_ids
                    .pop()
                    .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Fragment(fragment_id),
                        },
                    )
                    .await?;
            }
        }

        Ok(Response::new(RisectlResumeBackfillResponse {}))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_subscription_retention(
        &self,
        request: Request<AlterSubscriptionRetentionRequest>,
    ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
        let AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSubscriptionRetention {
                subscription_id,
                retention_seconds,
                definition,
            })
            .await?;
        Ok(Response::new(AlterSubscriptionRetentionResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            #[expect(deprecated)]
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0.into(),
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(table_ids, include_dropped_tables)
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        let req = request.into_inner();
        let version = self.ddl_controller.wait(req.job_id).await?;
        Ok(Response::new(WaitResponse {
            version: Some(version),
        }))
    }

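    /// Alters the backfill parallelism of a CDC table. Only a fixed parallelism is
    /// accepted; anything else is rejected as an invalid parameter.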
    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

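    /// Reschedules a streaming job to the requested parallelism (fixed or adaptive).
    /// With `deferred` set, the change is not applied eagerly.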
    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

    async fn alter_backfill_parallelism(
        &self,
        request: Request<AlterBackfillParallelismRequest>,
    ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let deferred = req.get_deferred();

        let parallelism = match req.parallelism {
            None => None,
            Some(parallelism) => {
                let parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(parallelism)
            }
        };

        self.ddl_controller
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await?;

        Ok(Response::new(AlterBackfillParallelismResponse {}))
    }

    async fn alter_fragment_parallelism(
        &self,
        request: Request<AlterFragmentParallelismRequest>,
    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
        let req = request.into_inner();

        let fragment_ids = req.fragment_ids;
        if fragment_ids.is_empty() {
            return Err(Status::invalid_argument(
                "at least one fragment id must be provided",
            ));
        }

        let parallelism = match req.parallelism {
            Some(parallelism) => {
                let streaming_parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(streaming_parallelism)
            }
            None => None,
        };

        let fragment_targets = fragment_ids
            .into_iter()
            .map(|fragment_id| (fragment_id, parallelism.clone()))
            .collect();

        self.ddl_controller
            .reschedule_fragments(fragment_targets)
            .await?;

        Ok(Response::new(AlterFragmentParallelismResponse {}))
    }

    async fn alter_streaming_job_config(
        &self,
        request: Request<AlterStreamingJobConfigRequest>,
    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
        let AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        } = request.into_inner();

        self.ddl_controller
            .run_command(DdlCommand::AlterStreamingJobConfig(
                job_id,
                entries_to_add,
                keys_to_remove,
            ))
            .await?;

        Ok(Response::new(AlterStreamingJobConfigResponse {}))
    }

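    /// Handles a CDC-driven automatic schema change: validates the changed columns,
    /// asks a frontend node for a table replace plan, and runs the replacement for
    /// every table derived from the changed upstream CDC table. Failures are recorded
    /// in metrics and the event log without aborting the remaining tables.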
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Starting the replace config change")
                            });
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = %table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.as_job_id(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        self.iceberg_compaction_manager
            .check_and_expire_snapshots(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

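    /// Creates an Iceberg table end-to-end: first the backing table job (with its
    /// source paused via a zero rate limit), then the Iceberg sink wired to the new
    /// table, and finally the Iceberg source for reads. If a later step fails, the
    /// table created earlier is dropped as best-effort cleanup.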
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        table.create_type = PbCreateType::Background as _;

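        // Pause ingestion while the rest of the pipeline is set up: force the source's
        // rate limit to 0 in the fragment graph, and remember the user-specified limit
        // so it can be restored once the sink job exists.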
        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await?;

        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or(Status::not_found("Internal error: table not found"))?;

        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        sink.create_type = PbCreateType::Background as _;
        sink.auto_refresh_schema_from_table = Some(table_catalog.id);

        let mut fragment_graph = fragment_graph.unwrap();

        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await;

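        // Best-effort rollback: if creating the sink failed, drop the table created
        // above so a half-finished Iceberg table does not linger.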
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

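        // Restore the user-specified source rate limit (forced to 0 above) via a
        // throttle barrier command, unless the user explicitly requested a limit of 0.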
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let (jobs, fragments) = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
                .await?;
            let throttle_config = ThrottleConfig {
                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
                rate_limit: source_rate_limit,
            };
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::Throttle {
                        jobs,
                        config: fragments
                            .into_iter()
                            .map(|fragment_id| (fragment_id, throttle_config))
                            .collect(),
                    },
                )
                .await?;
        }

        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

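/// Records an auto schema change failure: bumps the failure counter metric and
/// appends an `EventAutoSchemaChangeFail` entry to the event log.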
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: TableId,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}