1use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::TableId;
26use risingwave_common::types::DataType;
27use risingwave_common::util::stream_graph_visitor;
28use risingwave_connector::sink::catalog::SinkId;
29use risingwave_meta::barrier::{BarrierScheduler, Command};
30use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
31use risingwave_meta::model::TableParallelism as ModelTableParallelism;
32use risingwave_meta::rpc::metrics::MetaMetrics;
33use risingwave_meta::stream::{
34 ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy, ThrottleConfig,
35};
36use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
37use risingwave_meta_model::StreamingParallelism;
38use risingwave_pb::catalog::connection::Info as ConnectionInfo;
39use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
40use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
41use risingwave_pb::common::WorkerType;
42use risingwave_pb::common::worker_node::State;
43use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
44use risingwave_pb::ddl_service::ddl_service_server::DdlService;
45use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
46use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
47use risingwave_pb::ddl_service::*;
48use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
49use risingwave_pb::id::SourceId;
50use risingwave_pb::meta::event_log;
51use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
52use risingwave_pb::stream_plan::stream_node::NodeBody;
53use thiserror_ext::AsReport;
54use tokio::sync::oneshot::Sender;
55use tokio::task::JoinHandle;
56use tonic::{Request, Response, Status};
57
58use crate::MetaError;
59use crate::barrier::BarrierManagerRef;
60use crate::manager::sink_coordination::SinkCoordinatorManager;
61use crate::manager::{MetaSrvEnv, StreamingJob};
62use crate::rpc::ddl_controller::{
63 DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
64};
65use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
66
/// gRPC-facing implementation of the DDL service.
///
/// The handlers translate protobuf requests into `DdlCommand`s for the
/// [`DdlController`] or into catalog lookups on the [`MetadataManager`].
#[derive(Clone)]
pub struct DdlServiceImpl {
    /// Meta server environment; the handlers use it to reach the idle manager
    /// and the frontend client pool.
    env: MetaSrvEnv,

    /// Access to catalog and worker-node metadata.
    metadata_manager: MetadataManager,
    /// Sink coordinator manager; a clone is also handed to the DDL controller
    /// at construction time.
    sink_manager: SinkCoordinatorManager,
    /// Executes the DDL commands issued by the handlers.
    ddl_controller: DdlController,
    /// Shared meta metrics handle.
    meta_metrics: Arc<MetaMetrics>,
    /// Handle to the iceberg compaction manager.
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    /// Barrier scheduler handle.
    barrier_scheduler: BarrierScheduler,
}
78
79impl DdlServiceImpl {
80 #[allow(clippy::too_many_arguments)]
81 pub async fn new(
82 env: MetaSrvEnv,
83 metadata_manager: MetadataManager,
84 stream_manager: GlobalStreamManagerRef,
85 source_manager: SourceManagerRef,
86 barrier_manager: BarrierManagerRef,
87 sink_manager: SinkCoordinatorManager,
88 meta_metrics: Arc<MetaMetrics>,
89 iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
90 barrier_scheduler: BarrierScheduler,
91 ) -> Self {
92 let ddl_controller = DdlController::new(
93 env.clone(),
94 metadata_manager.clone(),
95 stream_manager,
96 source_manager,
97 barrier_manager,
98 sink_manager.clone(),
99 )
100 .await;
101 Self {
102 env,
103 metadata_manager,
104 sink_manager,
105 ddl_controller,
106 meta_metrics,
107 iceberg_compaction_manager,
108 barrier_scheduler,
109 }
110 }
111
112 fn extract_replace_table_info(
113 ReplaceJobPlan {
114 fragment_graph,
115 replace_job,
116 }: ReplaceJobPlan,
117 ) -> ReplaceStreamJobInfo {
118 let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
119 replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
120 table,
121 source,
122 job_type,
123 }) => StreamingJob::Table(
124 source,
125 table.unwrap(),
126 TableJobType::try_from(job_type).unwrap(),
127 ),
128 replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
129 StreamingJob::Source(source.unwrap())
130 }
131 replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
132 table,
133 }) => StreamingJob::MaterializedView(table.unwrap()),
134 };
135
136 ReplaceStreamJobInfo {
137 streaming_job: replace_streaming_job,
138 fragment_graph: fragment_graph.unwrap(),
139 }
140 }
141
142 pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
143 tracing::info!("start migrate legacy table fragments task");
144 let env = self.env.clone();
145 let metadata_manager = self.metadata_manager.clone();
146 let ddl_controller = self.ddl_controller.clone();
147
148 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
149 let join_handle = tokio::spawn(async move {
150 async fn migrate_inner(
151 env: &MetaSrvEnv,
152 metadata_manager: &MetadataManager,
153 ddl_controller: &DdlController,
154 ) -> MetaResult<()> {
155 let tables = metadata_manager
156 .catalog_controller
157 .list_unmigrated_tables()
158 .await?;
159
160 if tables.is_empty() {
161 tracing::info!("no legacy table fragments need migration");
162 return Ok(());
163 }
164
165 let client = {
166 let workers = metadata_manager
167 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
168 .await?;
169 if workers.is_empty() {
170 return Err(anyhow::anyhow!("no active frontend nodes found").into());
171 }
172 let worker = workers.choose(&mut thread_rng()).unwrap();
173 env.frontend_client_pool().get(worker).await?
174 };
175
176 for table in tables {
177 let start = tokio::time::Instant::now();
178 let req = GetTableReplacePlanRequest {
179 database_id: table.database_id,
180 owner: table.owner,
181 table_id: table.id,
182 cdc_table_change: None,
183 };
184 let resp = client
185 .get_table_replace_plan(req)
186 .await
187 .context("failed to get table replace plan from frontend")?;
188
189 let plan = resp.into_inner().replace_plan.unwrap();
190 let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
191 ddl_controller
192 .run_command(DdlCommand::ReplaceStreamJob(replace_info))
193 .await?;
194 tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
195 }
196 tracing::info!("successfully migrated all legacy table fragments");
197
198 Ok(())
199 }
200
201 let migrate_future = async move {
202 let mut attempt = 0;
203 loop {
204 match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
205 Ok(_) => break,
206 Err(e) => {
207 attempt += 1;
208 tracing::error!(
209 "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
210 e.as_report(),
211 attempt
212 );
213 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
214 }
215 }
216 }
217 };
218
219 select(pin!(migrate_future), shutdown_rx).await;
220 });
221
222 (join_handle, shutdown_tx)
223 }
224}
225
226#[async_trait::async_trait]
227impl DdlService for DdlServiceImpl {
228 async fn create_database(
229 &self,
230 request: Request<CreateDatabaseRequest>,
231 ) -> Result<Response<CreateDatabaseResponse>, Status> {
232 let req = request.into_inner();
233 let database = req.get_db()?.clone();
234 let version = self
235 .ddl_controller
236 .run_command(DdlCommand::CreateDatabase(database))
237 .await?;
238
239 Ok(Response::new(CreateDatabaseResponse {
240 status: None,
241 version,
242 }))
243 }
244
245 async fn drop_database(
246 &self,
247 request: Request<DropDatabaseRequest>,
248 ) -> Result<Response<DropDatabaseResponse>, Status> {
249 let req = request.into_inner();
250 let database_id = req.get_database_id();
251
252 let version = self
253 .ddl_controller
254 .run_command(DdlCommand::DropDatabase(database_id))
255 .await?;
256
257 Ok(Response::new(DropDatabaseResponse {
258 status: None,
259 version,
260 }))
261 }
262
263 async fn create_secret(
264 &self,
265 request: Request<CreateSecretRequest>,
266 ) -> Result<Response<CreateSecretResponse>, Status> {
267 let req = request.into_inner();
268 let pb_secret = Secret {
269 id: 0.into(),
270 name: req.get_name().clone(),
271 database_id: req.get_database_id(),
272 value: req.get_value().clone(),
273 owner: req.get_owner_id(),
274 schema_id: req.get_schema_id(),
275 };
276 let version = self
277 .ddl_controller
278 .run_command(DdlCommand::CreateSecret(pb_secret))
279 .await?;
280
281 Ok(Response::new(CreateSecretResponse { version }))
282 }
283
284 async fn drop_secret(
285 &self,
286 request: Request<DropSecretRequest>,
287 ) -> Result<Response<DropSecretResponse>, Status> {
288 let req = request.into_inner();
289 let secret_id = req.get_secret_id();
290 let version = self
291 .ddl_controller
292 .run_command(DdlCommand::DropSecret(secret_id))
293 .await?;
294
295 Ok(Response::new(DropSecretResponse { version }))
296 }
297
298 async fn alter_secret(
299 &self,
300 request: Request<AlterSecretRequest>,
301 ) -> Result<Response<AlterSecretResponse>, Status> {
302 let req = request.into_inner();
303 let pb_secret = Secret {
304 id: req.get_secret_id(),
305 name: req.get_name().clone(),
306 database_id: req.get_database_id(),
307 value: req.get_value().clone(),
308 owner: req.get_owner_id(),
309 schema_id: req.get_schema_id(),
310 };
311 let version = self
312 .ddl_controller
313 .run_command(DdlCommand::AlterSecret(pb_secret))
314 .await?;
315
316 Ok(Response::new(AlterSecretResponse { version }))
317 }
318
319 async fn create_schema(
320 &self,
321 request: Request<CreateSchemaRequest>,
322 ) -> Result<Response<CreateSchemaResponse>, Status> {
323 let req = request.into_inner();
324 let schema = req.get_schema()?.clone();
325 let version = self
326 .ddl_controller
327 .run_command(DdlCommand::CreateSchema(schema))
328 .await?;
329
330 Ok(Response::new(CreateSchemaResponse {
331 status: None,
332 version,
333 }))
334 }
335
336 async fn drop_schema(
337 &self,
338 request: Request<DropSchemaRequest>,
339 ) -> Result<Response<DropSchemaResponse>, Status> {
340 let req = request.into_inner();
341 let schema_id = req.get_schema_id();
342 let drop_mode = DropMode::from_request_setting(req.cascade);
343 let version = self
344 .ddl_controller
345 .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
346 .await?;
347 Ok(Response::new(DropSchemaResponse {
348 status: None,
349 version,
350 }))
351 }
352
353 async fn create_source(
354 &self,
355 request: Request<CreateSourceRequest>,
356 ) -> Result<Response<CreateSourceResponse>, Status> {
357 let req = request.into_inner();
358 let source = req.get_source()?.clone();
359
360 match req.fragment_graph {
361 None => {
362 let version = self
363 .ddl_controller
364 .run_command(DdlCommand::CreateNonSharedSource(source))
365 .await?;
366 Ok(Response::new(CreateSourceResponse {
367 status: None,
368 version,
369 }))
370 }
371 Some(fragment_graph) => {
372 let stream_job = StreamingJob::Source(source);
374 let version = self
375 .ddl_controller
376 .run_command(DdlCommand::CreateStreamingJob {
377 stream_job,
378 fragment_graph,
379 dependencies: HashSet::new(),
380 specific_resource_group: None,
381 if_not_exists: req.if_not_exists,
382 })
383 .await?;
384 Ok(Response::new(CreateSourceResponse {
385 status: None,
386 version,
387 }))
388 }
389 }
390 }
391
392 async fn drop_source(
393 &self,
394 request: Request<DropSourceRequest>,
395 ) -> Result<Response<DropSourceResponse>, Status> {
396 let request = request.into_inner();
397 let source_id = request.source_id;
398 let drop_mode = DropMode::from_request_setting(request.cascade);
399 let version = self
400 .ddl_controller
401 .run_command(DdlCommand::DropSource(source_id, drop_mode))
402 .await?;
403
404 Ok(Response::new(DropSourceResponse {
405 status: None,
406 version,
407 }))
408 }
409
410 async fn create_sink(
411 &self,
412 request: Request<CreateSinkRequest>,
413 ) -> Result<Response<CreateSinkResponse>, Status> {
414 self.env.idle_manager().record_activity();
415
416 let req = request.into_inner();
417
418 let sink = req.get_sink()?.clone();
419 let fragment_graph = req.get_fragment_graph()?.clone();
420 let dependencies = req.get_dependencies().iter().copied().collect();
421
422 let stream_job = StreamingJob::Sink(sink);
423
424 let command = DdlCommand::CreateStreamingJob {
425 stream_job,
426 fragment_graph,
427 dependencies,
428 specific_resource_group: None,
429 if_not_exists: req.if_not_exists,
430 };
431
432 let version = self.ddl_controller.run_command(command).await?;
433
434 Ok(Response::new(CreateSinkResponse {
435 status: None,
436 version,
437 }))
438 }
439
440 async fn drop_sink(
441 &self,
442 request: Request<DropSinkRequest>,
443 ) -> Result<Response<DropSinkResponse>, Status> {
444 let request = request.into_inner();
445 let sink_id = request.sink_id;
446 let drop_mode = DropMode::from_request_setting(request.cascade);
447
448 let command = DdlCommand::DropStreamingJob {
449 job_id: StreamingJobId::Sink(sink_id),
450 drop_mode,
451 };
452
453 let version = self.ddl_controller.run_command(command).await?;
454
455 self.sink_manager
456 .stop_sink_coordinator(vec![SinkId::from(sink_id)])
457 .await;
458
459 Ok(Response::new(DropSinkResponse {
460 status: None,
461 version,
462 }))
463 }
464
465 async fn create_subscription(
466 &self,
467 request: Request<CreateSubscriptionRequest>,
468 ) -> Result<Response<CreateSubscriptionResponse>, Status> {
469 self.env.idle_manager().record_activity();
470
471 let req = request.into_inner();
472
473 let subscription = req.get_subscription()?.clone();
474 let command = DdlCommand::CreateSubscription(subscription);
475
476 let version = self.ddl_controller.run_command(command).await?;
477
478 Ok(Response::new(CreateSubscriptionResponse {
479 status: None,
480 version,
481 }))
482 }
483
484 async fn drop_subscription(
485 &self,
486 request: Request<DropSubscriptionRequest>,
487 ) -> Result<Response<DropSubscriptionResponse>, Status> {
488 let request = request.into_inner();
489 let subscription_id = request.subscription_id;
490 let drop_mode = DropMode::from_request_setting(request.cascade);
491
492 let command = DdlCommand::DropSubscription(subscription_id, drop_mode);
493
494 let version = self.ddl_controller.run_command(command).await?;
495
496 Ok(Response::new(DropSubscriptionResponse {
497 status: None,
498 version,
499 }))
500 }
501
502 async fn create_materialized_view(
503 &self,
504 request: Request<CreateMaterializedViewRequest>,
505 ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
506 self.env.idle_manager().record_activity();
507
508 let req = request.into_inner();
509 let mview = req.get_materialized_view()?.clone();
510 let specific_resource_group = req.specific_resource_group.clone();
511 let fragment_graph = req.get_fragment_graph()?.clone();
512 let dependencies = req.get_dependencies().iter().copied().collect();
513
514 let stream_job = StreamingJob::MaterializedView(mview);
515 let version = self
516 .ddl_controller
517 .run_command(DdlCommand::CreateStreamingJob {
518 stream_job,
519 fragment_graph,
520 dependencies,
521 specific_resource_group,
522 if_not_exists: req.if_not_exists,
523 })
524 .await?;
525
526 Ok(Response::new(CreateMaterializedViewResponse {
527 status: None,
528 version,
529 }))
530 }
531
532 async fn drop_materialized_view(
533 &self,
534 request: Request<DropMaterializedViewRequest>,
535 ) -> Result<Response<DropMaterializedViewResponse>, Status> {
536 self.env.idle_manager().record_activity();
537
538 let request = request.into_inner();
539 let table_id = request.table_id;
540 let drop_mode = DropMode::from_request_setting(request.cascade);
541
542 let version = self
543 .ddl_controller
544 .run_command(DdlCommand::DropStreamingJob {
545 job_id: StreamingJobId::MaterializedView(table_id),
546 drop_mode,
547 })
548 .await?;
549
550 Ok(Response::new(DropMaterializedViewResponse {
551 status: None,
552 version,
553 }))
554 }
555
556 async fn create_index(
557 &self,
558 request: Request<CreateIndexRequest>,
559 ) -> Result<Response<CreateIndexResponse>, Status> {
560 self.env.idle_manager().record_activity();
561
562 let req = request.into_inner();
563 let index = req.get_index()?.clone();
564 let index_table = req.get_index_table()?.clone();
565 let fragment_graph = req.get_fragment_graph()?.clone();
566
567 let stream_job = StreamingJob::Index(index, index_table);
568 let version = self
569 .ddl_controller
570 .run_command(DdlCommand::CreateStreamingJob {
571 stream_job,
572 fragment_graph,
573 dependencies: HashSet::new(),
574 specific_resource_group: None,
575 if_not_exists: req.if_not_exists,
576 })
577 .await?;
578
579 Ok(Response::new(CreateIndexResponse {
580 status: None,
581 version,
582 }))
583 }
584
585 async fn drop_index(
586 &self,
587 request: Request<DropIndexRequest>,
588 ) -> Result<Response<DropIndexResponse>, Status> {
589 self.env.idle_manager().record_activity();
590
591 let request = request.into_inner();
592 let index_id = request.index_id;
593 let drop_mode = DropMode::from_request_setting(request.cascade);
594 let version = self
595 .ddl_controller
596 .run_command(DdlCommand::DropStreamingJob {
597 job_id: StreamingJobId::Index(index_id),
598 drop_mode,
599 })
600 .await?;
601
602 Ok(Response::new(DropIndexResponse {
603 status: None,
604 version,
605 }))
606 }
607
608 async fn create_function(
609 &self,
610 request: Request<CreateFunctionRequest>,
611 ) -> Result<Response<CreateFunctionResponse>, Status> {
612 let req = request.into_inner();
613 let function = req.get_function()?.clone();
614
615 let version = self
616 .ddl_controller
617 .run_command(DdlCommand::CreateFunction(function))
618 .await?;
619
620 Ok(Response::new(CreateFunctionResponse {
621 status: None,
622 version,
623 }))
624 }
625
626 async fn drop_function(
627 &self,
628 request: Request<DropFunctionRequest>,
629 ) -> Result<Response<DropFunctionResponse>, Status> {
630 let request = request.into_inner();
631
632 let version = self
633 .ddl_controller
634 .run_command(DdlCommand::DropFunction(
635 request.function_id,
636 DropMode::from_request_setting(request.cascade),
637 ))
638 .await?;
639
640 Ok(Response::new(DropFunctionResponse {
641 status: None,
642 version,
643 }))
644 }
645
646 async fn create_table(
647 &self,
648 request: Request<CreateTableRequest>,
649 ) -> Result<Response<CreateTableResponse>, Status> {
650 let request = request.into_inner();
651 let job_type = request.get_job_type().unwrap_or_default();
652 let dependencies = request.get_dependencies().iter().copied().collect();
653 let source = request.source;
654 let mview = request.materialized_view.unwrap();
655 let fragment_graph = request.fragment_graph.unwrap();
656
657 let stream_job = StreamingJob::Table(source, mview, job_type);
658 let version = self
659 .ddl_controller
660 .run_command(DdlCommand::CreateStreamingJob {
661 stream_job,
662 fragment_graph,
663 dependencies,
664 specific_resource_group: None,
665 if_not_exists: request.if_not_exists,
666 })
667 .await?;
668
669 Ok(Response::new(CreateTableResponse {
670 status: None,
671 version,
672 }))
673 }
674
675 async fn drop_table(
676 &self,
677 request: Request<DropTableRequest>,
678 ) -> Result<Response<DropTableResponse>, Status> {
679 let request = request.into_inner();
680 let source_id = request.source_id;
681 let table_id = request.table_id;
682
683 let drop_mode = DropMode::from_request_setting(request.cascade);
684 let version = self
685 .ddl_controller
686 .run_command(DdlCommand::DropStreamingJob {
687 job_id: StreamingJobId::Table(
688 source_id.map(|PbSourceId::Id(id)| id.into()),
689 table_id,
690 ),
691 drop_mode,
692 })
693 .await?;
694
695 Ok(Response::new(DropTableResponse {
696 status: None,
697 version,
698 }))
699 }
700
701 async fn create_view(
702 &self,
703 request: Request<CreateViewRequest>,
704 ) -> Result<Response<CreateViewResponse>, Status> {
705 let req = request.into_inner();
706 let view = req.get_view()?.clone();
707 let dependencies = req
708 .get_dependencies()
709 .iter()
710 .copied()
711 .collect::<HashSet<_>>();
712
713 let version = self
714 .ddl_controller
715 .run_command(DdlCommand::CreateView(view, dependencies))
716 .await?;
717
718 Ok(Response::new(CreateViewResponse {
719 status: None,
720 version,
721 }))
722 }
723
724 async fn drop_view(
725 &self,
726 request: Request<DropViewRequest>,
727 ) -> Result<Response<DropViewResponse>, Status> {
728 let request = request.into_inner();
729 let view_id = request.get_view_id();
730 let drop_mode = DropMode::from_request_setting(request.cascade);
731 let version = self
732 .ddl_controller
733 .run_command(DdlCommand::DropView(view_id, drop_mode))
734 .await?;
735 Ok(Response::new(DropViewResponse {
736 status: None,
737 version,
738 }))
739 }
740
741 async fn risectl_list_state_tables(
742 &self,
743 _request: Request<RisectlListStateTablesRequest>,
744 ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
745 let tables = self
746 .metadata_manager
747 .catalog_controller
748 .list_all_state_tables()
749 .await?;
750 Ok(Response::new(RisectlListStateTablesResponse { tables }))
751 }
752
753 async fn replace_job_plan(
754 &self,
755 request: Request<ReplaceJobPlanRequest>,
756 ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
757 let req = request.into_inner().get_plan().cloned()?;
758
759 let version = self
760 .ddl_controller
761 .run_command(DdlCommand::ReplaceStreamJob(
762 Self::extract_replace_table_info(req),
763 ))
764 .await?;
765
766 Ok(Response::new(ReplaceJobPlanResponse {
767 status: None,
768 version,
769 }))
770 }
771
772 async fn get_table(
773 &self,
774 request: Request<GetTableRequest>,
775 ) -> Result<Response<GetTableResponse>, Status> {
776 let req = request.into_inner();
777 let table = self
778 .metadata_manager
779 .catalog_controller
780 .get_table_by_name(&req.database_name, &req.table_name)
781 .await?;
782
783 Ok(Response::new(GetTableResponse { table }))
784 }
785
786 async fn alter_name(
787 &self,
788 request: Request<AlterNameRequest>,
789 ) -> Result<Response<AlterNameResponse>, Status> {
790 let AlterNameRequest { object, new_name } = request.into_inner();
791 let version = self
792 .ddl_controller
793 .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
794 .await?;
795 Ok(Response::new(AlterNameResponse {
796 status: None,
797 version,
798 }))
799 }
800
801 async fn alter_source(
803 &self,
804 request: Request<AlterSourceRequest>,
805 ) -> Result<Response<AlterSourceResponse>, Status> {
806 let AlterSourceRequest { source } = request.into_inner();
807 let version = self
808 .ddl_controller
809 .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
810 .await?;
811 Ok(Response::new(AlterSourceResponse {
812 status: None,
813 version,
814 }))
815 }
816
817 async fn alter_owner(
818 &self,
819 request: Request<AlterOwnerRequest>,
820 ) -> Result<Response<AlterOwnerResponse>, Status> {
821 let AlterOwnerRequest { object, owner_id } = request.into_inner();
822 let version = self
823 .ddl_controller
824 .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
825 .await?;
826 Ok(Response::new(AlterOwnerResponse {
827 status: None,
828 version,
829 }))
830 }
831
832 async fn alter_set_schema(
833 &self,
834 request: Request<AlterSetSchemaRequest>,
835 ) -> Result<Response<AlterSetSchemaResponse>, Status> {
836 let AlterSetSchemaRequest {
837 object,
838 new_schema_id,
839 } = request.into_inner();
840 let version = self
841 .ddl_controller
842 .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
843 .await?;
844 Ok(Response::new(AlterSetSchemaResponse {
845 status: None,
846 version,
847 }))
848 }
849
850 async fn get_ddl_progress(
851 &self,
852 _request: Request<GetDdlProgressRequest>,
853 ) -> Result<Response<GetDdlProgressResponse>, Status> {
854 Ok(Response::new(GetDdlProgressResponse {
855 ddl_progress: self.ddl_controller.get_ddl_progress().await?,
856 }))
857 }
858
859 async fn create_connection(
860 &self,
861 request: Request<CreateConnectionRequest>,
862 ) -> Result<Response<CreateConnectionResponse>, Status> {
863 let req = request.into_inner();
864 if req.payload.is_none() {
865 return Err(Status::invalid_argument("request is empty"));
866 }
867
868 match req.payload.unwrap() {
869 create_connection_request::Payload::PrivateLink(_) => {
870 panic!("Private Link Connection has been deprecated")
871 }
872 create_connection_request::Payload::ConnectionParams(params) => {
873 let pb_connection = Connection {
874 id: 0.into(),
875 schema_id: req.schema_id,
876 database_id: req.database_id,
877 name: req.name,
878 info: Some(ConnectionInfo::ConnectionParams(params)),
879 owner: req.owner_id,
880 };
881 let version = self
882 .ddl_controller
883 .run_command(DdlCommand::CreateConnection(pb_connection))
884 .await?;
885 Ok(Response::new(CreateConnectionResponse { version }))
886 }
887 }
888 }
889
890 async fn list_connections(
891 &self,
892 _request: Request<ListConnectionsRequest>,
893 ) -> Result<Response<ListConnectionsResponse>, Status> {
894 let conns = self
895 .metadata_manager
896 .catalog_controller
897 .list_connections()
898 .await?;
899
900 Ok(Response::new(ListConnectionsResponse {
901 connections: conns,
902 }))
903 }
904
905 async fn drop_connection(
906 &self,
907 request: Request<DropConnectionRequest>,
908 ) -> Result<Response<DropConnectionResponse>, Status> {
909 let req = request.into_inner();
910 let drop_mode = DropMode::from_request_setting(req.cascade);
911
912 let version = self
913 .ddl_controller
914 .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
915 .await?;
916
917 Ok(Response::new(DropConnectionResponse {
918 status: None,
919 version,
920 }))
921 }
922
923 async fn comment_on(
924 &self,
925 request: Request<CommentOnRequest>,
926 ) -> Result<Response<CommentOnResponse>, Status> {
927 let req = request.into_inner();
928 let comment = req.get_comment()?.clone();
929
930 let version = self
931 .ddl_controller
932 .run_command(DdlCommand::CommentOn(Comment {
933 table_id: comment.table_id,
934 schema_id: comment.schema_id,
935 database_id: comment.database_id,
936 column_index: comment.column_index,
937 description: comment.description,
938 }))
939 .await?;
940
941 Ok(Response::new(CommentOnResponse {
942 status: None,
943 version,
944 }))
945 }
946
947 async fn get_tables(
948 &self,
949 request: Request<GetTablesRequest>,
950 ) -> Result<Response<GetTablesResponse>, Status> {
951 let GetTablesRequest {
952 table_ids,
953 include_dropped_tables,
954 } = request.into_inner();
955 let ret = self
956 .metadata_manager
957 .catalog_controller
958 .get_table_by_ids(table_ids, include_dropped_tables)
959 .await?;
960
961 let mut tables = HashMap::default();
962 for table in ret {
963 tables.insert(table.id, table);
964 }
965 Ok(Response::new(GetTablesResponse { tables }))
966 }
967
968 async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
969 self.ddl_controller.wait().await?;
970 Ok(Response::new(WaitResponse {}))
971 }
972
973 async fn alter_cdc_table_backfill_parallelism(
974 &self,
975 request: Request<AlterCdcTableBackfillParallelismRequest>,
976 ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
977 let req = request.into_inner();
978 let job_id = req.get_table_id();
979 let parallelism = *req.get_parallelism()?;
980
981 let table_parallelism = ModelTableParallelism::from(parallelism);
982 let streaming_parallelism = match table_parallelism {
983 ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
984 _ => bail_invalid_parameter!(
985 "CDC table backfill parallelism must be set to a fixed value"
986 ),
987 };
988
989 self.ddl_controller
990 .reschedule_cdc_table_backfill(
991 job_id,
992 ReschedulePolicy::Parallelism(ParallelismPolicy {
993 parallelism: streaming_parallelism,
994 }),
995 )
996 .await?;
997 Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
998 }
999
1000 async fn alter_parallelism(
1001 &self,
1002 request: Request<AlterParallelismRequest>,
1003 ) -> Result<Response<AlterParallelismResponse>, Status> {
1004 let req = request.into_inner();
1005
1006 let job_id = req.get_table_id();
1007 let parallelism = *req.get_parallelism()?;
1008 let deferred = req.get_deferred();
1009
1010 let parallelism = match parallelism.get_parallelism()? {
1011 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1012 StreamingParallelism::Fixed(*parallelism as _)
1013 }
1014 Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
1015 _ => bail_unavailable!(),
1016 };
1017
1018 self.ddl_controller
1019 .reschedule_streaming_job(
1020 job_id,
1021 ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
1022 deferred,
1023 )
1024 .await?;
1025
1026 Ok(Response::new(AlterParallelismResponse {}))
1027 }
1028
1029 async fn alter_fragment_parallelism(
1030 &self,
1031 request: Request<AlterFragmentParallelismRequest>,
1032 ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1033 let req = request.into_inner();
1034
1035 let fragment_ids = req.fragment_ids;
1036 if fragment_ids.is_empty() {
1037 return Err(Status::invalid_argument(
1038 "at least one fragment id must be provided",
1039 ));
1040 }
1041
1042 let parallelism = match req.parallelism {
1043 Some(parallelism) => {
1044 let streaming_parallelism = match parallelism.get_parallelism()? {
1045 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1046 StreamingParallelism::Fixed(*parallelism as _)
1047 }
1048 Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1049 StreamingParallelism::Adaptive
1050 }
1051 _ => bail_unavailable!(),
1052 };
1053 Some(streaming_parallelism)
1054 }
1055 None => None,
1056 };
1057
1058 let fragment_targets = fragment_ids
1059 .into_iter()
1060 .map(|fragment_id| (fragment_id, parallelism.clone()))
1061 .collect();
1062
1063 self.ddl_controller
1064 .reschedule_fragments(fragment_targets)
1065 .await?;
1066
1067 Ok(Response::new(AlterFragmentParallelismResponse {}))
1068 }
1069
1070 async fn alter_streaming_job_config(
1071 &self,
1072 request: Request<AlterStreamingJobConfigRequest>,
1073 ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
1074 let AlterStreamingJobConfigRequest {
1075 job_id,
1076 entries_to_add,
1077 keys_to_remove,
1078 } = request.into_inner();
1079
1080 self.ddl_controller
1081 .run_command(DdlCommand::AlterStreamingJobConfig(
1082 job_id,
1083 entries_to_add,
1084 keys_to_remove,
1085 ))
1086 .await?;
1087
1088 Ok(Response::new(AlterStreamingJobConfigResponse {}))
1089 }
1090
1091 async fn auto_schema_change(
1094 &self,
1095 request: Request<AutoSchemaChangeRequest>,
1096 ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
1097 let req = request.into_inner();
1098
1099 let workers = self
1101 .metadata_manager
1102 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
1103 .await?;
1104 let worker = workers
1105 .choose(&mut thread_rng())
1106 .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
1107
1108 let client = self
1109 .env
1110 .frontend_client_pool()
1111 .get(worker)
1112 .await
1113 .map_err(MetaError::from)?;
1114
1115 let Some(schema_change) = req.schema_change else {
1116 return Err(Status::invalid_argument(
1117 "schema change message is required",
1118 ));
1119 };
1120
1121 for table_change in schema_change.table_changes {
1122 for c in &table_change.columns {
1123 let c = ColumnCatalog::from(c.clone());
1124
1125 let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
1126 tracing::warn!(target: "auto_schema_change",
1127 cdc_table_id = table_change.cdc_table_id,
1128 upstraem_ddl = table_change.upstream_ddl,
1129 "invalid column type from cdc table change");
1130 Err(Status::invalid_argument(format!(
1131 "invalid column type: {} from cdc table change, column: {:?}",
1132 column_type, c
1133 )))
1134 };
1135 if c.is_generated() {
1136 return invalid_col_type("generated column", &c);
1137 }
1138 if c.is_rw_sys_column() {
1139 return invalid_col_type("rw system column", &c);
1140 }
1141 if c.is_hidden {
1142 return invalid_col_type("hidden column", &c);
1143 }
1144 }
1145
1146 let tables: Vec<Table> = self
1148 .metadata_manager
1149 .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
1150 .await?;
1151
1152 for table in tables {
1153 let original_columns: HashSet<(String, DataType)> =
1156 HashSet::from_iter(table.columns.iter().filter_map(|col| {
1157 let col = ColumnCatalog::from(col.clone());
1158 if col.is_generated() || col.is_hidden() {
1159 None
1160 } else {
1161 Some((col.column_desc.name.clone(), col.data_type().clone()))
1162 }
1163 }));
1164
1165 let mut new_columns: HashSet<(String, DataType)> =
1166 HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1167 let col = ColumnCatalog::from(col.clone());
1168 if col.is_generated() || col.is_hidden() {
1169 None
1170 } else {
1171 Some((col.column_desc.name.clone(), col.data_type().clone()))
1172 }
1173 }));
1174
1175 for col in &table.columns {
1178 let col = ColumnCatalog::from(col.clone());
1179 if col.is_connector_additional_column()
1180 && !col.is_hidden()
1181 && !col.is_generated()
1182 {
1183 new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1184 }
1185 }
1186
1187 if !(original_columns.is_subset(&new_columns)
1188 || original_columns.is_superset(&new_columns))
1189 {
1190 tracing::warn!(target: "auto_schema_change",
1191 table_id = %table.id,
1192 cdc_table_id = table.cdc_table_id,
1193 upstraem_ddl = table_change.upstream_ddl,
1194 original_columns = ?original_columns,
1195 new_columns = ?new_columns,
1196 "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1197
1198 let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1199 add_auto_schema_change_fail_event_log(
1200 &self.meta_metrics,
1201 table.id,
1202 table.name.clone(),
1203 table_change.cdc_table_id.clone(),
1204 table_change.upstream_ddl.clone(),
1205 &self.env.event_log_manager_ref(),
1206 fail_info,
1207 );
1208
1209 return Err(Status::invalid_argument(
1210 "New columns should be a subset or superset of the original columns (including hidden columns)",
1211 ));
1212 }
1213 if original_columns == new_columns {
1215 tracing::warn!(target: "auto_schema_change",
1216 table_id = %table.id,
1217 cdc_table_id = table.cdc_table_id,
1218 upstraem_ddl = table_change.upstream_ddl,
1219 original_columns = ?original_columns,
1220 new_columns = ?new_columns,
1221 "No change to columns, skipping the schema change");
1222 continue;
1223 }
1224
1225 let latency_timer = self
1226 .meta_metrics
1227 .auto_schema_change_latency
1228 .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1229 .start_timer();
1230 let resp = client
1233 .get_table_replace_plan(GetTableReplacePlanRequest {
1234 database_id: table.database_id,
1235 owner: table.owner,
1236 table_id: table.id,
1237 cdc_table_change: Some(table_change.clone()),
1238 })
1239 .await;
1240
1241 match resp {
1242 Ok(resp) => {
1243 let resp = resp.into_inner();
1244 if let Some(plan) = resp.replace_plan {
1245 let plan = Self::extract_replace_table_info(plan);
1246 plan.streaming_job.table().inspect(|t| {
1247 tracing::info!(
1248 target: "auto_schema_change",
1249 table_id = %t.id,
1250 cdc_table_id = t.cdc_table_id,
1251 upstraem_ddl = table_change.upstream_ddl,
1252 "Start the replace config change")
1253 });
1254 let replace_res = self
1256 .ddl_controller
1257 .run_command(DdlCommand::ReplaceStreamJob(plan))
1258 .await;
1259
1260 match replace_res {
1261 Ok(_) => {
1262 tracing::info!(
1263 target: "auto_schema_change",
1264 table_id = %table.id,
1265 cdc_table_id = table.cdc_table_id,
1266 "Table replaced success");
1267
1268 self.meta_metrics
1269 .auto_schema_change_success_cnt
1270 .with_guarded_label_values(&[
1271 &table.id.to_string(),
1272 &table.name,
1273 ])
1274 .inc();
1275 latency_timer.observe_duration();
1276 }
1277 Err(e) => {
1278 tracing::error!(
1279 target: "auto_schema_change",
1280 error = %e.as_report(),
1281 table_id = %table.id,
1282 cdc_table_id = table.cdc_table_id,
1283 upstraem_ddl = table_change.upstream_ddl,
1284 "failed to replace the table",
1285 );
1286 let fail_info =
1287 format!("failed to replace the table: {}", e.as_report());
1288 add_auto_schema_change_fail_event_log(
1289 &self.meta_metrics,
1290 table.id,
1291 table.name.clone(),
1292 table_change.cdc_table_id.clone(),
1293 table_change.upstream_ddl.clone(),
1294 &self.env.event_log_manager_ref(),
1295 fail_info,
1296 );
1297 }
1298 };
1299 }
1300 }
1301 Err(e) => {
1302 tracing::error!(
1303 target: "auto_schema_change",
1304 error = %e.as_report(),
1305 table_id = %table.id,
1306 cdc_table_id = table.cdc_table_id,
1307 "failed to get replace table plan",
1308 );
1309 let fail_info =
1310 format!("failed to get replace table plan: {}", e.as_report());
1311 add_auto_schema_change_fail_event_log(
1312 &self.meta_metrics,
1313 table.id,
1314 table.name.clone(),
1315 table_change.cdc_table_id.clone(),
1316 table_change.upstream_ddl.clone(),
1317 &self.env.event_log_manager_ref(),
1318 fail_info,
1319 );
1320 }
1321 };
1322 }
1323 }
1324
1325 Ok(Response::new(AutoSchemaChangeResponse {}))
1326 }
1327
1328 async fn alter_swap_rename(
1329 &self,
1330 request: Request<AlterSwapRenameRequest>,
1331 ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1332 let req = request.into_inner();
1333
1334 let version = self
1335 .ddl_controller
1336 .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1337 .await?;
1338
1339 Ok(Response::new(AlterSwapRenameResponse {
1340 status: None,
1341 version,
1342 }))
1343 }
1344
1345 async fn alter_resource_group(
1346 &self,
1347 request: Request<AlterResourceGroupRequest>,
1348 ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1349 let req = request.into_inner();
1350
1351 let table_id = req.get_table_id();
1352 let deferred = req.get_deferred();
1353 let resource_group = req.resource_group;
1354
1355 self.ddl_controller
1356 .reschedule_streaming_job(
1357 table_id.as_job_id(),
1358 ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1359 deferred,
1360 )
1361 .await?;
1362
1363 Ok(Response::new(AlterResourceGroupResponse {}))
1364 }
1365
1366 async fn alter_database_param(
1367 &self,
1368 request: Request<AlterDatabaseParamRequest>,
1369 ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1370 let req = request.into_inner();
1371 let database_id = req.database_id;
1372
1373 let param = match req.param.unwrap() {
1374 alter_database_param_request::Param::BarrierIntervalMs(value) => {
1375 AlterDatabaseParam::BarrierIntervalMs(value.value)
1376 }
1377 alter_database_param_request::Param::CheckpointFrequency(value) => {
1378 AlterDatabaseParam::CheckpointFrequency(value.value)
1379 }
1380 };
1381 let version = self
1382 .ddl_controller
1383 .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
1384 .await?;
1385
1386 return Ok(Response::new(AlterDatabaseParamResponse {
1387 status: None,
1388 version,
1389 }));
1390 }
1391
1392 async fn compact_iceberg_table(
1393 &self,
1394 request: Request<CompactIcebergTableRequest>,
1395 ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1396 let req = request.into_inner();
1397 let sink_id = req.sink_id;
1398
1399 let task_id = self
1401 .iceberg_compaction_manager
1402 .trigger_manual_compaction(sink_id)
1403 .await
1404 .map_err(|e| {
1405 Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1406 })?;
1407
1408 Ok(Response::new(CompactIcebergTableResponse {
1409 status: None,
1410 task_id,
1411 }))
1412 }
1413
1414 async fn expire_iceberg_table_snapshots(
1415 &self,
1416 request: Request<ExpireIcebergTableSnapshotsRequest>,
1417 ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1418 let req = request.into_inner();
1419 let sink_id = req.sink_id;
1420
1421 self.iceberg_compaction_manager
1423 .check_and_expire_snapshots(sink_id)
1424 .await
1425 .map_err(|e| {
1426 Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1427 })?;
1428
1429 Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1430 status: None,
1431 }))
1432 }
1433
/// Creates an iceberg table as a sequence of three DDL commands: the table streaming
/// job, the sink streaming job that reads from it, and finally a non-shared iceberg
/// source.
///
/// If creating the sink or the iceberg source fails, the already created table is dropped
/// with `DropMode::Cascade` and the original error is returned.
///
/// # Panics
///
/// Panics if `table_info`, `sink_info`, `iceberg_source`, or the nested `table`, `sink`
/// and `fragment_graph` messages are missing, if `job_type` is not a valid
/// `PbTableJobType`, if the sink fragment graph does not reference exactly one
/// placeholder table id, or if a rate limit has to be restored and the created table has
/// no associated source id.
async fn create_iceberg_table(
    &self,
    request: Request<CreateIcebergTableRequest>,
) -> Result<Response<CreateIcebergTableResponse>, Status> {
    let req = request.into_inner();
    let CreateIcebergTableRequest {
        table_info,
        sink_info,
        iceberg_source,
        if_not_exists,
    } = req;

    // Step 1: create the table job.
    let PbTableJobInfo {
        source,
        table,
        fragment_graph,
        job_type,
    } = table_info.unwrap();
    let mut table = table.unwrap();
    let mut fragment_graph = fragment_graph.unwrap();
    // Remember the identifying fields now; `table` is moved into the streaming job below.
    let database_id = table.get_database_id();
    let schema_id = table.get_schema_id();
    let table_name = table.get_name().to_owned();

    // The table job is always created with the `Background` create type.
    table.create_type = PbCreateType::Background as _;

    // When the table has a source, force the rate limit of every source node in the
    // fragment graph to 0 and remember the source's own rate limit so that it can be
    // re-applied once the sink exists (see the throttle step below).
    // NOTE(review): presumably this keeps the source from ingesting data before the sink
    // is created — confirm.
    let source_rate_limit = if let Some(source) = &source {
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                if let NodeBody::Source(source_node) = node
                    && let Some(inner) = &mut source_node.source_inner
                {
                    inner.rate_limit = Some(0);
                }
            });
        }
        Some(source.rate_limit)
    } else {
        None
    };

    let stream_job =
        StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
    let _ = self
        .ddl_controller
        .run_command(DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies: HashSet::new(),
            specific_resource_group: None,
            if_not_exists,
        })
        .await?;

    // Fetch the catalog entry of the table just created; its id replaces the placeholder
    // ids in the sink's fragment graph.
    let table_catalog = self
        .metadata_manager
        .catalog_controller
        .get_table_catalog_by_name(database_id, schema_id, &table_name)
        .await?
        .ok_or(Status::not_found("Internal error: table not found"))?;

    // Step 2: create the sink job on top of the table.
    let PbSinkJobInfo {
        sink,
        fragment_graph,
    } = sink_info.unwrap();
    let mut sink = sink.unwrap();

    // The sink job is always created with the `Background` create type as well.
    sink.create_type = PbCreateType::Background as _;

    let mut fragment_graph = fragment_graph.unwrap();

    // The sink graph must depend on exactly one table, referenced by a placeholder id.
    assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
    assert!(
        risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
            .is_placeholder()
    );
    fragment_graph.dependent_table_ids[0] = table_catalog.id;
    // Patch every placeholder reference inside the stream scan / batch plan nodes with
    // the real table id, vnode count and catalog.
    for fragment in fragment_graph.fragments.values_mut() {
        stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
            NodeBody::StreamScan(scan) => {
                scan.table_id = table_catalog.id;
                if let Some(table_desc) = &mut scan.table_desc {
                    assert!(
                        risingwave_common::catalog::TableId::from(table_desc.table_id)
                            .is_placeholder()
                    );
                    table_desc.table_id = table_catalog.id;
                    table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                }
                if let Some(table) = &mut scan.arrangement_table {
                    assert!(
                        risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                    );
                    *table = table_catalog.clone();
                }
            }
            NodeBody::BatchPlan(plan) => {
                if let Some(table_desc) = &mut plan.table_desc {
                    assert!(
                        risingwave_common::catalog::TableId::from(table_desc.table_id)
                            .is_placeholder()
                    );
                    table_desc.table_id = table_catalog.id;
                    table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                }
            }
            _ => {}
        });
    }

    let table_id = table_catalog.id;
    let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
    let stream_job = StreamingJob::Sink(sink);
    let res = self
        .ddl_controller
        .run_command(DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            specific_resource_group: None,
            if_not_exists,
        })
        .await;

    // Sink creation failed: drop the table created in step 1 (best effort, a cleanup
    // failure is only logged), then return the sink error.
    if res.is_err() {
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(None, table_id),
                drop_mode: DropMode::Cascade,
            })
            .await
            .inspect_err(|err| {
                tracing::error!(error = %err.as_report(),
                    "Failed to clean up table after iceberg sink creation failure",
                );
            });
        res?;
    }

    // Re-apply the source's own rate limit, unless it was already `Some(0)`, which is
    // what the source nodes were created with.
    if let Some(source_rate_limit) = source_rate_limit
        && source_rate_limit != Some(0)
    {
        let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
            table_catalog.optional_associated_source_id.unwrap();
        let actors_to_apply = self
            .metadata_manager
            .update_source_rate_limit_by_source_id(SourceId::new(source_id), source_rate_limit)
            .await?;
        // Build the throttle mutation: fragment id -> (actor id -> rate limit).
        let mutation: ThrottleConfig = actors_to_apply
            .into_iter()
            .map(|(fragment_id, actors)| {
                (
                    fragment_id,
                    actors
                        .into_iter()
                        .map(|actor_id| (actor_id, source_rate_limit))
                        .collect::<HashMap<_, _>>(),
                )
            })
            .collect();
        let _ = self
            .barrier_scheduler
            .run_command(database_id, Command::Throttle(mutation))
            .await?;
    }

    // Step 3: create the iceberg source.
    let iceberg_source = iceberg_source.unwrap();
    let res = self
        .ddl_controller
        .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
        .await;
    // Source creation failed: drop the table with `Cascade` (best effort); the error
    // itself is surfaced by `res?` when the response is built below.
    if res.is_err() {
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(None, table_id),
                drop_mode: DropMode::Cascade,
            })
            .await
            .inspect_err(|err| {
                tracing::error!(
                    error = %err.as_report(),
                    "Failed to clean up table after iceberg source creation failure",
                );
            });
    }

    Ok(Response::new(CreateIcebergTableResponse {
        status: None,
        version: res?,
    }))
}
1634}
1635
1636fn add_auto_schema_change_fail_event_log(
1637 meta_metrics: &Arc<MetaMetrics>,
1638 table_id: TableId,
1639 table_name: String,
1640 cdc_table_id: String,
1641 upstream_ddl: String,
1642 event_log_manager: &EventLogManagerRef,
1643 fail_info: String,
1644) {
1645 meta_metrics
1646 .auto_schema_change_failure_cnt
1647 .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1648 .inc();
1649 let event = event_log::EventAutoSchemaChangeFail {
1650 table_id,
1651 table_name,
1652 cdc_table_id,
1653 upstream_ddl,
1654 fail_info,
1655 };
1656 event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1657}