use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::{ObjectId, TableId};
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

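/// gRPC implementation of the meta node's DDL service. Most RPCs are translated
/// into a [`DdlCommand`] and executed through the shared [`DdlController`]; a few
/// are routed directly to the barrier scheduler or the iceberg compaction manager.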
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    barrier_scheduler: BarrierScheduler,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
            iceberg_compaction_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

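    /// Converts a [`ReplaceJobPlan`] protobuf into the internal
    /// [`ReplaceStreamJobInfo`], resolving whether a table, a source, or a
    /// materialized view is being replaced.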
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

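    /// Resource type applied to streaming jobs whose create request does not
    /// carry an explicit one.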
    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
        streaming_job_resource_type::ResourceType::Regular(true)
    }

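    /// Spawns a background task that migrates legacy table fragments: for each
    /// unmigrated table, it asks a frontend node for a replace plan and replays
    /// it through the DDL controller. Returns the join handle and a shutdown sender.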
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("starting legacy table fragments migration task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

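            // Retry the whole migration every 5 seconds until it succeeds; the
            // `select` below races this loop against the shutdown signal.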
            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0.into(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
            .await?;

        Ok(Response::new(DropSecretResponse {
            status: None,
            version,
        }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        resource_type: Self::default_streaming_job_resource_type(),
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn reset_source(
        &self,
        request: Request<ResetSourceRequest>,
    ) -> Result<Response<ResetSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::info!(
            source_id = %source_id,
            "Received RESET SOURCE request, routing to DDL controller"
        );

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ResetSource(source_id))
            .await?;

        Ok(Response::new(ResetSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            resource_type: Self::default_streaming_job_resource_type(),
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();
        let resource_type = req.resource_type.unwrap().resource_type.unwrap();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request.get_dependencies().iter().copied().collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

    async fn risectl_resume_backfill(
        &self,
        request: Request<RisectlResumeBackfillRequest>,
    ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
        let request = request.into_inner();
        let target = request
            .target
            .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;

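        // Resume backfill for a whole job or for a single fragment; either way the
        // command is scheduled on the barrier of the database owning the job.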
        match target {
            risectl_resume_backfill_request::Target::JobId(job_id) => {
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Job(job_id),
                        },
                    )
                    .await?;
            }
            risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
                let mut job_ids = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_job_id(vec![fragment_id])
                    .await?;
                let job_id = job_ids
                    .pop()
                    .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Fragment(fragment_id),
                        },
                    )
                    .await?;
            }
        }

        Ok(Response::new(RisectlResumeBackfillResponse {}))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_subscription_retention(
        &self,
        request: Request<AlterSubscriptionRetentionRequest>,
    ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
        let AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSubscriptionRetention {
                subscription_id,
                retention_seconds,
                definition,
            })
            .await?;
        Ok(Response::new(AlterSubscriptionRetentionResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            #[expect(deprecated)]
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0.into(),
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(table_ids, include_dropped_tables)
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        let version = self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {
            version: Some(version),
        }))
    }

    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

    async fn alter_backfill_parallelism(
        &self,
        request: Request<AlterBackfillParallelismRequest>,
    ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let deferred = req.get_deferred();

        let parallelism = match req.parallelism {
            None => None,
            Some(parallelism) => {
                let parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(parallelism)
            }
        };

        self.ddl_controller
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await?;

        Ok(Response::new(AlterBackfillParallelismResponse {}))
    }

    async fn alter_fragment_parallelism(
        &self,
        request: Request<AlterFragmentParallelismRequest>,
    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
        let req = request.into_inner();

        let fragment_ids = req.fragment_ids;
        if fragment_ids.is_empty() {
            return Err(Status::invalid_argument(
                "at least one fragment id must be provided",
            ));
        }

        let parallelism = match req.parallelism {
            Some(parallelism) => {
                let streaming_parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(streaming_parallelism)
            }
            None => None,
        };

        let fragment_targets = fragment_ids
            .into_iter()
            .map(|fragment_id| (fragment_id, parallelism.clone()))
            .collect();

        self.ddl_controller
            .reschedule_fragments(fragment_targets)
            .await?;

        Ok(Response::new(AlterFragmentParallelismResponse {}))
    }

    async fn alter_streaming_job_config(
        &self,
        request: Request<AlterStreamingJobConfigRequest>,
    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
        let AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        } = request.into_inner();

        self.ddl_controller
            .run_command(DdlCommand::AlterStreamingJobConfig(
                job_id,
                entries_to_add,
                keys_to_remove,
            ))
            .await?;

        Ok(Response::new(AlterStreamingJobConfigResponse {}))
    }

    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

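        // Reject columns that cannot legitimately come from the upstream table:
        // generated columns, RisingWave system columns, and hidden columns.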
        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
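                // Compare the visible (name, type) column sets before and after the
                // change: only `ADD COLUMN` / `DROP COLUMN` are supported, so the new
                // set must be a subset or superset of the original one.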
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                        table_id = %table.id,
                        cdc_table_id = table.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        original_columns = ?original_columns,
                        new_columns = ?new_columns,
                        "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "starting the table replacement for the schema change")
                            });
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "table replaced successfully");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = %table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }

    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.as_job_id(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        self.iceberg_compaction_manager
            .check_and_expire_snapshots(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

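        // Creating an iceberg table is a three-step orchestration: create the
        // backing table job (with its source paused via a zero rate limit), then
        // the iceberg sink on top of it, and finally the iceberg source. A failure
        // in a later step drops the already-created table to avoid partial state.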
        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        table.create_type = PbCreateType::Background as _;

        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await?;

        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or(Status::not_found("Internal error: table not found"))?;

        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        sink.create_type = PbCreateType::Background as _;

        let mut fragment_graph = fragment_graph.unwrap();

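        // The sink's fragment graph was planned against a placeholder table id;
        // rewire it to the freshly created table catalog before submitting the job.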
        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await;

        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

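        // The table's source was created with a rate limit of 0 to pause ingestion
        // during setup; now that the sink exists, restore the user-specified limit.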
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let (jobs, fragments) = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
                .await?;
            let throttle_config = ThrottleConfig {
                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
                rate_limit: source_rate_limit,
            };
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::Throttle {
                        jobs,
                        config: fragments
                            .into_iter()
                            .map(|fragment_id| (fragment_id, throttle_config))
                            .collect(),
                    },
                )
                .await?;
        }

        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

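/// Records a failed auto schema change: bumps the failure counter metric and
/// appends an `AutoSchemaChangeFail` event to the event log.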
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: TableId,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}