1use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::TableId;
26use risingwave_common::types::DataType;
27use risingwave_common::util::stream_graph_visitor;
28use risingwave_connector::sink::catalog::SinkId;
29use risingwave_meta::barrier::{BarrierScheduler, Command};
30use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
31use risingwave_meta::model::TableParallelism as ModelTableParallelism;
32use risingwave_meta::rpc::metrics::MetaMetrics;
33use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
34use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
35use risingwave_meta_model::StreamingParallelism;
36use risingwave_pb::catalog::connection::Info as ConnectionInfo;
37use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
38use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
39use risingwave_pb::common::WorkerType;
40use risingwave_pb::common::worker_node::State;
41use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
42use risingwave_pb::ddl_service::ddl_service_server::DdlService;
43use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
44use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
45use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
46use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
47use risingwave_pb::id::SourceId;
48use risingwave_pb::meta::event_log;
49use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
50use risingwave_pb::stream_plan::stream_node::NodeBody;
51use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
52use thiserror_ext::AsReport;
53use tokio::sync::oneshot::Sender;
54use tokio::task::JoinHandle;
55use tonic::{Request, Response, Status};
56
57use crate::MetaError;
58use crate::barrier::BarrierManagerRef;
59use crate::manager::sink_coordination::SinkCoordinatorManager;
60use crate::manager::{MetaSrvEnv, StreamingJob};
61use crate::rpc::ddl_controller::{
62 DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
63};
64use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
65
66#[derive(Clone)]
67pub struct DdlServiceImpl {
68 env: MetaSrvEnv,
69
70 metadata_manager: MetadataManager,
71 sink_manager: SinkCoordinatorManager,
72 ddl_controller: DdlController,
73 meta_metrics: Arc<MetaMetrics>,
74 iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
75 barrier_scheduler: BarrierScheduler,
76}
77
78impl DdlServiceImpl {
79 #[allow(clippy::too_many_arguments)]
80 pub async fn new(
81 env: MetaSrvEnv,
82 metadata_manager: MetadataManager,
83 stream_manager: GlobalStreamManagerRef,
84 source_manager: SourceManagerRef,
85 barrier_manager: BarrierManagerRef,
86 sink_manager: SinkCoordinatorManager,
87 meta_metrics: Arc<MetaMetrics>,
88 iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
89 barrier_scheduler: BarrierScheduler,
90 ) -> Self {
91 let ddl_controller = DdlController::new(
92 env.clone(),
93 metadata_manager.clone(),
94 stream_manager,
95 source_manager,
96 barrier_manager,
97 sink_manager.clone(),
98 )
99 .await;
100 Self {
101 env,
102 metadata_manager,
103 sink_manager,
104 ddl_controller,
105 meta_metrics,
106 iceberg_compaction_manager,
107 barrier_scheduler,
108 }
109 }
110
111 fn extract_replace_table_info(
112 ReplaceJobPlan {
113 fragment_graph,
114 replace_job,
115 }: ReplaceJobPlan,
116 ) -> ReplaceStreamJobInfo {
117 let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
118 replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
119 table,
120 source,
121 job_type,
122 }) => StreamingJob::Table(
123 source,
124 table.unwrap(),
125 TableJobType::try_from(job_type).unwrap(),
126 ),
127 replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
128 StreamingJob::Source(source.unwrap())
129 }
130 replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
131 table,
132 }) => StreamingJob::MaterializedView(table.unwrap()),
133 };
134
135 ReplaceStreamJobInfo {
136 streaming_job: replace_streaming_job,
137 fragment_graph: fragment_graph.unwrap(),
138 }
139 }
140
141 fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
142 streaming_job_resource_type::ResourceType::Regular(true)
143 }
144
145 pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
146 tracing::info!("start migrate legacy table fragments task");
147 let env = self.env.clone();
148 let metadata_manager = self.metadata_manager.clone();
149 let ddl_controller = self.ddl_controller.clone();
150
151 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
152 let join_handle = tokio::spawn(async move {
153 async fn migrate_inner(
154 env: &MetaSrvEnv,
155 metadata_manager: &MetadataManager,
156 ddl_controller: &DdlController,
157 ) -> MetaResult<()> {
158 let tables = metadata_manager
159 .catalog_controller
160 .list_unmigrated_tables()
161 .await?;
162
163 if tables.is_empty() {
164 tracing::info!("no legacy table fragments need migration");
165 return Ok(());
166 }
167
168 let client = {
169 let workers = metadata_manager
170 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
171 .await?;
172 if workers.is_empty() {
173 return Err(anyhow::anyhow!("no active frontend nodes found").into());
174 }
175 let worker = workers.choose(&mut thread_rng()).unwrap();
176 env.frontend_client_pool().get(worker).await?
177 };
178
179 for table in tables {
180 let start = tokio::time::Instant::now();
181 let req = GetTableReplacePlanRequest {
182 database_id: table.database_id,
183 table_id: table.id,
184 cdc_table_change: None,
185 };
186 let resp = client
187 .get_table_replace_plan(req)
188 .await
189 .context("failed to get table replace plan from frontend")?;
190
191 let plan = resp.into_inner().replace_plan.unwrap();
192 let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
193 ddl_controller
194 .run_command(DdlCommand::ReplaceStreamJob(replace_info))
195 .await?;
196 tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
197 }
198 tracing::info!("successfully migrated all legacy table fragments");
199
200 Ok(())
201 }
202
203 let migrate_future = async move {
204 let mut attempt = 0;
205 loop {
206 match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
207 Ok(_) => break,
208 Err(e) => {
209 attempt += 1;
210 tracing::error!(
211 "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
212 e.as_report(),
213 attempt
214 );
215 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
216 }
217 }
218 }
219 };
220
221 select(pin!(migrate_future), shutdown_rx).await;
222 });
223
224 (join_handle, shutdown_tx)
225 }
226}
227
228#[async_trait::async_trait]
229impl DdlService for DdlServiceImpl {
230 async fn create_database(
231 &self,
232 request: Request<CreateDatabaseRequest>,
233 ) -> Result<Response<CreateDatabaseResponse>, Status> {
234 let req = request.into_inner();
235 let database = req.get_db()?.clone();
236 let version = self
237 .ddl_controller
238 .run_command(DdlCommand::CreateDatabase(database))
239 .await?;
240
241 Ok(Response::new(CreateDatabaseResponse {
242 status: None,
243 version,
244 }))
245 }
246
247 async fn drop_database(
248 &self,
249 request: Request<DropDatabaseRequest>,
250 ) -> Result<Response<DropDatabaseResponse>, Status> {
251 let req = request.into_inner();
252 let database_id = req.get_database_id();
253
254 let version = self
255 .ddl_controller
256 .run_command(DdlCommand::DropDatabase(database_id))
257 .await?;
258
259 Ok(Response::new(DropDatabaseResponse {
260 status: None,
261 version,
262 }))
263 }
264
265 async fn create_secret(
266 &self,
267 request: Request<CreateSecretRequest>,
268 ) -> Result<Response<CreateSecretResponse>, Status> {
269 let req = request.into_inner();
270 let pb_secret = Secret {
271 id: 0.into(),
272 name: req.get_name().clone(),
273 database_id: req.get_database_id(),
274 value: req.get_value().clone(),
275 owner: req.get_owner_id(),
276 schema_id: req.get_schema_id(),
277 };
278 let version = self
279 .ddl_controller
280 .run_command(DdlCommand::CreateSecret(pb_secret))
281 .await?;
282
283 Ok(Response::new(CreateSecretResponse { version }))
284 }
285
286 async fn drop_secret(
287 &self,
288 request: Request<DropSecretRequest>,
289 ) -> Result<Response<DropSecretResponse>, Status> {
290 let req = request.into_inner();
291 let secret_id = req.get_secret_id();
292 let version = self
293 .ddl_controller
294 .run_command(DdlCommand::DropSecret(secret_id))
295 .await?;
296
297 Ok(Response::new(DropSecretResponse { version }))
298 }
299
300 async fn alter_secret(
301 &self,
302 request: Request<AlterSecretRequest>,
303 ) -> Result<Response<AlterSecretResponse>, Status> {
304 let req = request.into_inner();
305 let pb_secret = Secret {
306 id: req.get_secret_id(),
307 name: req.get_name().clone(),
308 database_id: req.get_database_id(),
309 value: req.get_value().clone(),
310 owner: req.get_owner_id(),
311 schema_id: req.get_schema_id(),
312 };
313 let version = self
314 .ddl_controller
315 .run_command(DdlCommand::AlterSecret(pb_secret))
316 .await?;
317
318 Ok(Response::new(AlterSecretResponse { version }))
319 }
320
321 async fn create_schema(
322 &self,
323 request: Request<CreateSchemaRequest>,
324 ) -> Result<Response<CreateSchemaResponse>, Status> {
325 let req = request.into_inner();
326 let schema = req.get_schema()?.clone();
327 let version = self
328 .ddl_controller
329 .run_command(DdlCommand::CreateSchema(schema))
330 .await?;
331
332 Ok(Response::new(CreateSchemaResponse {
333 status: None,
334 version,
335 }))
336 }
337
338 async fn drop_schema(
339 &self,
340 request: Request<DropSchemaRequest>,
341 ) -> Result<Response<DropSchemaResponse>, Status> {
342 let req = request.into_inner();
343 let schema_id = req.get_schema_id();
344 let drop_mode = DropMode::from_request_setting(req.cascade);
345 let version = self
346 .ddl_controller
347 .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
348 .await?;
349 Ok(Response::new(DropSchemaResponse {
350 status: None,
351 version,
352 }))
353 }
354
355 async fn create_source(
356 &self,
357 request: Request<CreateSourceRequest>,
358 ) -> Result<Response<CreateSourceResponse>, Status> {
359 let req = request.into_inner();
360 let source = req.get_source()?.clone();
361
362 match req.fragment_graph {
363 None => {
364 let version = self
365 .ddl_controller
366 .run_command(DdlCommand::CreateNonSharedSource(source))
367 .await?;
368 Ok(Response::new(CreateSourceResponse {
369 status: None,
370 version,
371 }))
372 }
373 Some(fragment_graph) => {
374 let stream_job = StreamingJob::Source(source);
376 let version = self
377 .ddl_controller
378 .run_command(DdlCommand::CreateStreamingJob {
379 stream_job,
380 fragment_graph,
381 dependencies: HashSet::new(),
382 resource_type: Self::default_streaming_job_resource_type(),
383 if_not_exists: req.if_not_exists,
384 })
385 .await?;
386 Ok(Response::new(CreateSourceResponse {
387 status: None,
388 version,
389 }))
390 }
391 }
392 }
393
394 async fn drop_source(
395 &self,
396 request: Request<DropSourceRequest>,
397 ) -> Result<Response<DropSourceResponse>, Status> {
398 let request = request.into_inner();
399 let source_id = request.source_id;
400 let drop_mode = DropMode::from_request_setting(request.cascade);
401 let version = self
402 .ddl_controller
403 .run_command(DdlCommand::DropSource(source_id, drop_mode))
404 .await?;
405
406 Ok(Response::new(DropSourceResponse {
407 status: None,
408 version,
409 }))
410 }
411
412 async fn reset_source(
413 &self,
414 request: Request<ResetSourceRequest>,
415 ) -> Result<Response<ResetSourceResponse>, Status> {
416 let request = request.into_inner();
417 let source_id = request.source_id;
418
419 tracing::info!(
420 source_id = %source_id,
421 "Received RESET SOURCE request, routing to DDL controller"
422 );
423
424 let version = self
426 .ddl_controller
427 .run_command(DdlCommand::ResetSource(source_id))
428 .await?;
429
430 Ok(Response::new(ResetSourceResponse {
431 status: None,
432 version,
433 }))
434 }
435
436 async fn create_sink(
437 &self,
438 request: Request<CreateSinkRequest>,
439 ) -> Result<Response<CreateSinkResponse>, Status> {
440 self.env.idle_manager().record_activity();
441
442 let req = request.into_inner();
443
444 let sink = req.get_sink()?.clone();
445 let fragment_graph = req.get_fragment_graph()?.clone();
446 let dependencies = req.get_dependencies().iter().copied().collect();
447
448 let stream_job = StreamingJob::Sink(sink);
449
450 let command = DdlCommand::CreateStreamingJob {
451 stream_job,
452 fragment_graph,
453 dependencies,
454 resource_type: Self::default_streaming_job_resource_type(),
455 if_not_exists: req.if_not_exists,
456 };
457
458 let version = self.ddl_controller.run_command(command).await?;
459
460 Ok(Response::new(CreateSinkResponse {
461 status: None,
462 version,
463 }))
464 }
465
466 async fn drop_sink(
467 &self,
468 request: Request<DropSinkRequest>,
469 ) -> Result<Response<DropSinkResponse>, Status> {
470 let request = request.into_inner();
471 let sink_id = request.sink_id;
472 let drop_mode = DropMode::from_request_setting(request.cascade);
473
474 let command = DdlCommand::DropStreamingJob {
475 job_id: StreamingJobId::Sink(sink_id),
476 drop_mode,
477 };
478
479 let version = self.ddl_controller.run_command(command).await?;
480
481 self.sink_manager
482 .stop_sink_coordinator(vec![SinkId::from(sink_id)])
483 .await;
484
485 Ok(Response::new(DropSinkResponse {
486 status: None,
487 version,
488 }))
489 }
490
491 async fn create_subscription(
492 &self,
493 request: Request<CreateSubscriptionRequest>,
494 ) -> Result<Response<CreateSubscriptionResponse>, Status> {
495 self.env.idle_manager().record_activity();
496
497 let req = request.into_inner();
498
499 let subscription = req.get_subscription()?.clone();
500 let command = DdlCommand::CreateSubscription(subscription);
501
502 let version = self.ddl_controller.run_command(command).await?;
503
504 Ok(Response::new(CreateSubscriptionResponse {
505 status: None,
506 version,
507 }))
508 }
509
510 async fn drop_subscription(
511 &self,
512 request: Request<DropSubscriptionRequest>,
513 ) -> Result<Response<DropSubscriptionResponse>, Status> {
514 let request = request.into_inner();
515 let subscription_id = request.subscription_id;
516 let drop_mode = DropMode::from_request_setting(request.cascade);
517
518 let command = DdlCommand::DropSubscription(subscription_id, drop_mode);
519
520 let version = self.ddl_controller.run_command(command).await?;
521
522 Ok(Response::new(DropSubscriptionResponse {
523 status: None,
524 version,
525 }))
526 }
527
528 async fn create_materialized_view(
529 &self,
530 request: Request<CreateMaterializedViewRequest>,
531 ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
532 self.env.idle_manager().record_activity();
533
534 let req = request.into_inner();
535 let mview = req.get_materialized_view()?.clone();
536 let fragment_graph = req.get_fragment_graph()?.clone();
537 let dependencies = req.get_dependencies().iter().copied().collect();
538 let resource_type = req.resource_type.unwrap().resource_type.unwrap();
539
540 let stream_job = StreamingJob::MaterializedView(mview);
541 let version = self
542 .ddl_controller
543 .run_command(DdlCommand::CreateStreamingJob {
544 stream_job,
545 fragment_graph,
546 dependencies,
547 resource_type,
548 if_not_exists: req.if_not_exists,
549 })
550 .await?;
551
552 Ok(Response::new(CreateMaterializedViewResponse {
553 status: None,
554 version,
555 }))
556 }
557
558 async fn drop_materialized_view(
559 &self,
560 request: Request<DropMaterializedViewRequest>,
561 ) -> Result<Response<DropMaterializedViewResponse>, Status> {
562 self.env.idle_manager().record_activity();
563
564 let request = request.into_inner();
565 let table_id = request.table_id;
566 let drop_mode = DropMode::from_request_setting(request.cascade);
567
568 let version = self
569 .ddl_controller
570 .run_command(DdlCommand::DropStreamingJob {
571 job_id: StreamingJobId::MaterializedView(table_id),
572 drop_mode,
573 })
574 .await?;
575
576 Ok(Response::new(DropMaterializedViewResponse {
577 status: None,
578 version,
579 }))
580 }
581
582 async fn create_index(
583 &self,
584 request: Request<CreateIndexRequest>,
585 ) -> Result<Response<CreateIndexResponse>, Status> {
586 self.env.idle_manager().record_activity();
587
588 let req = request.into_inner();
589 let index = req.get_index()?.clone();
590 let index_table = req.get_index_table()?.clone();
591 let fragment_graph = req.get_fragment_graph()?.clone();
592
593 let stream_job = StreamingJob::Index(index, index_table);
594 let version = self
595 .ddl_controller
596 .run_command(DdlCommand::CreateStreamingJob {
597 stream_job,
598 fragment_graph,
599 dependencies: HashSet::new(),
600 resource_type: Self::default_streaming_job_resource_type(),
601 if_not_exists: req.if_not_exists,
602 })
603 .await?;
604
605 Ok(Response::new(CreateIndexResponse {
606 status: None,
607 version,
608 }))
609 }
610
611 async fn drop_index(
612 &self,
613 request: Request<DropIndexRequest>,
614 ) -> Result<Response<DropIndexResponse>, Status> {
615 self.env.idle_manager().record_activity();
616
617 let request = request.into_inner();
618 let index_id = request.index_id;
619 let drop_mode = DropMode::from_request_setting(request.cascade);
620 let version = self
621 .ddl_controller
622 .run_command(DdlCommand::DropStreamingJob {
623 job_id: StreamingJobId::Index(index_id),
624 drop_mode,
625 })
626 .await?;
627
628 Ok(Response::new(DropIndexResponse {
629 status: None,
630 version,
631 }))
632 }
633
634 async fn create_function(
635 &self,
636 request: Request<CreateFunctionRequest>,
637 ) -> Result<Response<CreateFunctionResponse>, Status> {
638 let req = request.into_inner();
639 let function = req.get_function()?.clone();
640
641 let version = self
642 .ddl_controller
643 .run_command(DdlCommand::CreateFunction(function))
644 .await?;
645
646 Ok(Response::new(CreateFunctionResponse {
647 status: None,
648 version,
649 }))
650 }
651
652 async fn drop_function(
653 &self,
654 request: Request<DropFunctionRequest>,
655 ) -> Result<Response<DropFunctionResponse>, Status> {
656 let request = request.into_inner();
657
658 let version = self
659 .ddl_controller
660 .run_command(DdlCommand::DropFunction(
661 request.function_id,
662 DropMode::from_request_setting(request.cascade),
663 ))
664 .await?;
665
666 Ok(Response::new(DropFunctionResponse {
667 status: None,
668 version,
669 }))
670 }
671
672 async fn create_table(
673 &self,
674 request: Request<CreateTableRequest>,
675 ) -> Result<Response<CreateTableResponse>, Status> {
676 let request = request.into_inner();
677 let job_type = request.get_job_type().unwrap_or_default();
678 let dependencies = request.get_dependencies().iter().copied().collect();
679 let source = request.source;
680 let mview = request.materialized_view.unwrap();
681 let fragment_graph = request.fragment_graph.unwrap();
682
683 let stream_job = StreamingJob::Table(source, mview, job_type);
684 let version = self
685 .ddl_controller
686 .run_command(DdlCommand::CreateStreamingJob {
687 stream_job,
688 fragment_graph,
689 dependencies,
690 resource_type: Self::default_streaming_job_resource_type(),
691 if_not_exists: request.if_not_exists,
692 })
693 .await?;
694
695 Ok(Response::new(CreateTableResponse {
696 status: None,
697 version,
698 }))
699 }
700
701 async fn drop_table(
702 &self,
703 request: Request<DropTableRequest>,
704 ) -> Result<Response<DropTableResponse>, Status> {
705 let request = request.into_inner();
706 let source_id = request.source_id;
707 let table_id = request.table_id;
708
709 let drop_mode = DropMode::from_request_setting(request.cascade);
710 let version = self
711 .ddl_controller
712 .run_command(DdlCommand::DropStreamingJob {
713 job_id: StreamingJobId::Table(
714 source_id.map(|PbSourceId::Id(id)| id.into()),
715 table_id,
716 ),
717 drop_mode,
718 })
719 .await?;
720
721 Ok(Response::new(DropTableResponse {
722 status: None,
723 version,
724 }))
725 }
726
727 async fn create_view(
728 &self,
729 request: Request<CreateViewRequest>,
730 ) -> Result<Response<CreateViewResponse>, Status> {
731 let req = request.into_inner();
732 let view = req.get_view()?.clone();
733 let dependencies = req
734 .get_dependencies()
735 .iter()
736 .copied()
737 .collect::<HashSet<_>>();
738
739 let version = self
740 .ddl_controller
741 .run_command(DdlCommand::CreateView(view, dependencies))
742 .await?;
743
744 Ok(Response::new(CreateViewResponse {
745 status: None,
746 version,
747 }))
748 }
749
750 async fn drop_view(
751 &self,
752 request: Request<DropViewRequest>,
753 ) -> Result<Response<DropViewResponse>, Status> {
754 let request = request.into_inner();
755 let view_id = request.get_view_id();
756 let drop_mode = DropMode::from_request_setting(request.cascade);
757 let version = self
758 .ddl_controller
759 .run_command(DdlCommand::DropView(view_id, drop_mode))
760 .await?;
761 Ok(Response::new(DropViewResponse {
762 status: None,
763 version,
764 }))
765 }
766
767 async fn risectl_list_state_tables(
768 &self,
769 _request: Request<RisectlListStateTablesRequest>,
770 ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
771 let tables = self
772 .metadata_manager
773 .catalog_controller
774 .list_all_state_tables()
775 .await?;
776 Ok(Response::new(RisectlListStateTablesResponse { tables }))
777 }
778
779 async fn replace_job_plan(
780 &self,
781 request: Request<ReplaceJobPlanRequest>,
782 ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
783 let req = request.into_inner().get_plan().cloned()?;
784
785 let version = self
786 .ddl_controller
787 .run_command(DdlCommand::ReplaceStreamJob(
788 Self::extract_replace_table_info(req),
789 ))
790 .await?;
791
792 Ok(Response::new(ReplaceJobPlanResponse {
793 status: None,
794 version,
795 }))
796 }
797
798 async fn get_table(
799 &self,
800 request: Request<GetTableRequest>,
801 ) -> Result<Response<GetTableResponse>, Status> {
802 let req = request.into_inner();
803 let table = self
804 .metadata_manager
805 .catalog_controller
806 .get_table_by_name(&req.database_name, &req.table_name)
807 .await?;
808
809 Ok(Response::new(GetTableResponse { table }))
810 }
811
812 async fn alter_name(
813 &self,
814 request: Request<AlterNameRequest>,
815 ) -> Result<Response<AlterNameResponse>, Status> {
816 let AlterNameRequest { object, new_name } = request.into_inner();
817 let version = self
818 .ddl_controller
819 .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
820 .await?;
821 Ok(Response::new(AlterNameResponse {
822 status: None,
823 version,
824 }))
825 }
826
827 async fn alter_source(
829 &self,
830 request: Request<AlterSourceRequest>,
831 ) -> Result<Response<AlterSourceResponse>, Status> {
832 let AlterSourceRequest { source } = request.into_inner();
833 let version = self
834 .ddl_controller
835 .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
836 .await?;
837 Ok(Response::new(AlterSourceResponse {
838 status: None,
839 version,
840 }))
841 }
842
843 async fn alter_owner(
844 &self,
845 request: Request<AlterOwnerRequest>,
846 ) -> Result<Response<AlterOwnerResponse>, Status> {
847 let AlterOwnerRequest { object, owner_id } = request.into_inner();
848 let version = self
849 .ddl_controller
850 .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
851 .await?;
852 Ok(Response::new(AlterOwnerResponse {
853 status: None,
854 version,
855 }))
856 }
857
858 async fn alter_set_schema(
859 &self,
860 request: Request<AlterSetSchemaRequest>,
861 ) -> Result<Response<AlterSetSchemaResponse>, Status> {
862 let AlterSetSchemaRequest {
863 object,
864 new_schema_id,
865 } = request.into_inner();
866 let version = self
867 .ddl_controller
868 .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
869 .await?;
870 Ok(Response::new(AlterSetSchemaResponse {
871 status: None,
872 version,
873 }))
874 }
875
876 async fn get_ddl_progress(
877 &self,
878 _request: Request<GetDdlProgressRequest>,
879 ) -> Result<Response<GetDdlProgressResponse>, Status> {
880 Ok(Response::new(GetDdlProgressResponse {
881 ddl_progress: self.ddl_controller.get_ddl_progress().await?,
882 }))
883 }
884
885 async fn create_connection(
886 &self,
887 request: Request<CreateConnectionRequest>,
888 ) -> Result<Response<CreateConnectionResponse>, Status> {
889 let req = request.into_inner();
890 if req.payload.is_none() {
891 return Err(Status::invalid_argument("request is empty"));
892 }
893
894 match req.payload.unwrap() {
895 create_connection_request::Payload::PrivateLink(_) => {
896 panic!("Private Link Connection has been deprecated")
897 }
898 create_connection_request::Payload::ConnectionParams(params) => {
899 let pb_connection = Connection {
900 id: 0.into(),
901 schema_id: req.schema_id,
902 database_id: req.database_id,
903 name: req.name,
904 info: Some(ConnectionInfo::ConnectionParams(params)),
905 owner: req.owner_id,
906 };
907 let version = self
908 .ddl_controller
909 .run_command(DdlCommand::CreateConnection(pb_connection))
910 .await?;
911 Ok(Response::new(CreateConnectionResponse { version }))
912 }
913 }
914 }
915
916 async fn list_connections(
917 &self,
918 _request: Request<ListConnectionsRequest>,
919 ) -> Result<Response<ListConnectionsResponse>, Status> {
920 let conns = self
921 .metadata_manager
922 .catalog_controller
923 .list_connections()
924 .await?;
925
926 Ok(Response::new(ListConnectionsResponse {
927 connections: conns,
928 }))
929 }
930
931 async fn drop_connection(
932 &self,
933 request: Request<DropConnectionRequest>,
934 ) -> Result<Response<DropConnectionResponse>, Status> {
935 let req = request.into_inner();
936 let drop_mode = DropMode::from_request_setting(req.cascade);
937
938 let version = self
939 .ddl_controller
940 .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
941 .await?;
942
943 Ok(Response::new(DropConnectionResponse {
944 status: None,
945 version,
946 }))
947 }
948
949 async fn comment_on(
950 &self,
951 request: Request<CommentOnRequest>,
952 ) -> Result<Response<CommentOnResponse>, Status> {
953 let req = request.into_inner();
954 let comment = req.get_comment()?.clone();
955
956 let version = self
957 .ddl_controller
958 .run_command(DdlCommand::CommentOn(Comment {
959 table_id: comment.table_id,
960 schema_id: comment.schema_id,
961 database_id: comment.database_id,
962 column_index: comment.column_index,
963 description: comment.description,
964 }))
965 .await?;
966
967 Ok(Response::new(CommentOnResponse {
968 status: None,
969 version,
970 }))
971 }
972
973 async fn get_tables(
974 &self,
975 request: Request<GetTablesRequest>,
976 ) -> Result<Response<GetTablesResponse>, Status> {
977 let GetTablesRequest {
978 table_ids,
979 include_dropped_tables,
980 } = request.into_inner();
981 let ret = self
982 .metadata_manager
983 .catalog_controller
984 .get_table_by_ids(table_ids, include_dropped_tables)
985 .await?;
986
987 let mut tables = HashMap::default();
988 for table in ret {
989 tables.insert(table.id, table);
990 }
991 Ok(Response::new(GetTablesResponse { tables }))
992 }
993
994 async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
995 self.ddl_controller.wait().await?;
996 Ok(Response::new(WaitResponse {}))
997 }
998
999 async fn alter_cdc_table_backfill_parallelism(
1000 &self,
1001 request: Request<AlterCdcTableBackfillParallelismRequest>,
1002 ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
1003 let req = request.into_inner();
1004 let job_id = req.get_table_id();
1005 let parallelism = *req.get_parallelism()?;
1006
1007 let table_parallelism = ModelTableParallelism::from(parallelism);
1008 let streaming_parallelism = match table_parallelism {
1009 ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
1010 _ => bail_invalid_parameter!(
1011 "CDC table backfill parallelism must be set to a fixed value"
1012 ),
1013 };
1014
1015 self.ddl_controller
1016 .reschedule_cdc_table_backfill(
1017 job_id,
1018 ReschedulePolicy::Parallelism(ParallelismPolicy {
1019 parallelism: streaming_parallelism,
1020 }),
1021 )
1022 .await?;
1023 Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
1024 }
1025
1026 async fn alter_parallelism(
1027 &self,
1028 request: Request<AlterParallelismRequest>,
1029 ) -> Result<Response<AlterParallelismResponse>, Status> {
1030 let req = request.into_inner();
1031
1032 let job_id = req.get_table_id();
1033 let parallelism = *req.get_parallelism()?;
1034 let deferred = req.get_deferred();
1035
1036 let parallelism = match parallelism.get_parallelism()? {
1037 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1038 StreamingParallelism::Fixed(*parallelism as _)
1039 }
1040 Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
1041 _ => bail_unavailable!(),
1042 };
1043
1044 self.ddl_controller
1045 .reschedule_streaming_job(
1046 job_id,
1047 ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
1048 deferred,
1049 )
1050 .await?;
1051
1052 Ok(Response::new(AlterParallelismResponse {}))
1053 }
1054
1055 async fn alter_fragment_parallelism(
1056 &self,
1057 request: Request<AlterFragmentParallelismRequest>,
1058 ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1059 let req = request.into_inner();
1060
1061 let fragment_ids = req.fragment_ids;
1062 if fragment_ids.is_empty() {
1063 return Err(Status::invalid_argument(
1064 "at least one fragment id must be provided",
1065 ));
1066 }
1067
1068 let parallelism = match req.parallelism {
1069 Some(parallelism) => {
1070 let streaming_parallelism = match parallelism.get_parallelism()? {
1071 Parallelism::Fixed(FixedParallelism { parallelism }) => {
1072 StreamingParallelism::Fixed(*parallelism as _)
1073 }
1074 Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1075 StreamingParallelism::Adaptive
1076 }
1077 _ => bail_unavailable!(),
1078 };
1079 Some(streaming_parallelism)
1080 }
1081 None => None,
1082 };
1083
1084 let fragment_targets = fragment_ids
1085 .into_iter()
1086 .map(|fragment_id| (fragment_id, parallelism.clone()))
1087 .collect();
1088
1089 self.ddl_controller
1090 .reschedule_fragments(fragment_targets)
1091 .await?;
1092
1093 Ok(Response::new(AlterFragmentParallelismResponse {}))
1094 }
1095
1096 async fn alter_streaming_job_config(
1097 &self,
1098 request: Request<AlterStreamingJobConfigRequest>,
1099 ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
1100 let AlterStreamingJobConfigRequest {
1101 job_id,
1102 entries_to_add,
1103 keys_to_remove,
1104 } = request.into_inner();
1105
1106 self.ddl_controller
1107 .run_command(DdlCommand::AlterStreamingJobConfig(
1108 job_id,
1109 entries_to_add,
1110 keys_to_remove,
1111 ))
1112 .await?;
1113
1114 Ok(Response::new(AlterStreamingJobConfigResponse {}))
1115 }
1116
1117 async fn auto_schema_change(
1120 &self,
1121 request: Request<AutoSchemaChangeRequest>,
1122 ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
1123 let req = request.into_inner();
1124
1125 let workers = self
1127 .metadata_manager
1128 .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
1129 .await?;
1130 let worker = workers
1131 .choose(&mut thread_rng())
1132 .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;
1133
1134 let client = self
1135 .env
1136 .frontend_client_pool()
1137 .get(worker)
1138 .await
1139 .map_err(MetaError::from)?;
1140
1141 let Some(schema_change) = req.schema_change else {
1142 return Err(Status::invalid_argument(
1143 "schema change message is required",
1144 ));
1145 };
1146
1147 for table_change in schema_change.table_changes {
1148 for c in &table_change.columns {
1149 let c = ColumnCatalog::from(c.clone());
1150
1151 let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
1152 tracing::warn!(target: "auto_schema_change",
1153 cdc_table_id = table_change.cdc_table_id,
1154 upstraem_ddl = table_change.upstream_ddl,
1155 "invalid column type from cdc table change");
1156 Err(Status::invalid_argument(format!(
1157 "invalid column type: {} from cdc table change, column: {:?}",
1158 column_type, c
1159 )))
1160 };
1161 if c.is_generated() {
1162 return invalid_col_type("generated column", &c);
1163 }
1164 if c.is_rw_sys_column() {
1165 return invalid_col_type("rw system column", &c);
1166 }
1167 if c.is_hidden {
1168 return invalid_col_type("hidden column", &c);
1169 }
1170 }
1171
1172 let tables: Vec<Table> = self
1174 .metadata_manager
1175 .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
1176 .await?;
1177
1178 for table in tables {
1179 let original_columns: HashSet<(String, DataType)> =
1182 HashSet::from_iter(table.columns.iter().filter_map(|col| {
1183 let col = ColumnCatalog::from(col.clone());
1184 if col.is_generated() || col.is_hidden() {
1185 None
1186 } else {
1187 Some((col.column_desc.name.clone(), col.data_type().clone()))
1188 }
1189 }));
1190
1191 let mut new_columns: HashSet<(String, DataType)> =
1192 HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
1193 let col = ColumnCatalog::from(col.clone());
1194 if col.is_generated() || col.is_hidden() {
1195 None
1196 } else {
1197 Some((col.column_desc.name.clone(), col.data_type().clone()))
1198 }
1199 }));
1200
1201 for col in &table.columns {
1204 let col = ColumnCatalog::from(col.clone());
1205 if col.is_connector_additional_column()
1206 && !col.is_hidden()
1207 && !col.is_generated()
1208 {
1209 new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
1210 }
1211 }
1212
1213 if !(original_columns.is_subset(&new_columns)
1214 || original_columns.is_superset(&new_columns))
1215 {
1216 tracing::warn!(target: "auto_schema_change",
1217 table_id = %table.id,
1218 cdc_table_id = table.cdc_table_id,
1219 upstraem_ddl = table_change.upstream_ddl,
1220 original_columns = ?original_columns,
1221 new_columns = ?new_columns,
1222 "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");
1223
1224 let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
1225 add_auto_schema_change_fail_event_log(
1226 &self.meta_metrics,
1227 table.id,
1228 table.name.clone(),
1229 table_change.cdc_table_id.clone(),
1230 table_change.upstream_ddl.clone(),
1231 &self.env.event_log_manager_ref(),
1232 fail_info,
1233 );
1234
1235 return Err(Status::invalid_argument(
1236 "New columns should be a subset or superset of the original columns (including hidden columns)",
1237 ));
1238 }
1239 if original_columns == new_columns {
1241 tracing::warn!(target: "auto_schema_change",
1242 table_id = %table.id,
1243 cdc_table_id = table.cdc_table_id,
1244 upstraem_ddl = table_change.upstream_ddl,
1245 original_columns = ?original_columns,
1246 new_columns = ?new_columns,
1247 "No change to columns, skipping the schema change");
1248 continue;
1249 }
1250
1251 let latency_timer = self
1252 .meta_metrics
1253 .auto_schema_change_latency
1254 .with_guarded_label_values(&[&table.id.to_string(), &table.name])
1255 .start_timer();
1256 let resp = client
1259 .get_table_replace_plan(GetTableReplacePlanRequest {
1260 database_id: table.database_id,
1261 table_id: table.id,
1262 cdc_table_change: Some(table_change.clone()),
1263 })
1264 .await;
1265
1266 match resp {
1267 Ok(resp) => {
1268 let resp = resp.into_inner();
1269 if let Some(plan) = resp.replace_plan {
1270 let plan = Self::extract_replace_table_info(plan);
1271 plan.streaming_job.table().inspect(|t| {
1272 tracing::info!(
1273 target: "auto_schema_change",
1274 table_id = %t.id,
1275 cdc_table_id = t.cdc_table_id,
1276 upstraem_ddl = table_change.upstream_ddl,
1277 "Start the replace config change")
1278 });
1279 let replace_res = self
1281 .ddl_controller
1282 .run_command(DdlCommand::ReplaceStreamJob(plan))
1283 .await;
1284
1285 match replace_res {
1286 Ok(_) => {
1287 tracing::info!(
1288 target: "auto_schema_change",
1289 table_id = %table.id,
1290 cdc_table_id = table.cdc_table_id,
1291 "Table replaced success");
1292
1293 self.meta_metrics
1294 .auto_schema_change_success_cnt
1295 .with_guarded_label_values(&[
1296 &table.id.to_string(),
1297 &table.name,
1298 ])
1299 .inc();
1300 latency_timer.observe_duration();
1301 }
1302 Err(e) => {
1303 tracing::error!(
1304 target: "auto_schema_change",
1305 error = %e.as_report(),
1306 table_id = %table.id,
1307 cdc_table_id = table.cdc_table_id,
1308 upstraem_ddl = table_change.upstream_ddl,
1309 "failed to replace the table",
1310 );
1311 let fail_info =
1312 format!("failed to replace the table: {}", e.as_report());
1313 add_auto_schema_change_fail_event_log(
1314 &self.meta_metrics,
1315 table.id,
1316 table.name.clone(),
1317 table_change.cdc_table_id.clone(),
1318 table_change.upstream_ddl.clone(),
1319 &self.env.event_log_manager_ref(),
1320 fail_info,
1321 );
1322 }
1323 };
1324 }
1325 }
1326 Err(e) => {
1327 tracing::error!(
1328 target: "auto_schema_change",
1329 error = %e.as_report(),
1330 table_id = %table.id,
1331 cdc_table_id = table.cdc_table_id,
1332 "failed to get replace table plan",
1333 );
1334 let fail_info =
1335 format!("failed to get replace table plan: {}", e.as_report());
1336 add_auto_schema_change_fail_event_log(
1337 &self.meta_metrics,
1338 table.id,
1339 table.name.clone(),
1340 table_change.cdc_table_id.clone(),
1341 table_change.upstream_ddl.clone(),
1342 &self.env.event_log_manager_ref(),
1343 fail_info,
1344 );
1345 }
1346 };
1347 }
1348 }
1349
1350 Ok(Response::new(AutoSchemaChangeResponse {}))
1351 }
1352
1353 async fn alter_swap_rename(
1354 &self,
1355 request: Request<AlterSwapRenameRequest>,
1356 ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1357 let req = request.into_inner();
1358
1359 let version = self
1360 .ddl_controller
1361 .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1362 .await?;
1363
1364 Ok(Response::new(AlterSwapRenameResponse {
1365 status: None,
1366 version,
1367 }))
1368 }
1369
1370 async fn alter_resource_group(
1371 &self,
1372 request: Request<AlterResourceGroupRequest>,
1373 ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1374 let req = request.into_inner();
1375
1376 let table_id = req.get_table_id();
1377 let deferred = req.get_deferred();
1378 let resource_group = req.resource_group;
1379
1380 self.ddl_controller
1381 .reschedule_streaming_job(
1382 table_id.as_job_id(),
1383 ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1384 deferred,
1385 )
1386 .await?;
1387
1388 Ok(Response::new(AlterResourceGroupResponse {}))
1389 }
1390
1391 async fn alter_database_param(
1392 &self,
1393 request: Request<AlterDatabaseParamRequest>,
1394 ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1395 let req = request.into_inner();
1396 let database_id = req.database_id;
1397
1398 let param = match req.param.unwrap() {
1399 alter_database_param_request::Param::BarrierIntervalMs(value) => {
1400 AlterDatabaseParam::BarrierIntervalMs(value.value)
1401 }
1402 alter_database_param_request::Param::CheckpointFrequency(value) => {
1403 AlterDatabaseParam::CheckpointFrequency(value.value)
1404 }
1405 };
1406 let version = self
1407 .ddl_controller
1408 .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
1409 .await?;
1410
1411 return Ok(Response::new(AlterDatabaseParamResponse {
1412 status: None,
1413 version,
1414 }));
1415 }
1416
1417 async fn compact_iceberg_table(
1418 &self,
1419 request: Request<CompactIcebergTableRequest>,
1420 ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1421 let req = request.into_inner();
1422 let sink_id = req.sink_id;
1423
1424 let task_id = self
1426 .iceberg_compaction_manager
1427 .trigger_manual_compaction(sink_id)
1428 .await
1429 .map_err(|e| {
1430 Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1431 })?;
1432
1433 Ok(Response::new(CompactIcebergTableResponse {
1434 status: None,
1435 task_id,
1436 }))
1437 }
1438
1439 async fn expire_iceberg_table_snapshots(
1440 &self,
1441 request: Request<ExpireIcebergTableSnapshotsRequest>,
1442 ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1443 let req = request.into_inner();
1444 let sink_id = req.sink_id;
1445
1446 self.iceberg_compaction_manager
1448 .check_and_expire_snapshots(sink_id)
1449 .await
1450 .map_err(|e| {
1451 Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1452 })?;
1453
1454 Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1455 status: None,
1456 }))
1457 }
1458
1459 async fn create_iceberg_table(
1460 &self,
1461 request: Request<CreateIcebergTableRequest>,
1462 ) -> Result<Response<CreateIcebergTableResponse>, Status> {
1463 let req = request.into_inner();
1464 let CreateIcebergTableRequest {
1465 table_info,
1466 sink_info,
1467 iceberg_source,
1468 if_not_exists,
1469 } = req;
1470
1471 let PbTableJobInfo {
1473 source,
1474 table,
1475 fragment_graph,
1476 job_type,
1477 } = table_info.unwrap();
1478 let mut table = table.unwrap();
1479 let mut fragment_graph = fragment_graph.unwrap();
1480 let database_id = table.get_database_id();
1481 let schema_id = table.get_schema_id();
1482 let table_name = table.get_name().to_owned();
1483
1484 table.create_type = PbCreateType::Background as _;
1486
1487 let source_rate_limit = if let Some(source) = &source {
1489 for fragment in fragment_graph.fragments.values_mut() {
1490 stream_graph_visitor::visit_fragment_mut(fragment, |node| {
1491 if let NodeBody::Source(source_node) = node
1492 && let Some(inner) = &mut source_node.source_inner
1493 {
1494 inner.rate_limit = Some(0);
1495 }
1496 });
1497 }
1498 Some(source.rate_limit)
1499 } else {
1500 None
1501 };
1502
1503 let stream_job =
1504 StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
1505 let _ = self
1506 .ddl_controller
1507 .run_command(DdlCommand::CreateStreamingJob {
1508 stream_job,
1509 fragment_graph,
1510 dependencies: HashSet::new(),
1511 resource_type: Self::default_streaming_job_resource_type(),
1512 if_not_exists,
1513 })
1514 .await?;
1515
1516 let table_catalog = self
1517 .metadata_manager
1518 .catalog_controller
1519 .get_table_catalog_by_name(database_id, schema_id, &table_name)
1520 .await?
1521 .ok_or(Status::not_found("Internal error: table not found"))?;
1522
1523 let PbSinkJobInfo {
1525 sink,
1526 fragment_graph,
1527 } = sink_info.unwrap();
1528 let mut sink = sink.unwrap();
1529
1530 sink.create_type = PbCreateType::Background as _;
1532
1533 let mut fragment_graph = fragment_graph.unwrap();
1534
1535 assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
1536 assert!(
1537 risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
1538 .is_placeholder()
1539 );
1540 fragment_graph.dependent_table_ids[0] = table_catalog.id;
1541 for fragment in fragment_graph.fragments.values_mut() {
1542 stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
1543 NodeBody::StreamScan(scan) => {
1544 scan.table_id = table_catalog.id;
1545 if let Some(table_desc) = &mut scan.table_desc {
1546 assert!(
1547 risingwave_common::catalog::TableId::from(table_desc.table_id)
1548 .is_placeholder()
1549 );
1550 table_desc.table_id = table_catalog.id;
1551 table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1552 }
1553 if let Some(table) = &mut scan.arrangement_table {
1554 assert!(
1555 risingwave_common::catalog::TableId::from(table.id).is_placeholder()
1556 );
1557 *table = table_catalog.clone();
1558 }
1559 }
1560 NodeBody::BatchPlan(plan) => {
1561 if let Some(table_desc) = &mut plan.table_desc {
1562 assert!(
1563 risingwave_common::catalog::TableId::from(table_desc.table_id)
1564 .is_placeholder()
1565 );
1566 table_desc.table_id = table_catalog.id;
1567 table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
1568 }
1569 }
1570 _ => {}
1571 });
1572 }
1573
1574 let table_id = table_catalog.id;
1575 let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
1576 let stream_job = StreamingJob::Sink(sink);
1577 let res = self
1578 .ddl_controller
1579 .run_command(DdlCommand::CreateStreamingJob {
1580 stream_job,
1581 fragment_graph,
1582 dependencies,
1583 resource_type: Self::default_streaming_job_resource_type(),
1584 if_not_exists,
1585 })
1586 .await;
1587
1588 if res.is_err() {
1589 let _ = self
1590 .ddl_controller
1591 .run_command(DdlCommand::DropStreamingJob {
1592 job_id: StreamingJobId::Table(None, table_id),
1593 drop_mode: DropMode::Cascade,
1594 })
1595 .await
1596 .inspect_err(|err| {
1597 tracing::error!(error = %err.as_report(),
1598 "Failed to clean up table after iceberg sink creation failure",
1599 );
1600 });
1601 res?;
1602 }
1603
1604 if let Some(source_rate_limit) = source_rate_limit
1606 && source_rate_limit != Some(0)
1607 {
1608 let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
1609 table_catalog.optional_associated_source_id.unwrap();
1610 let (jobs, fragments) = self
1611 .metadata_manager
1612 .update_source_rate_limit_by_source_id(SourceId::new(source_id), source_rate_limit)
1613 .await?;
1614 let throttle_config = ThrottleConfig {
1615 throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
1616 rate_limit: source_rate_limit,
1617 };
1618 let _ = self
1619 .barrier_scheduler
1620 .run_command(
1621 database_id,
1622 Command::Throttle {
1623 jobs,
1624 config: fragments
1625 .into_iter()
1626 .map(|fragment_id| (fragment_id, throttle_config))
1627 .collect(),
1628 },
1629 )
1630 .await?;
1631 }
1632
1633 let iceberg_source = iceberg_source.unwrap();
1635 let res = self
1636 .ddl_controller
1637 .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
1638 .await;
1639 if res.is_err() {
1640 let _ = self
1641 .ddl_controller
1642 .run_command(DdlCommand::DropStreamingJob {
1643 job_id: StreamingJobId::Table(None, table_id),
1644 drop_mode: DropMode::Cascade,
1645 })
1646 .await
1647 .inspect_err(|err| {
1648 tracing::error!(
1649 error = %err.as_report(),
1650 "Failed to clean up table after iceberg source creation failure",
1651 );
1652 });
1653 }
1654
1655 Ok(Response::new(CreateIcebergTableResponse {
1656 status: None,
1657 version: res?,
1658 }))
1659 }
1660}
1661
1662fn add_auto_schema_change_fail_event_log(
1663 meta_metrics: &Arc<MetaMetrics>,
1664 table_id: TableId,
1665 table_name: String,
1666 cdc_table_id: String,
1667 upstream_ddl: String,
1668 event_log_manager: &EventLogManagerRef,
1669 fail_info: String,
1670) {
1671 meta_metrics
1672 .auto_schema_change_failure_cnt
1673 .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1674 .inc();
1675 let event = event_log::EventAutoSchemaChangeFail {
1676 table_id,
1677 table_name,
1678 cdc_table_id,
1679 upstream_ddl,
1680 fail_info,
1681 };
1682 event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1683}