risingwave_meta_service/
ddl_service.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::pin::pin;
use std::sync::Arc;

use anyhow::{Context, anyhow};
use futures::future::select;
use rand::rng as thread_rng;
use rand::seq::IndexedRandom;
use replace_job_plan::{ReplaceSource, ReplaceTable};
use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
use risingwave_common::id::{ObjectId, TableId};
use risingwave_common::types::DataType;
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
use risingwave_meta::model::TableParallelism as ModelTableParallelism;
use risingwave_meta::rpc::metrics::MetaMetrics;
use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
use risingwave_meta_model::StreamingParallelism;
use risingwave_pb::catalog::connection::Info as ConnectionInfo;
use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::State;
use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
use risingwave_pb::ddl_service::ddl_service_server::DdlService;
use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
use risingwave_pb::meta::event_log;
use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tonic::{Request, Response, Status};

use crate::MetaError;
use crate::barrier::BarrierManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{MetaSrvEnv, StreamingJob};
use crate::rpc::ddl_controller::{
    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
};
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

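/// gRPC implementation of the meta node's DDL service.
///
/// Most handlers below unpack their request, translate it into a
/// [`DdlCommand`], and forward it to the [`DdlController`], which performs the
/// actual catalog and streaming-graph changes.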
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    metadata_manager: MetadataManager,
    sink_manager: SinkCoordinatorManager,
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    barrier_scheduler: BarrierScheduler,
}

impl DdlServiceImpl {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
            iceberg_compaction_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

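    /// Unpacks a [`ReplaceJobPlan`] into the internal [`ReplaceStreamJobInfo`].
    ///
    /// The plan's `replace_job` and `fragment_graph` fields are required, so
    /// the `unwrap()`s here assume a well-formed plan produced by the frontend.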
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

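    /// The resource type applied to streaming jobs whose create request does
    /// not carry an explicit one (e.g. sources, sinks, indexes, and tables).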
    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
        streaming_job_resource_type::ResourceType::Regular(true)
    }

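    /// Spawns a background task that migrates legacy table fragments: for each
    /// unmigrated table, it asks a frontend node for a replace plan and
    /// replays it through the DDL controller. The whole migration is retried
    /// every 5 seconds until it succeeds or the returned shutdown sender
    /// fires.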
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("start migrate legacy table fragments task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}

#[async_trait::async_trait]
impl DdlService for DdlServiceImpl {
    async fn create_database(
        &self,
        request: Request<CreateDatabaseRequest>,
    ) -> Result<Response<CreateDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database = req.get_db()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateDatabase(database))
            .await?;

        Ok(Response::new(CreateDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn drop_database(
        &self,
        request: Request<DropDatabaseRequest>,
    ) -> Result<Response<DropDatabaseResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.get_database_id();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropDatabase(database_id))
            .await?;

        Ok(Response::new(DropDatabaseResponse {
            status: None,
            version,
        }))
    }

    async fn create_secret(
        &self,
        request: Request<CreateSecretRequest>,
    ) -> Result<Response<CreateSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: 0.into(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSecret(pb_secret))
            .await?;

        Ok(Response::new(CreateSecretResponse { version }))
    }

    async fn drop_secret(
        &self,
        request: Request<DropSecretRequest>,
    ) -> Result<Response<DropSecretResponse>, Status> {
        let req = request.into_inner();
        let secret_id = req.get_secret_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
            .await?;

        Ok(Response::new(DropSecretResponse {
            status: None,
            version,
        }))
    }

    async fn alter_secret(
        &self,
        request: Request<AlterSecretRequest>,
    ) -> Result<Response<AlterSecretResponse>, Status> {
        let req = request.into_inner();
        let pb_secret = Secret {
            id: req.get_secret_id(),
            name: req.get_name().clone(),
            database_id: req.get_database_id(),
            value: req.get_value().clone(),
            owner: req.get_owner_id(),
            schema_id: req.get_schema_id(),
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSecret(pb_secret))
            .await?;

        Ok(Response::new(AlterSecretResponse { version }))
    }

    async fn create_schema(
        &self,
        request: Request<CreateSchemaRequest>,
    ) -> Result<Response<CreateSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema = req.get_schema()?.clone();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateSchema(schema))
            .await?;

        Ok(Response::new(CreateSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn drop_schema(
        &self,
        request: Request<DropSchemaRequest>,
    ) -> Result<Response<DropSchemaResponse>, Status> {
        let req = request.into_inner();
        let schema_id = req.get_schema_id();
        let drop_mode = DropMode::from_request_setting(req.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
            .await?;
        Ok(Response::new(DropSchemaResponse {
            status: None,
            version,
        }))
    }

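    /// Creates a source. Without a fragment graph this is a plain (non-shared)
    /// source and a pure catalog change; with a fragment graph the source is
    /// shared and runs as a streaming job.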
    async fn create_source(
        &self,
        request: Request<CreateSourceRequest>,
    ) -> Result<Response<CreateSourceResponse>, Status> {
        let req = request.into_inner();
        let source = req.get_source()?.clone();

        match req.fragment_graph {
            None => {
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateNonSharedSource(source))
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
            Some(fragment_graph) => {
                // A fragment graph is present, so create the shared source as a
                // streaming job.
                let stream_job = StreamingJob::Source(source);
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateStreamingJob {
                        stream_job,
                        fragment_graph,
                        dependencies: HashSet::new(),
                        resource_type: Self::default_streaming_job_resource_type(),
                        if_not_exists: req.if_not_exists,
                    })
                    .await?;
                Ok(Response::new(CreateSourceResponse {
                    status: None,
                    version,
                }))
            }
        }
    }

    async fn drop_source(
        &self,
        request: Request<DropSourceRequest>,
    ) -> Result<Response<DropSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropSource(source_id, drop_mode))
            .await?;

        Ok(Response::new(DropSourceResponse {
            status: None,
            version,
        }))
    }

    async fn reset_source(
        &self,
        request: Request<ResetSourceRequest>,
    ) -> Result<Response<ResetSourceResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;

        tracing::info!(
            source_id = %source_id,
            "Received RESET SOURCE request, routing to DDL controller"
        );

        // Route to DDL controller
        let version = self
            .ddl_controller
            .run_command(DdlCommand::ResetSource(source_id))
            .await?;

        Ok(Response::new(ResetSourceResponse {
            status: None,
            version,
        }))
    }

    async fn create_sink(
        &self,
        request: Request<CreateSinkRequest>,
    ) -> Result<Response<CreateSinkResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let sink = req.get_sink()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();

        let stream_job = StreamingJob::Sink(sink);

        let command = DdlCommand::CreateStreamingJob {
            stream_job,
            fragment_graph,
            dependencies,
            resource_type: Self::default_streaming_job_resource_type(),
            if_not_exists: req.if_not_exists,
        };

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSinkResponse {
            status: None,
            version,
        }))
    }

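    /// Drops a sink. After the streaming job is dropped, the corresponding
    /// sink coordinator is stopped so no coordination state is left behind.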
    async fn drop_sink(
        &self,
        request: Request<DropSinkRequest>,
    ) -> Result<Response<DropSinkResponse>, Status> {
        let request = request.into_inner();
        let sink_id = request.sink_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropStreamingJob {
            job_id: StreamingJobId::Sink(sink_id),
            drop_mode,
        };

        let version = self.ddl_controller.run_command(command).await?;

        self.sink_manager
            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
            .await;

        Ok(Response::new(DropSinkResponse {
            status: None,
            version,
        }))
    }

    async fn create_subscription(
        &self,
        request: Request<CreateSubscriptionRequest>,
    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();

        let subscription = req.get_subscription()?.clone();
        let command = DdlCommand::CreateSubscription(subscription);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(CreateSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_subscription(
        &self,
        request: Request<DropSubscriptionRequest>,
    ) -> Result<Response<DropSubscriptionResponse>, Status> {
        let request = request.into_inner();
        let subscription_id = request.subscription_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);

        let version = self.ddl_controller.run_command(command).await?;

        Ok(Response::new(DropSubscriptionResponse {
            status: None,
            version,
        }))
    }

    async fn create_materialized_view(
        &self,
        request: Request<CreateMaterializedViewRequest>,
    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let mview = req.get_materialized_view()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();
        let dependencies = req.get_dependencies().iter().copied().collect();
        let resource_type = req.resource_type.unwrap().resource_type.unwrap();

        let stream_job = StreamingJob::MaterializedView(mview);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type,
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_materialized_view(
        &self,
        request: Request<DropMaterializedViewRequest>,
    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let table_id = request.table_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::MaterializedView(table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropMaterializedViewResponse {
            status: None,
            version,
        }))
    }

    async fn create_index(
        &self,
        request: Request<CreateIndexRequest>,
    ) -> Result<Response<CreateIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let req = request.into_inner();
        let index = req.get_index()?.clone();
        let index_table = req.get_index_table()?.clone();
        let fragment_graph = req.get_fragment_graph()?.clone();

        let stream_job = StreamingJob::Index(index, index_table);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: req.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateIndexResponse {
            status: None,
            version,
        }))
    }

    async fn drop_index(
        &self,
        request: Request<DropIndexRequest>,
    ) -> Result<Response<DropIndexResponse>, Status> {
        self.env.idle_manager().record_activity();

        let request = request.into_inner();
        let index_id = request.index_id;
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Index(index_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropIndexResponse {
            status: None,
            version,
        }))
    }

    async fn create_function(
        &self,
        request: Request<CreateFunctionRequest>,
    ) -> Result<Response<CreateFunctionResponse>, Status> {
        let req = request.into_inner();
        let function = req.get_function()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateFunction(function))
            .await?;

        Ok(Response::new(CreateFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn drop_function(
        &self,
        request: Request<DropFunctionRequest>,
    ) -> Result<Response<DropFunctionResponse>, Status> {
        let request = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropFunction(
                request.function_id,
                DropMode::from_request_setting(request.cascade),
            ))
            .await?;

        Ok(Response::new(DropFunctionResponse {
            status: None,
            version,
        }))
    }

    async fn create_table(
        &self,
        request: Request<CreateTableRequest>,
    ) -> Result<Response<CreateTableResponse>, Status> {
        let request = request.into_inner();
        let job_type = request.get_job_type().unwrap_or_default();
        let dependencies = request.get_dependencies().iter().copied().collect();
        let source = request.source;
        let mview = request.materialized_view.unwrap();
        let fragment_graph = request.fragment_graph.unwrap();

        let stream_job = StreamingJob::Table(source, mview, job_type);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists: request.if_not_exists,
            })
            .await?;

        Ok(Response::new(CreateTableResponse {
            status: None,
            version,
        }))
    }

    async fn drop_table(
        &self,
        request: Request<DropTableRequest>,
    ) -> Result<Response<DropTableResponse>, Status> {
        let request = request.into_inner();
        let source_id = request.source_id;
        let table_id = request.table_id;

        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropStreamingJob {
                job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
                drop_mode,
            })
            .await?;

        Ok(Response::new(DropTableResponse {
            status: None,
            version,
        }))
    }

    async fn create_view(
        &self,
        request: Request<CreateViewRequest>,
    ) -> Result<Response<CreateViewResponse>, Status> {
        let req = request.into_inner();
        let view = req.get_view()?.clone();
        let dependencies = req
            .get_dependencies()
            .iter()
            .copied()
            .collect::<HashSet<_>>();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CreateView(view, dependencies))
            .await?;

        Ok(Response::new(CreateViewResponse {
            status: None,
            version,
        }))
    }

    async fn drop_view(
        &self,
        request: Request<DropViewRequest>,
    ) -> Result<Response<DropViewResponse>, Status> {
        let request = request.into_inner();
        let view_id = request.get_view_id();
        let drop_mode = DropMode::from_request_setting(request.cascade);
        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropView(view_id, drop_mode))
            .await?;
        Ok(Response::new(DropViewResponse {
            status: None,
            version,
        }))
    }

    async fn risectl_list_state_tables(
        &self,
        _request: Request<RisectlListStateTablesRequest>,
    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?;
        Ok(Response::new(RisectlListStateTablesResponse { tables }))
    }

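    /// Resumes a paused backfill for either a whole job or a single fragment.
    /// In both cases the owning database id is resolved first, since barrier
    /// commands are scheduled per database.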
    async fn risectl_resume_backfill(
        &self,
        request: Request<RisectlResumeBackfillRequest>,
    ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
        let request = request.into_inner();
        let target = request
            .target
            .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;

        match target {
            risectl_resume_backfill_request::Target::JobId(job_id) => {
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Job(job_id),
                        },
                    )
                    .await?;
            }
            risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
                let mut job_ids = self
                    .metadata_manager
                    .catalog_controller
                    .get_fragment_job_id(vec![fragment_id])
                    .await?;
                let job_id = job_ids
                    .pop()
                    .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
                let database_id = self
                    .metadata_manager
                    .catalog_controller
                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
                    .await?;
                self.barrier_scheduler
                    .run_command(
                        database_id,
                        Command::ResumeBackfill {
                            target: ResumeBackfillTarget::Fragment(fragment_id),
                        },
                    )
                    .await?;
            }
        }

        Ok(Response::new(RisectlResumeBackfillResponse {}))
    }

    async fn replace_job_plan(
        &self,
        request: Request<ReplaceJobPlanRequest>,
    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
        let req = request.into_inner().get_plan().cloned()?;

        let version = self
            .ddl_controller
            .run_command(DdlCommand::ReplaceStreamJob(
                Self::extract_replace_table_info(req),
            ))
            .await?;

        Ok(Response::new(ReplaceJobPlanResponse {
            status: None,
            version,
        }))
    }

    async fn get_table(
        &self,
        request: Request<GetTableRequest>,
    ) -> Result<Response<GetTableResponse>, Status> {
        let req = request.into_inner();
        let table = self
            .metadata_manager
            .catalog_controller
            .get_table_by_name(&req.database_name, &req.table_name)
            .await?;

        Ok(Response::new(GetTableResponse { table }))
    }

    async fn alter_name(
        &self,
        request: Request<AlterNameRequest>,
    ) -> Result<Response<AlterNameResponse>, Status> {
        let AlterNameRequest { object, new_name } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
            .await?;
        Ok(Response::new(AlterNameResponse {
            status: None,
            version,
        }))
    }

    /// Only supports adding columns for now.
    async fn alter_source(
        &self,
        request: Request<AlterSourceRequest>,
    ) -> Result<Response<AlterSourceResponse>, Status> {
        let AlterSourceRequest { source } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
            .await?;
        Ok(Response::new(AlterSourceResponse {
            status: None,
            version,
        }))
    }

    async fn alter_owner(
        &self,
        request: Request<AlterOwnerRequest>,
    ) -> Result<Response<AlterOwnerResponse>, Status> {
        let AlterOwnerRequest { object, owner_id } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
            .await?;
        Ok(Response::new(AlterOwnerResponse {
            status: None,
            version,
        }))
    }

    async fn alter_subscription_retention(
        &self,
        request: Request<AlterSubscriptionRetentionRequest>,
    ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
        let AlterSubscriptionRetentionRequest {
            subscription_id,
            retention_seconds,
            definition,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSubscriptionRetention {
                subscription_id,
                retention_seconds,
                definition,
            })
            .await?;
        Ok(Response::new(AlterSubscriptionRetentionResponse {
            status: None,
            version,
        }))
    }

    async fn alter_set_schema(
        &self,
        request: Request<AlterSetSchemaRequest>,
    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
        let AlterSetSchemaRequest {
            object,
            new_schema_id,
        } = request.into_inner();
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
            .await?;
        Ok(Response::new(AlterSetSchemaResponse {
            status: None,
            version,
        }))
    }

    async fn get_ddl_progress(
        &self,
        _request: Request<GetDdlProgressRequest>,
    ) -> Result<Response<GetDdlProgressResponse>, Status> {
        Ok(Response::new(GetDdlProgressResponse {
            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
        }))
    }

    async fn create_connection(
        &self,
        request: Request<CreateConnectionRequest>,
    ) -> Result<Response<CreateConnectionResponse>, Status> {
        let req = request.into_inner();
        if req.payload.is_none() {
            return Err(Status::invalid_argument("request is empty"));
        }

        match req.payload.unwrap() {
            #[expect(deprecated)]
            create_connection_request::Payload::PrivateLink(_) => {
                panic!("Private Link Connection has been deprecated")
            }
            create_connection_request::Payload::ConnectionParams(params) => {
                let pb_connection = Connection {
                    id: 0.into(),
                    schema_id: req.schema_id,
                    database_id: req.database_id,
                    name: req.name,
                    info: Some(ConnectionInfo::ConnectionParams(params)),
                    owner: req.owner_id,
                };
                let version = self
                    .ddl_controller
                    .run_command(DdlCommand::CreateConnection(pb_connection))
                    .await?;
                Ok(Response::new(CreateConnectionResponse { version }))
            }
        }
    }

    async fn list_connections(
        &self,
        _request: Request<ListConnectionsRequest>,
    ) -> Result<Response<ListConnectionsResponse>, Status> {
        let conns = self
            .metadata_manager
            .catalog_controller
            .list_connections()
            .await?;

        Ok(Response::new(ListConnectionsResponse {
            connections: conns,
        }))
    }

    async fn drop_connection(
        &self,
        request: Request<DropConnectionRequest>,
    ) -> Result<Response<DropConnectionResponse>, Status> {
        let req = request.into_inner();
        let drop_mode = DropMode::from_request_setting(req.cascade);

        let version = self
            .ddl_controller
            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
            .await?;

        Ok(Response::new(DropConnectionResponse {
            status: None,
            version,
        }))
    }

    async fn comment_on(
        &self,
        request: Request<CommentOnRequest>,
    ) -> Result<Response<CommentOnResponse>, Status> {
        let req = request.into_inner();
        let comment = req.get_comment()?.clone();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::CommentOn(Comment {
                table_id: comment.table_id,
                schema_id: comment.schema_id,
                database_id: comment.database_id,
                column_index: comment.column_index,
                description: comment.description,
            }))
            .await?;

        Ok(Response::new(CommentOnResponse {
            status: None,
            version,
        }))
    }

    async fn get_tables(
        &self,
        request: Request<GetTablesRequest>,
    ) -> Result<Response<GetTablesResponse>, Status> {
        let GetTablesRequest {
            table_ids,
            include_dropped_tables,
        } = request.into_inner();
        let ret = self
            .metadata_manager
            .catalog_controller
            .get_table_by_ids(table_ids, include_dropped_tables)
            .await?;

        let mut tables = HashMap::default();
        for table in ret {
            tables.insert(table.id, table);
        }
        Ok(Response::new(GetTablesResponse { tables }))
    }

    async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
        let version = self.ddl_controller.wait().await?;
        Ok(Response::new(WaitResponse {
            version: Some(version),
        }))
    }

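    /// Alters the backfill parallelism of a CDC table. Only a fixed
    /// parallelism is accepted; any other setting is rejected as an invalid
    /// parameter.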
    async fn alter_cdc_table_backfill_parallelism(
        &self,
        request: Request<AlterCdcTableBackfillParallelismRequest>,
    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
        let req = request.into_inner();
        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;

        let table_parallelism = ModelTableParallelism::from(parallelism);
        let streaming_parallelism = match table_parallelism {
            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
            _ => bail_invalid_parameter!(
                "CDC table backfill parallelism must be set to a fixed value"
            ),
        };

        self.ddl_controller
            .reschedule_cdc_table_backfill(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy {
                    parallelism: streaming_parallelism,
                }),
            )
            .await?;
        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
    }

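    /// Alters the parallelism of a streaming job to either a fixed value or
    /// adaptive. The `deferred` flag is forwarded to the reschedule call to
    /// postpone applying the change.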
    async fn alter_parallelism(
        &self,
        request: Request<AlterParallelismRequest>,
    ) -> Result<Response<AlterParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let parallelism = *req.get_parallelism()?;
        let deferred = req.get_deferred();

        let parallelism = match parallelism.get_parallelism()? {
            Parallelism::Fixed(FixedParallelism { parallelism }) => {
                StreamingParallelism::Fixed(*parallelism as _)
            }
            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
            _ => bail_unavailable!(),
        };

        self.ddl_controller
            .reschedule_streaming_job(
                job_id,
                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterParallelismResponse {}))
    }

    async fn alter_backfill_parallelism(
        &self,
        request: Request<AlterBackfillParallelismRequest>,
    ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
        let req = request.into_inner();

        let job_id = req.get_table_id();
        let deferred = req.get_deferred();

        let parallelism = match req.parallelism {
            None => None,
            Some(parallelism) => {
                let parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(parallelism)
            }
        };

        self.ddl_controller
            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
            .await?;

        Ok(Response::new(AlterBackfillParallelismResponse {}))
    }

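    /// Alters the parallelism of individual fragments: every given fragment id
    /// is mapped to the same optional target parallelism and handed to the DDL
    /// controller's fragment reschedule.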
    async fn alter_fragment_parallelism(
        &self,
        request: Request<AlterFragmentParallelismRequest>,
    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
        let req = request.into_inner();

        let fragment_ids = req.fragment_ids;
        if fragment_ids.is_empty() {
            return Err(Status::invalid_argument(
                "at least one fragment id must be provided",
            ));
        }

        let parallelism = match req.parallelism {
            Some(parallelism) => {
                let streaming_parallelism = match parallelism.get_parallelism()? {
                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
                        StreamingParallelism::Fixed(*parallelism as _)
                    }
                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
                        StreamingParallelism::Adaptive
                    }
                    _ => bail_unavailable!(),
                };
                Some(streaming_parallelism)
            }
            None => None,
        };

        let fragment_targets = fragment_ids
            .into_iter()
            .map(|fragment_id| (fragment_id, parallelism.clone()))
            .collect();

        self.ddl_controller
            .reschedule_fragments(fragment_targets)
            .await?;

        Ok(Response::new(AlterFragmentParallelismResponse {}))
    }

    async fn alter_streaming_job_config(
        &self,
        request: Request<AlterStreamingJobConfigRequest>,
    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
        let AlterStreamingJobConfigRequest {
            job_id,
            entries_to_add,
            keys_to_remove,
        } = request.into_inner();

        self.ddl_controller
            .run_command(DdlCommand::AlterStreamingJobConfig(
                job_id,
                entries_to_add,
                keys_to_remove,
            ))
            .await?;

        Ok(Response::new(AlterStreamingJobConfigResponse {}))
    }

    /// Auto schema change for CDC sources, called by the source parser when a
    /// schema change is detected.
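    ///
    /// Validates the changed columns, looks up the affected table catalogs by
    /// `cdc_table_id`, asks a randomly chosen frontend node for a replace
    /// plan, and runs the resulting `ReplaceStreamJob` command. Plan and
    /// replace failures are logged and recorded in the event log, and
    /// processing continues with the next table.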
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        // randomly select a frontend worker to get the replace table plan
        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                        cdc_table_id = table_change.cdc_table_id,
                        upstream_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            // get the table catalog corresponding to the cdc table
            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                // Since we only support `ADD` and `DROP` column, we check that one of
                // the new and original column sets is a subset of the other.
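                // For example, `ADD COLUMN c` turns {a, b} into {a, b, c} (a superset),
                // and `DROP COLUMN b` turns {a, b} into {a} (a subset); a combined
                // add-and-drop satisfies neither relation and is rejected below.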
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                // For the subset/superset check, add the visible connector additional
                // columns defined by INCLUDE in the original table to `new_columns`.
                // This covers both _rw columns and user-defined INCLUDE columns
                // (e.g., INCLUDE TIMESTAMP AS xxx).
                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                                   table_id = %table.id,
                                   cdc_table_id = table.cdc_table_id,
                                   upstream_ddl = table_change.upstream_ddl,
                                   original_columns = ?original_columns,
                                   new_columns = ?new_columns,
                                   "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` are supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                // Skip the schema change if there is no change to the original columns.
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                                   table_id = %table.id,
                                   cdc_table_id = table.cdc_table_id,
                                   upstream_ddl = table_change.upstream_ddl,
                                   original_columns = ?original_columns,
                                   new_columns = ?new_columns,
                                   "No change to columns, skipping the schema change");
                    continue;
                }

                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                // send a request to the frontend to get the ReplaceJobPlan
                // will retry with exponential backoff if the request fails
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstream_ddl = table_change.upstream_ddl,
                                    "Starting the table replacement for the schema change")
                            });
                            // start the schema change procedure
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replacement succeeded");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstream_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
1423                                    let fail_info =
1424                                        format!("failed to replace the table: {}", e.as_report());
1425                                    add_auto_schema_change_fail_event_log(
1426                                        &self.meta_metrics,
1427                                        table.id,
1428                                        table.name.clone(),
1429                                        table_change.cdc_table_id.clone(),
1430                                        table_change.upstream_ddl.clone(),
1431                                        &self.env.event_log_manager_ref(),
1432                                        fail_info,
1433                                    );
1434                                }
1435                            };
1436                        }
1437                    }
1438                    Err(e) => {
1439                        tracing::error!(
1440                            target: "auto_schema_change",
1441                            error = %e.as_report(),
1442                            table_id = %table.id,
1443                            cdc_table_id = table.cdc_table_id,
1444                            "failed to get replace table plan",
1445                        );
1446                        let fail_info =
1447                            format!("failed to get replace table plan: {}", e.as_report());
1448                        add_auto_schema_change_fail_event_log(
1449                            &self.meta_metrics,
1450                            table.id,
1451                            table.name.clone(),
1452                            table_change.cdc_table_id.clone(),
1453                            table_change.upstream_ddl.clone(),
1454                            &self.env.event_log_manager_ref(),
1455                            fail_info,
1456                        );
1457                    }
1458                };
1459            }
1460        }
1461
1462        Ok(Response::new(AutoSchemaChangeResponse {}))
1463    }
1464
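    /// Handles `ALTER ... SWAP` rename requests by forwarding the target objects
    /// to the DDL controller.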
    async fn alter_swap_rename(
        &self,
        request: Request<AlterSwapRenameRequest>,
    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
        let req = request.into_inner();

        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
            .await?;

        Ok(Response::new(AlterSwapRenameResponse {
            status: None,
            version,
        }))
    }

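    /// Moves a streaming job to another resource group by issuing a reschedule
    /// with a `ResourceGroup` policy.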
    async fn alter_resource_group(
        &self,
        request: Request<AlterResourceGroupRequest>,
    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
        let req = request.into_inner();

        let table_id = req.get_table_id();
        let deferred = req.get_deferred();
        let resource_group = req.resource_group;

        self.ddl_controller
            .reschedule_streaming_job(
                table_id.as_job_id(),
                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
                deferred,
            )
            .await?;

        Ok(Response::new(AlterResourceGroupResponse {}))
    }

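    /// Updates a per-database runtime parameter, currently either the barrier
    /// interval or the checkpoint frequency.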
    async fn alter_database_param(
        &self,
        request: Request<AlterDatabaseParamRequest>,
    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
        let req = request.into_inner();
        let database_id = req.database_id;

        let param = match req.param.unwrap() {
            alter_database_param_request::Param::BarrierIntervalMs(value) => {
                AlterDatabaseParam::BarrierIntervalMs(value.value)
            }
            alter_database_param_request::Param::CheckpointFrequency(value) => {
                AlterDatabaseParam::CheckpointFrequency(value.value)
            }
        };
        let version = self
            .ddl_controller
            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
            .await?;

        Ok(Response::new(AlterDatabaseParamResponse {
            status: None,
            version,
        }))
    }

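    /// Triggers a manual compaction run for the Iceberg table written by the
    /// given sink.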
    async fn compact_iceberg_table(
        &self,
        request: Request<CompactIcebergTableRequest>,
    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        // Trigger manual compaction directly using the sink ID.
        let task_id = self
            .iceberg_compaction_manager
            .trigger_manual_compaction(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
            })?;

        Ok(Response::new(CompactIcebergTableResponse {
            status: None,
            task_id,
        }))
    }

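    /// Expires old snapshots of the Iceberg table written by the given sink.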
    async fn expire_iceberg_table_snapshots(
        &self,
        request: Request<ExpireIcebergTableSnapshotsRequest>,
    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
        let req = request.into_inner();
        let sink_id = req.sink_id;

        // Trigger manual snapshot expiration directly using the sink ID.
        self.iceberg_compaction_manager
            .check_and_expire_snapshots(sink_id)
            .await
            .map_err(|e| {
                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
            })?;

        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
            status: None,
        }))
    }

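    /// Creates an Iceberg table in four steps:
    /// 1. create the internal table job (with its source rate limit forced to 0),
    /// 2. create the Iceberg sink job that backfills from the table,
    /// 3. restore the original source rate limit once the sink exists,
    /// 4. create the Iceberg source for reads.
    ///
    /// If a later step fails, the already-created table job is dropped with
    /// `DropMode::Cascade` as best-effort cleanup.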
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        // 1. Create the table job.
        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        // Mark the table as a background creation so that it won't block sink creation.
        table.create_type = PbCreateType::Background as _;

        // Set the source rate limit to 0; it is restored after the iceberg sink
        // is created (step 3).
        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await?;

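        // Look the table up by name to obtain the catalog entry (and thus the id)
        // it was assigned during creation.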
        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or_else(|| Status::not_found("Internal error: table not found"))?;

        // 2. Create the iceberg sink job.
        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        // Mark the sink as a background creation so that it won't block source creation.
        sink.create_type = PbCreateType::Background as _;

        let mut fragment_graph = fragment_graph.unwrap();

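        // The sink's fragment graph was planned against a placeholder table id.
        // Patch in the real table id (and vnode count) now that the table exists.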
        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await;

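        // Best-effort cleanup: if the sink job failed, drop the half-created
        // table before surfacing the original error.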
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

        // 3. Reset the source rate limit back to normal after sink creation.
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let (jobs, fragments) = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
                .await?;
            let throttle_config = ThrottleConfig {
                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
                rate_limit: source_rate_limit,
            };
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::Throttle {
                        jobs,
                        config: fragments
                            .into_iter()
                            .map(|fragment_id| (fragment_id, throttle_config))
                            .collect(),
                    },
                )
                .await?;
        }

        // 4. Create the iceberg source.
        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
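        // If the source could not be created, drop the table (and, via cascade,
        // its dependents) as best-effort cleanup before returning the error.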
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
}

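/// Records a failed auto schema change: bumps the per-table failure counter and
/// appends an `EventAutoSchemaChangeFail` entry to the event log.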
fn add_auto_schema_change_fail_event_log(
    meta_metrics: &Arc<MetaMetrics>,
    table_id: TableId,
    table_name: String,
    cdc_table_id: String,
    upstream_ddl: String,
    event_log_manager: &EventLogManagerRef,
    fail_info: String,
) {
    meta_metrics
        .auto_schema_change_failure_cnt
        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
        .inc();
    let event = event_log::EventAutoSchemaChangeFail {
        table_id,
        table_name,
        cdc_table_id,
        upstream_ddl,
        fail_info,
    };
    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}