// risingwave_meta_service/ddl_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::pin::pin;
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use futures::future::select;
21use rand::rng as thread_rng;
22use rand::seq::IndexedRandom;
23use replace_job_plan::{ReplaceSource, ReplaceTable};
24use risingwave_common::catalog::{AlterDatabaseParam, ColumnCatalog};
25use risingwave_common::id::{ObjectId, TableId};
26use risingwave_common::types::DataType;
27use risingwave_common::util::stream_graph_visitor;
28use risingwave_connector::sink::catalog::SinkId;
29use risingwave_meta::barrier::{BarrierScheduler, Command, ResumeBackfillTarget};
30use risingwave_meta::manager::{EventLogManagerRef, MetadataManager, iceberg_compaction};
31use risingwave_meta::model::TableParallelism as ModelTableParallelism;
32use risingwave_meta::rpc::metrics::MetaMetrics;
33use risingwave_meta::stream::{ParallelismPolicy, ReschedulePolicy, ResourceGroupPolicy};
34use risingwave_meta::{MetaResult, bail_invalid_parameter, bail_unavailable};
35use risingwave_meta_model::StreamingParallelism;
36use risingwave_pb::catalog::connection::Info as ConnectionInfo;
37use risingwave_pb::catalog::table::OptionalAssociatedSourceId;
38use risingwave_pb::catalog::{Comment, Connection, PbCreateType, Secret, Table};
39use risingwave_pb::common::WorkerType;
40use risingwave_pb::common::worker_node::State;
41use risingwave_pb::ddl_service::create_iceberg_table_request::{PbSinkJobInfo, PbTableJobInfo};
42use risingwave_pb::ddl_service::ddl_service_server::DdlService;
43use risingwave_pb::ddl_service::drop_table_request::PbSourceId;
44use risingwave_pb::ddl_service::replace_job_plan::ReplaceMaterializedView;
45use risingwave_pb::ddl_service::{streaming_job_resource_type, *};
46use risingwave_pb::frontend_service::GetTableReplacePlanRequest;
47use risingwave_pb::meta::event_log;
48use risingwave_pb::meta::table_parallelism::{FixedParallelism, Parallelism};
49use risingwave_pb::stream_plan::stream_node::NodeBody;
50use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
51use thiserror_ext::AsReport;
52use tokio::sync::oneshot::Sender;
53use tokio::task::JoinHandle;
54use tonic::{Request, Response, Status};
55
56use crate::MetaError;
57use crate::barrier::BarrierManagerRef;
58use crate::manager::sink_coordination::SinkCoordinatorManager;
59use crate::manager::{MetaSrvEnv, StreamingJob};
60use crate::rpc::ddl_controller::{
61    DdlCommand, DdlController, DropMode, ReplaceStreamJobInfo, StreamingJobId,
62};
63use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};
64
/// gRPC handler for catalog DDL operations on the meta node.
///
/// Most requests are translated into a [`DdlCommand`] and forwarded to the
/// [`DdlController`], which performs the actual catalog / streaming-job work.
#[derive(Clone)]
pub struct DdlServiceImpl {
    env: MetaSrvEnv,

    // Access to catalog and fragment metadata.
    metadata_manager: MetadataManager,
    // Coordinates sink writers; used here to stop coordinators when a sink is dropped.
    sink_manager: SinkCoordinatorManager,
    // Executes every DDL command issued by this service.
    ddl_controller: DdlController,
    meta_metrics: Arc<MetaMetrics>,
    iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
    // Schedules barrier commands directly (e.g. resume-backfill), bypassing the controller.
    barrier_scheduler: BarrierScheduler,
}
76
impl DdlServiceImpl {
    /// Creates the service, constructing the shared [`DdlController`] from the
    /// given managers. The controller keeps clones of the managers it needs;
    /// the remaining handles are stored on the service itself.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        env: MetaSrvEnv,
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        source_manager: SourceManagerRef,
        barrier_manager: BarrierManagerRef,
        sink_manager: SinkCoordinatorManager,
        meta_metrics: Arc<MetaMetrics>,
        iceberg_compaction_manager: iceberg_compaction::IcebergCompactionManagerRef,
        barrier_scheduler: BarrierScheduler,
    ) -> Self {
        let ddl_controller = DdlController::new(
            env.clone(),
            metadata_manager.clone(),
            stream_manager,
            source_manager,
            barrier_manager,
            sink_manager.clone(),
            iceberg_compaction_manager.clone(),
        )
        .await;
        Self {
            env,
            metadata_manager,
            sink_manager,
            ddl_controller,
            meta_metrics,
            iceberg_compaction_manager,
            barrier_scheduler,
        }
    }

    /// Converts a [`ReplaceJobPlan`] proto into the internal
    /// [`ReplaceStreamJobInfo`] used by the DDL controller.
    ///
    /// NOTE(review): the `unwrap()`s assume the frontend always populates the
    /// plan's required fields (`replace_job`, `fragment_graph`, the per-variant
    /// catalog objects); a malformed plan panics here — confirm that is the
    /// intended contract for this internal RPC.
    fn extract_replace_table_info(
        ReplaceJobPlan {
            fragment_graph,
            replace_job,
        }: ReplaceJobPlan,
    ) -> ReplaceStreamJobInfo {
        let replace_streaming_job: StreamingJob = match replace_job.unwrap() {
            replace_job_plan::ReplaceJob::ReplaceTable(ReplaceTable {
                table,
                source,
                job_type,
            }) => StreamingJob::Table(
                source,
                table.unwrap(),
                TableJobType::try_from(job_type).unwrap(),
            ),
            replace_job_plan::ReplaceJob::ReplaceSource(ReplaceSource { source }) => {
                StreamingJob::Source(source.unwrap())
            }
            replace_job_plan::ReplaceJob::ReplaceMaterializedView(ReplaceMaterializedView {
                table,
            }) => StreamingJob::MaterializedView(table.unwrap()),
        };

        ReplaceStreamJobInfo {
            streaming_job: replace_streaming_job,
            fragment_graph: fragment_graph.unwrap(),
        }
    }

    /// Default resource type for streaming jobs created without an explicit one.
    fn default_streaming_job_resource_type() -> streaming_job_resource_type::ResourceType {
        streaming_job_resource_type::ResourceType::Regular(true)
    }

    /// Spawns a background task that migrates legacy table fragments: for each
    /// unmigrated table it asks a randomly chosen running frontend for a
    /// replace plan, then applies the plan through the DDL controller.
    ///
    /// The task retries the whole migration forever (5 s back-off) on failure.
    /// Returns the task's join handle and a shutdown sender; completing the
    /// sender (or dropping it) cancels the task via `select`.
    pub fn start_migrate_table_fragments(&self) -> (JoinHandle<()>, Sender<()>) {
        tracing::info!("start migrate legacy table fragments task");
        let env = self.env.clone();
        let metadata_manager = self.metadata_manager.clone();
        let ddl_controller = self.ddl_controller.clone();

        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let join_handle = tokio::spawn(async move {
            // One full migration attempt over all remaining legacy tables.
            async fn migrate_inner(
                env: &MetaSrvEnv,
                metadata_manager: &MetadataManager,
                ddl_controller: &DdlController,
            ) -> MetaResult<()> {
                let tables = metadata_manager
                    .catalog_controller
                    .list_unmigrated_tables()
                    .await?;

                if tables.is_empty() {
                    tracing::info!("no legacy table fragments need migration");
                    return Ok(());
                }

                // Pick one running frontend at random to generate replace plans.
                let client = {
                    let workers = metadata_manager
                        .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
                        .await?;
                    if workers.is_empty() {
                        return Err(anyhow::anyhow!("no active frontend nodes found").into());
                    }
                    let worker = workers.choose(&mut thread_rng()).unwrap();
                    env.frontend_client_pool().get(worker).await?
                };

                // Tables are migrated one by one; each replace runs as a
                // regular `ReplaceStreamJob` DDL command.
                for table in tables {
                    let start = tokio::time::Instant::now();
                    let req = GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: None,
                    };
                    let resp = client
                        .get_table_replace_plan(req)
                        .await
                        .context("failed to get table replace plan from frontend")?;

                    let plan = resp.into_inner().replace_plan.unwrap();
                    let replace_info = DdlServiceImpl::extract_replace_table_info(plan);
                    ddl_controller
                        .run_command(DdlCommand::ReplaceStreamJob(replace_info))
                        .await?;
                    tracing::info!(elapsed=?start.elapsed(), table_id=%table.id, "migrated table fragments");
                }
                tracing::info!("successfully migrated all legacy table fragments");

                Ok(())
            }

            // Retry loop: keep attempting until the migration succeeds or the
            // task is shut down.
            let migrate_future = async move {
                let mut attempt = 0;
                loop {
                    match migrate_inner(&env, &metadata_manager, &ddl_controller).await {
                        Ok(_) => break,
                        Err(e) => {
                            attempt += 1;
                            tracing::error!(
                                "failed to migrate legacy table fragments: {}, attempt {}, retrying in 5 secs",
                                e.as_report(),
                                attempt
                            );
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                    }
                }
            };

            // Whichever finishes first wins: migration completion or shutdown.
            select(pin!(migrate_future), shutdown_rx).await;
        });

        (join_handle, shutdown_tx)
    }
}
227
228#[async_trait::async_trait]
229impl DdlService for DdlServiceImpl {
230    async fn create_database(
231        &self,
232        request: Request<CreateDatabaseRequest>,
233    ) -> Result<Response<CreateDatabaseResponse>, Status> {
234        let req = request.into_inner();
235        let database = req.get_db()?.clone();
236        let version = self
237            .ddl_controller
238            .run_command(DdlCommand::CreateDatabase(database))
239            .await?;
240
241        Ok(Response::new(CreateDatabaseResponse {
242            status: None,
243            version,
244        }))
245    }
246
247    async fn drop_database(
248        &self,
249        request: Request<DropDatabaseRequest>,
250    ) -> Result<Response<DropDatabaseResponse>, Status> {
251        let req = request.into_inner();
252        let database_id = req.get_database_id();
253
254        let version = self
255            .ddl_controller
256            .run_command(DdlCommand::DropDatabase(database_id))
257            .await?;
258
259        Ok(Response::new(DropDatabaseResponse {
260            status: None,
261            version,
262        }))
263    }
264
265    async fn create_secret(
266        &self,
267        request: Request<CreateSecretRequest>,
268    ) -> Result<Response<CreateSecretResponse>, Status> {
269        let req = request.into_inner();
270        let pb_secret = Secret {
271            id: 0.into(),
272            name: req.get_name().clone(),
273            database_id: req.get_database_id(),
274            value: req.get_value().clone(),
275            owner: req.get_owner_id(),
276            schema_id: req.get_schema_id(),
277        };
278        let version = self
279            .ddl_controller
280            .run_command(DdlCommand::CreateSecret(pb_secret))
281            .await?;
282
283        Ok(Response::new(CreateSecretResponse { version }))
284    }
285
286    async fn drop_secret(
287        &self,
288        request: Request<DropSecretRequest>,
289    ) -> Result<Response<DropSecretResponse>, Status> {
290        let req = request.into_inner();
291        let secret_id = req.get_secret_id();
292        let drop_mode = DropMode::from_request_setting(req.cascade);
293        let version = self
294            .ddl_controller
295            .run_command(DdlCommand::DropSecret(secret_id, drop_mode))
296            .await?;
297
298        Ok(Response::new(DropSecretResponse {
299            status: None,
300            version,
301        }))
302    }
303
304    async fn alter_secret(
305        &self,
306        request: Request<AlterSecretRequest>,
307    ) -> Result<Response<AlterSecretResponse>, Status> {
308        let req = request.into_inner();
309        let pb_secret = Secret {
310            id: req.get_secret_id(),
311            name: req.get_name().clone(),
312            database_id: req.get_database_id(),
313            value: req.get_value().clone(),
314            owner: req.get_owner_id(),
315            schema_id: req.get_schema_id(),
316        };
317        let version = self
318            .ddl_controller
319            .run_command(DdlCommand::AlterSecret(pb_secret))
320            .await?;
321
322        Ok(Response::new(AlterSecretResponse { version }))
323    }
324
325    async fn create_schema(
326        &self,
327        request: Request<CreateSchemaRequest>,
328    ) -> Result<Response<CreateSchemaResponse>, Status> {
329        let req = request.into_inner();
330        let schema = req.get_schema()?.clone();
331        let version = self
332            .ddl_controller
333            .run_command(DdlCommand::CreateSchema(schema))
334            .await?;
335
336        Ok(Response::new(CreateSchemaResponse {
337            status: None,
338            version,
339        }))
340    }
341
342    async fn drop_schema(
343        &self,
344        request: Request<DropSchemaRequest>,
345    ) -> Result<Response<DropSchemaResponse>, Status> {
346        let req = request.into_inner();
347        let schema_id = req.get_schema_id();
348        let drop_mode = DropMode::from_request_setting(req.cascade);
349        let version = self
350            .ddl_controller
351            .run_command(DdlCommand::DropSchema(schema_id, drop_mode))
352            .await?;
353        Ok(Response::new(DropSchemaResponse {
354            status: None,
355            version,
356        }))
357    }
358
359    async fn create_source(
360        &self,
361        request: Request<CreateSourceRequest>,
362    ) -> Result<Response<CreateSourceResponse>, Status> {
363        let req = request.into_inner();
364        let source = req.get_source()?.clone();
365
366        match req.fragment_graph {
367            None => {
368                let version = self
369                    .ddl_controller
370                    .run_command(DdlCommand::CreateNonSharedSource(source))
371                    .await?;
372                Ok(Response::new(CreateSourceResponse {
373                    status: None,
374                    version,
375                }))
376            }
377            Some(fragment_graph) => {
378                // The id of stream job has been set above
379                let stream_job = StreamingJob::Source(source);
380                let version = self
381                    .ddl_controller
382                    .run_command(DdlCommand::CreateStreamingJob {
383                        stream_job,
384                        fragment_graph,
385                        dependencies: HashSet::new(),
386                        resource_type: Self::default_streaming_job_resource_type(),
387                        if_not_exists: req.if_not_exists,
388                    })
389                    .await?;
390                Ok(Response::new(CreateSourceResponse {
391                    status: None,
392                    version,
393                }))
394            }
395        }
396    }
397
398    async fn drop_source(
399        &self,
400        request: Request<DropSourceRequest>,
401    ) -> Result<Response<DropSourceResponse>, Status> {
402        let request = request.into_inner();
403        let source_id = request.source_id;
404        let drop_mode = DropMode::from_request_setting(request.cascade);
405        let version = self
406            .ddl_controller
407            .run_command(DdlCommand::DropSource(source_id, drop_mode))
408            .await?;
409
410        Ok(Response::new(DropSourceResponse {
411            status: None,
412            version,
413        }))
414    }
415
416    async fn reset_source(
417        &self,
418        request: Request<ResetSourceRequest>,
419    ) -> Result<Response<ResetSourceResponse>, Status> {
420        let request = request.into_inner();
421        let source_id = request.source_id;
422
423        tracing::info!(
424            source_id = %source_id,
425            "Received RESET SOURCE request, routing to DDL controller"
426        );
427
428        // Route to DDL controller
429        let version = self
430            .ddl_controller
431            .run_command(DdlCommand::ResetSource(source_id))
432            .await?;
433
434        Ok(Response::new(ResetSourceResponse {
435            status: None,
436            version,
437        }))
438    }
439
440    async fn create_sink(
441        &self,
442        request: Request<CreateSinkRequest>,
443    ) -> Result<Response<CreateSinkResponse>, Status> {
444        self.env.idle_manager().record_activity();
445
446        let req = request.into_inner();
447
448        let sink = req.get_sink()?.clone();
449        let fragment_graph = req.get_fragment_graph()?.clone();
450        let dependencies = req.get_dependencies().iter().copied().collect();
451
452        let stream_job = StreamingJob::Sink(sink);
453
454        let command = DdlCommand::CreateStreamingJob {
455            stream_job,
456            fragment_graph,
457            dependencies,
458            resource_type: Self::default_streaming_job_resource_type(),
459            if_not_exists: req.if_not_exists,
460        };
461
462        let version = self.ddl_controller.run_command(command).await?;
463
464        Ok(Response::new(CreateSinkResponse {
465            status: None,
466            version,
467        }))
468    }
469
470    async fn drop_sink(
471        &self,
472        request: Request<DropSinkRequest>,
473    ) -> Result<Response<DropSinkResponse>, Status> {
474        let request = request.into_inner();
475        let sink_id = request.sink_id;
476        let drop_mode = DropMode::from_request_setting(request.cascade);
477
478        let command = DdlCommand::DropStreamingJob {
479            job_id: StreamingJobId::Sink(sink_id),
480            drop_mode,
481        };
482
483        let version = self.ddl_controller.run_command(command).await?;
484
485        self.sink_manager
486            .stop_sink_coordinator(vec![SinkId::from(sink_id)])
487            .await;
488
489        Ok(Response::new(DropSinkResponse {
490            status: None,
491            version,
492        }))
493    }
494
495    async fn create_subscription(
496        &self,
497        request: Request<CreateSubscriptionRequest>,
498    ) -> Result<Response<CreateSubscriptionResponse>, Status> {
499        self.env.idle_manager().record_activity();
500
501        let req = request.into_inner();
502
503        let subscription = req.get_subscription()?.clone();
504        let command = DdlCommand::CreateSubscription(subscription);
505
506        let version = self.ddl_controller.run_command(command).await?;
507
508        Ok(Response::new(CreateSubscriptionResponse {
509            status: None,
510            version,
511        }))
512    }
513
514    async fn drop_subscription(
515        &self,
516        request: Request<DropSubscriptionRequest>,
517    ) -> Result<Response<DropSubscriptionResponse>, Status> {
518        let request = request.into_inner();
519        let subscription_id = request.subscription_id;
520        let drop_mode = DropMode::from_request_setting(request.cascade);
521
522        let command = DdlCommand::DropSubscription(subscription_id, drop_mode);
523
524        let version = self.ddl_controller.run_command(command).await?;
525
526        Ok(Response::new(DropSubscriptionResponse {
527            status: None,
528            version,
529        }))
530    }
531
532    async fn create_materialized_view(
533        &self,
534        request: Request<CreateMaterializedViewRequest>,
535    ) -> Result<Response<CreateMaterializedViewResponse>, Status> {
536        self.env.idle_manager().record_activity();
537
538        let req = request.into_inner();
539        let mview = req.get_materialized_view()?.clone();
540        let fragment_graph = req.get_fragment_graph()?.clone();
541        let dependencies = req.get_dependencies().iter().copied().collect();
542        let resource_type = req.resource_type.unwrap().resource_type.unwrap();
543
544        let stream_job = StreamingJob::MaterializedView(mview);
545        let version = self
546            .ddl_controller
547            .run_command(DdlCommand::CreateStreamingJob {
548                stream_job,
549                fragment_graph,
550                dependencies,
551                resource_type,
552                if_not_exists: req.if_not_exists,
553            })
554            .await?;
555
556        Ok(Response::new(CreateMaterializedViewResponse {
557            status: None,
558            version,
559        }))
560    }
561
562    async fn drop_materialized_view(
563        &self,
564        request: Request<DropMaterializedViewRequest>,
565    ) -> Result<Response<DropMaterializedViewResponse>, Status> {
566        self.env.idle_manager().record_activity();
567
568        let request = request.into_inner();
569        let table_id = request.table_id;
570        let drop_mode = DropMode::from_request_setting(request.cascade);
571
572        let version = self
573            .ddl_controller
574            .run_command(DdlCommand::DropStreamingJob {
575                job_id: StreamingJobId::MaterializedView(table_id),
576                drop_mode,
577            })
578            .await?;
579
580        Ok(Response::new(DropMaterializedViewResponse {
581            status: None,
582            version,
583        }))
584    }
585
586    async fn create_index(
587        &self,
588        request: Request<CreateIndexRequest>,
589    ) -> Result<Response<CreateIndexResponse>, Status> {
590        self.env.idle_manager().record_activity();
591
592        let req = request.into_inner();
593        let index = req.get_index()?.clone();
594        let index_table = req.get_index_table()?.clone();
595        let fragment_graph = req.get_fragment_graph()?.clone();
596
597        let stream_job = StreamingJob::Index(index, index_table);
598        let version = self
599            .ddl_controller
600            .run_command(DdlCommand::CreateStreamingJob {
601                stream_job,
602                fragment_graph,
603                dependencies: HashSet::new(),
604                resource_type: Self::default_streaming_job_resource_type(),
605                if_not_exists: req.if_not_exists,
606            })
607            .await?;
608
609        Ok(Response::new(CreateIndexResponse {
610            status: None,
611            version,
612        }))
613    }
614
615    async fn drop_index(
616        &self,
617        request: Request<DropIndexRequest>,
618    ) -> Result<Response<DropIndexResponse>, Status> {
619        self.env.idle_manager().record_activity();
620
621        let request = request.into_inner();
622        let index_id = request.index_id;
623        let drop_mode = DropMode::from_request_setting(request.cascade);
624        let version = self
625            .ddl_controller
626            .run_command(DdlCommand::DropStreamingJob {
627                job_id: StreamingJobId::Index(index_id),
628                drop_mode,
629            })
630            .await?;
631
632        Ok(Response::new(DropIndexResponse {
633            status: None,
634            version,
635        }))
636    }
637
638    async fn create_function(
639        &self,
640        request: Request<CreateFunctionRequest>,
641    ) -> Result<Response<CreateFunctionResponse>, Status> {
642        let req = request.into_inner();
643        let function = req.get_function()?.clone();
644
645        let version = self
646            .ddl_controller
647            .run_command(DdlCommand::CreateFunction(function))
648            .await?;
649
650        Ok(Response::new(CreateFunctionResponse {
651            status: None,
652            version,
653        }))
654    }
655
656    async fn drop_function(
657        &self,
658        request: Request<DropFunctionRequest>,
659    ) -> Result<Response<DropFunctionResponse>, Status> {
660        let request = request.into_inner();
661
662        let version = self
663            .ddl_controller
664            .run_command(DdlCommand::DropFunction(
665                request.function_id,
666                DropMode::from_request_setting(request.cascade),
667            ))
668            .await?;
669
670        Ok(Response::new(DropFunctionResponse {
671            status: None,
672            version,
673        }))
674    }
675
676    async fn create_table(
677        &self,
678        request: Request<CreateTableRequest>,
679    ) -> Result<Response<CreateTableResponse>, Status> {
680        let request = request.into_inner();
681        let job_type = request.get_job_type().unwrap_or_default();
682        let dependencies = request.get_dependencies().iter().copied().collect();
683        let source = request.source;
684        let mview = request.materialized_view.unwrap();
685        let fragment_graph = request.fragment_graph.unwrap();
686
687        let stream_job = StreamingJob::Table(source, mview, job_type);
688        let version = self
689            .ddl_controller
690            .run_command(DdlCommand::CreateStreamingJob {
691                stream_job,
692                fragment_graph,
693                dependencies,
694                resource_type: Self::default_streaming_job_resource_type(),
695                if_not_exists: request.if_not_exists,
696            })
697            .await?;
698
699        Ok(Response::new(CreateTableResponse {
700            status: None,
701            version,
702        }))
703    }
704
705    async fn drop_table(
706        &self,
707        request: Request<DropTableRequest>,
708    ) -> Result<Response<DropTableResponse>, Status> {
709        let request = request.into_inner();
710        let source_id = request.source_id;
711        let table_id = request.table_id;
712
713        let drop_mode = DropMode::from_request_setting(request.cascade);
714        let version = self
715            .ddl_controller
716            .run_command(DdlCommand::DropStreamingJob {
717                job_id: StreamingJobId::Table(source_id.map(|PbSourceId::Id(id)| id), table_id),
718                drop_mode,
719            })
720            .await?;
721
722        Ok(Response::new(DropTableResponse {
723            status: None,
724            version,
725        }))
726    }
727
728    async fn create_view(
729        &self,
730        request: Request<CreateViewRequest>,
731    ) -> Result<Response<CreateViewResponse>, Status> {
732        let req = request.into_inner();
733        let view = req.get_view()?.clone();
734        let dependencies = req
735            .get_dependencies()
736            .iter()
737            .copied()
738            .collect::<HashSet<_>>();
739
740        let version = self
741            .ddl_controller
742            .run_command(DdlCommand::CreateView(view, dependencies))
743            .await?;
744
745        Ok(Response::new(CreateViewResponse {
746            status: None,
747            version,
748        }))
749    }
750
751    async fn drop_view(
752        &self,
753        request: Request<DropViewRequest>,
754    ) -> Result<Response<DropViewResponse>, Status> {
755        let request = request.into_inner();
756        let view_id = request.get_view_id();
757        let drop_mode = DropMode::from_request_setting(request.cascade);
758        let version = self
759            .ddl_controller
760            .run_command(DdlCommand::DropView(view_id, drop_mode))
761            .await?;
762        Ok(Response::new(DropViewResponse {
763            status: None,
764            version,
765        }))
766    }
767
768    async fn risectl_list_state_tables(
769        &self,
770        _request: Request<RisectlListStateTablesRequest>,
771    ) -> Result<Response<RisectlListStateTablesResponse>, Status> {
772        let tables = self
773            .metadata_manager
774            .catalog_controller
775            .list_all_state_tables()
776            .await?;
777        Ok(Response::new(RisectlListStateTablesResponse { tables }))
778    }
779
780    async fn risectl_resume_backfill(
781        &self,
782        request: Request<RisectlResumeBackfillRequest>,
783    ) -> Result<Response<RisectlResumeBackfillResponse>, Status> {
784        let request = request.into_inner();
785        let target = request
786            .target
787            .ok_or_else(|| Status::invalid_argument("missing resume backfill target"))?;
788
789        match target {
790            risectl_resume_backfill_request::Target::JobId(job_id) => {
791                let database_id = self
792                    .metadata_manager
793                    .catalog_controller
794                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
795                    .await?;
796                self.barrier_scheduler
797                    .run_command(
798                        database_id,
799                        Command::ResumeBackfill {
800                            target: ResumeBackfillTarget::Job(job_id),
801                        },
802                    )
803                    .await?;
804            }
805            risectl_resume_backfill_request::Target::FragmentId(fragment_id) => {
806                let mut job_ids = self
807                    .metadata_manager
808                    .catalog_controller
809                    .get_fragment_job_id(vec![fragment_id])
810                    .await?;
811                let job_id = job_ids
812                    .pop()
813                    .ok_or_else(|| Status::invalid_argument("fragment not found"))?;
814                let database_id = self
815                    .metadata_manager
816                    .catalog_controller
817                    .get_object_database_id(ObjectId::new(job_id.as_raw_id()))
818                    .await?;
819                self.barrier_scheduler
820                    .run_command(
821                        database_id,
822                        Command::ResumeBackfill {
823                            target: ResumeBackfillTarget::Fragment(fragment_id),
824                        },
825                    )
826                    .await?;
827            }
828        }
829
830        Ok(Response::new(RisectlResumeBackfillResponse {}))
831    }
832
833    async fn replace_job_plan(
834        &self,
835        request: Request<ReplaceJobPlanRequest>,
836    ) -> Result<Response<ReplaceJobPlanResponse>, Status> {
837        let req = request.into_inner().get_plan().cloned()?;
838
839        let version = self
840            .ddl_controller
841            .run_command(DdlCommand::ReplaceStreamJob(
842                Self::extract_replace_table_info(req),
843            ))
844            .await?;
845
846        Ok(Response::new(ReplaceJobPlanResponse {
847            status: None,
848            version,
849        }))
850    }
851
852    async fn get_table(
853        &self,
854        request: Request<GetTableRequest>,
855    ) -> Result<Response<GetTableResponse>, Status> {
856        let req = request.into_inner();
857        let table = self
858            .metadata_manager
859            .catalog_controller
860            .get_table_by_name(&req.database_name, &req.table_name)
861            .await?;
862
863        Ok(Response::new(GetTableResponse { table }))
864    }
865
866    async fn alter_name(
867        &self,
868        request: Request<AlterNameRequest>,
869    ) -> Result<Response<AlterNameResponse>, Status> {
870        let AlterNameRequest { object, new_name } = request.into_inner();
871        let version = self
872            .ddl_controller
873            .run_command(DdlCommand::AlterName(object.unwrap(), new_name))
874            .await?;
875        Ok(Response::new(AlterNameResponse {
876            status: None,
877            version,
878        }))
879    }
880
881    /// Only support add column for now.
882    async fn alter_source(
883        &self,
884        request: Request<AlterSourceRequest>,
885    ) -> Result<Response<AlterSourceResponse>, Status> {
886        let AlterSourceRequest { source } = request.into_inner();
887        let version = self
888            .ddl_controller
889            .run_command(DdlCommand::AlterNonSharedSource(source.unwrap()))
890            .await?;
891        Ok(Response::new(AlterSourceResponse {
892            status: None,
893            version,
894        }))
895    }
896
897    async fn alter_owner(
898        &self,
899        request: Request<AlterOwnerRequest>,
900    ) -> Result<Response<AlterOwnerResponse>, Status> {
901        let AlterOwnerRequest { object, owner_id } = request.into_inner();
902        let version = self
903            .ddl_controller
904            .run_command(DdlCommand::AlterObjectOwner(object.unwrap(), owner_id as _))
905            .await?;
906        Ok(Response::new(AlterOwnerResponse {
907            status: None,
908            version,
909        }))
910    }
911
912    async fn alter_subscription_retention(
913        &self,
914        request: Request<AlterSubscriptionRetentionRequest>,
915    ) -> Result<Response<AlterSubscriptionRetentionResponse>, Status> {
916        let AlterSubscriptionRetentionRequest {
917            subscription_id,
918            retention_seconds,
919            definition,
920        } = request.into_inner();
921        let version = self
922            .ddl_controller
923            .run_command(DdlCommand::AlterSubscriptionRetention {
924                subscription_id,
925                retention_seconds,
926                definition,
927            })
928            .await?;
929        Ok(Response::new(AlterSubscriptionRetentionResponse {
930            status: None,
931            version,
932        }))
933    }
934
935    async fn alter_set_schema(
936        &self,
937        request: Request<AlterSetSchemaRequest>,
938    ) -> Result<Response<AlterSetSchemaResponse>, Status> {
939        let AlterSetSchemaRequest {
940            object,
941            new_schema_id,
942        } = request.into_inner();
943        let version = self
944            .ddl_controller
945            .run_command(DdlCommand::AlterSetSchema(object.unwrap(), new_schema_id))
946            .await?;
947        Ok(Response::new(AlterSetSchemaResponse {
948            status: None,
949            version,
950        }))
951    }
952
953    async fn get_ddl_progress(
954        &self,
955        _request: Request<GetDdlProgressRequest>,
956    ) -> Result<Response<GetDdlProgressResponse>, Status> {
957        Ok(Response::new(GetDdlProgressResponse {
958            ddl_progress: self.ddl_controller.get_ddl_progress().await?,
959        }))
960    }
961
962    async fn create_connection(
963        &self,
964        request: Request<CreateConnectionRequest>,
965    ) -> Result<Response<CreateConnectionResponse>, Status> {
966        let req = request.into_inner();
967        if req.payload.is_none() {
968            return Err(Status::invalid_argument("request is empty"));
969        }
970
971        match req.payload.unwrap() {
972            #[expect(deprecated)]
973            create_connection_request::Payload::PrivateLink(_) => {
974                panic!("Private Link Connection has been deprecated")
975            }
976            create_connection_request::Payload::ConnectionParams(params) => {
977                let pb_connection = Connection {
978                    id: 0.into(),
979                    schema_id: req.schema_id,
980                    database_id: req.database_id,
981                    name: req.name,
982                    info: Some(ConnectionInfo::ConnectionParams(params)),
983                    owner: req.owner_id,
984                };
985                let version = self
986                    .ddl_controller
987                    .run_command(DdlCommand::CreateConnection(pb_connection))
988                    .await?;
989                Ok(Response::new(CreateConnectionResponse { version }))
990            }
991        }
992    }
993
994    async fn list_connections(
995        &self,
996        _request: Request<ListConnectionsRequest>,
997    ) -> Result<Response<ListConnectionsResponse>, Status> {
998        let conns = self
999            .metadata_manager
1000            .catalog_controller
1001            .list_connections()
1002            .await?;
1003
1004        Ok(Response::new(ListConnectionsResponse {
1005            connections: conns,
1006        }))
1007    }
1008
1009    async fn drop_connection(
1010        &self,
1011        request: Request<DropConnectionRequest>,
1012    ) -> Result<Response<DropConnectionResponse>, Status> {
1013        let req = request.into_inner();
1014        let drop_mode = DropMode::from_request_setting(req.cascade);
1015
1016        let version = self
1017            .ddl_controller
1018            .run_command(DdlCommand::DropConnection(req.connection_id, drop_mode))
1019            .await?;
1020
1021        Ok(Response::new(DropConnectionResponse {
1022            status: None,
1023            version,
1024        }))
1025    }
1026
1027    async fn comment_on(
1028        &self,
1029        request: Request<CommentOnRequest>,
1030    ) -> Result<Response<CommentOnResponse>, Status> {
1031        let req = request.into_inner();
1032        let comment = req.get_comment()?.clone();
1033
1034        let version = self
1035            .ddl_controller
1036            .run_command(DdlCommand::CommentOn(Comment {
1037                table_id: comment.table_id,
1038                schema_id: comment.schema_id,
1039                database_id: comment.database_id,
1040                column_index: comment.column_index,
1041                description: comment.description,
1042            }))
1043            .await?;
1044
1045        Ok(Response::new(CommentOnResponse {
1046            status: None,
1047            version,
1048        }))
1049    }
1050
1051    async fn get_tables(
1052        &self,
1053        request: Request<GetTablesRequest>,
1054    ) -> Result<Response<GetTablesResponse>, Status> {
1055        let GetTablesRequest {
1056            table_ids,
1057            include_dropped_tables,
1058        } = request.into_inner();
1059        let ret = self
1060            .metadata_manager
1061            .catalog_controller
1062            .get_table_by_ids(table_ids, include_dropped_tables)
1063            .await?;
1064
1065        let mut tables = HashMap::default();
1066        for table in ret {
1067            tables.insert(table.id, table);
1068        }
1069        Ok(Response::new(GetTablesResponse { tables }))
1070    }
1071
1072    async fn wait(&self, request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
1073        let req = request.into_inner();
1074        let version = self.ddl_controller.wait(req.job_id).await?;
1075        Ok(Response::new(WaitResponse {
1076            version: Some(version),
1077        }))
1078    }
1079
1080    async fn alter_cdc_table_backfill_parallelism(
1081        &self,
1082        request: Request<AlterCdcTableBackfillParallelismRequest>,
1083    ) -> Result<Response<AlterCdcTableBackfillParallelismResponse>, Status> {
1084        let req = request.into_inner();
1085        let job_id = req.get_table_id();
1086        let parallelism = *req.get_parallelism()?;
1087
1088        let table_parallelism = ModelTableParallelism::from(parallelism);
1089        let streaming_parallelism = match table_parallelism {
1090            ModelTableParallelism::Fixed(n) => StreamingParallelism::Fixed(n),
1091            _ => bail_invalid_parameter!(
1092                "CDC table backfill parallelism must be set to a fixed value"
1093            ),
1094        };
1095
1096        self.ddl_controller
1097            .reschedule_cdc_table_backfill(
1098                job_id,
1099                ReschedulePolicy::Parallelism(ParallelismPolicy {
1100                    parallelism: streaming_parallelism,
1101                }),
1102            )
1103            .await?;
1104        Ok(Response::new(AlterCdcTableBackfillParallelismResponse {}))
1105    }
1106
1107    async fn alter_parallelism(
1108        &self,
1109        request: Request<AlterParallelismRequest>,
1110    ) -> Result<Response<AlterParallelismResponse>, Status> {
1111        let req = request.into_inner();
1112
1113        let job_id = req.get_table_id();
1114        let parallelism = *req.get_parallelism()?;
1115        let deferred = req.get_deferred();
1116
1117        let parallelism = match parallelism.get_parallelism()? {
1118            Parallelism::Fixed(FixedParallelism { parallelism }) => {
1119                StreamingParallelism::Fixed(*parallelism as _)
1120            }
1121            Parallelism::Auto(_) | Parallelism::Adaptive(_) => StreamingParallelism::Adaptive,
1122            _ => bail_unavailable!(),
1123        };
1124
1125        self.ddl_controller
1126            .reschedule_streaming_job(
1127                job_id,
1128                ReschedulePolicy::Parallelism(ParallelismPolicy { parallelism }),
1129                deferred,
1130            )
1131            .await?;
1132
1133        Ok(Response::new(AlterParallelismResponse {}))
1134    }
1135
1136    async fn alter_backfill_parallelism(
1137        &self,
1138        request: Request<AlterBackfillParallelismRequest>,
1139    ) -> Result<Response<AlterBackfillParallelismResponse>, Status> {
1140        let req = request.into_inner();
1141
1142        let job_id = req.get_table_id();
1143        let deferred = req.get_deferred();
1144
1145        let parallelism = match req.parallelism {
1146            None => None,
1147            Some(parallelism) => {
1148                let parallelism = match parallelism.get_parallelism()? {
1149                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
1150                        StreamingParallelism::Fixed(*parallelism as _)
1151                    }
1152                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1153                        StreamingParallelism::Adaptive
1154                    }
1155                    _ => bail_unavailable!(),
1156                };
1157                Some(parallelism)
1158            }
1159        };
1160
1161        self.ddl_controller
1162            .reschedule_streaming_job_backfill_parallelism(job_id, parallelism, deferred)
1163            .await?;
1164
1165        Ok(Response::new(AlterBackfillParallelismResponse {}))
1166    }
1167
1168    async fn alter_fragment_parallelism(
1169        &self,
1170        request: Request<AlterFragmentParallelismRequest>,
1171    ) -> Result<Response<AlterFragmentParallelismResponse>, Status> {
1172        let req = request.into_inner();
1173
1174        let fragment_ids = req.fragment_ids;
1175        if fragment_ids.is_empty() {
1176            return Err(Status::invalid_argument(
1177                "at least one fragment id must be provided",
1178            ));
1179        }
1180
1181        let parallelism = match req.parallelism {
1182            Some(parallelism) => {
1183                let streaming_parallelism = match parallelism.get_parallelism()? {
1184                    Parallelism::Fixed(FixedParallelism { parallelism }) => {
1185                        StreamingParallelism::Fixed(*parallelism as _)
1186                    }
1187                    Parallelism::Auto(_) | Parallelism::Adaptive(_) => {
1188                        StreamingParallelism::Adaptive
1189                    }
1190                    _ => bail_unavailable!(),
1191                };
1192                Some(streaming_parallelism)
1193            }
1194            None => None,
1195        };
1196
1197        let fragment_targets = fragment_ids
1198            .into_iter()
1199            .map(|fragment_id| (fragment_id, parallelism.clone()))
1200            .collect();
1201
1202        self.ddl_controller
1203            .reschedule_fragments(fragment_targets)
1204            .await?;
1205
1206        Ok(Response::new(AlterFragmentParallelismResponse {}))
1207    }
1208
1209    async fn alter_streaming_job_config(
1210        &self,
1211        request: Request<AlterStreamingJobConfigRequest>,
1212    ) -> Result<Response<AlterStreamingJobConfigResponse>, Status> {
1213        let AlterStreamingJobConfigRequest {
1214            job_id,
1215            entries_to_add,
1216            keys_to_remove,
1217        } = request.into_inner();
1218
1219        self.ddl_controller
1220            .run_command(DdlCommand::AlterStreamingJobConfig(
1221                job_id,
1222                entries_to_add,
1223                keys_to_remove,
1224            ))
1225            .await?;
1226
1227        Ok(Response::new(AlterStreamingJobConfigResponse {}))
1228    }
1229
    /// Auto schema change for cdc sources,
    /// called by the source parser when a schema change is detected.
    ///
    /// For every `table_change` in the request this:
    /// 1. rejects changes containing generated, rw-system, or hidden columns;
    /// 2. looks up every table bound to the change's `cdc_table_id`;
    /// 3. requires the new column set to be a subset or superset of the
    ///    original visible columns (only `ADD COLUMN` / `DROP COLUMN` are
    ///    supported), skipping tables with no effective change;
    /// 4. asks a randomly chosen frontend for a replace-table plan and runs
    ///    it through the DDL controller.
    ///
    /// Column-validation failures abort the whole request with
    /// `invalid_argument`; per-table plan/replace failures are logged to the
    /// event log and the remaining tables are still processed.
    async fn auto_schema_change(
        &self,
        request: Request<AutoSchemaChangeRequest>,
    ) -> Result<Response<AutoSchemaChangeResponse>, Status> {
        let req = request.into_inner();

        // randomly select a frontend worker to get the replace table plan
        let workers = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::Frontend), Some(State::Running))
            .await?;
        let worker = workers
            .choose(&mut thread_rng())
            .ok_or_else(|| MetaError::from(anyhow!("no frontend worker available")))?;

        let client = self
            .env
            .frontend_client_pool()
            .get(worker)
            .await
            .map_err(MetaError::from)?;

        let Some(schema_change) = req.schema_change else {
            return Err(Status::invalid_argument(
                "schema change message is required",
            ));
        };

        for table_change in schema_change.table_changes {
            // Validate the changed columns: generated, system, or hidden
            // columns cannot originate from an upstream CDC DDL.
            for c in &table_change.columns {
                let c = ColumnCatalog::from(c.clone());

                // NOTE(review): the tracing field `upstraem_ddl` (here and in
                // the logs below) is misspelled; renaming it would change the
                // emitted log output, so it is left as-is.
                let invalid_col_type = |column_type: &str, c: &ColumnCatalog| {
                    tracing::warn!(target: "auto_schema_change",
                      cdc_table_id = table_change.cdc_table_id,
                    upstraem_ddl = table_change.upstream_ddl,
                        "invalid column type from cdc table change");
                    Err(Status::invalid_argument(format!(
                        "invalid column type: {} from cdc table change, column: {:?}",
                        column_type, c
                    )))
                };
                if c.is_generated() {
                    return invalid_col_type("generated column", &c);
                }
                if c.is_rw_sys_column() {
                    return invalid_col_type("rw system column", &c);
                }
                if c.is_hidden {
                    return invalid_col_type("hidden column", &c);
                }
            }

            // get the table catalog corresponding to the cdc table
            let tables: Vec<Table> = self
                .metadata_manager
                .get_table_catalog_by_cdc_table_id(&table_change.cdc_table_id)
                .await?;

            for table in tables {
                // Since we only support `ADD` and `DROP` column, we check whether the new columns and the original columns
                // is a subset of the other.
                // Columns are compared as (name, data type) pairs; generated
                // and hidden columns are excluded from both sides.
                let original_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                let mut new_columns: HashSet<(String, DataType)> =
                    HashSet::from_iter(table_change.columns.iter().filter_map(|col| {
                        let col = ColumnCatalog::from(col.clone());
                        if col.is_generated() || col.is_hidden() {
                            None
                        } else {
                            Some((col.column_desc.name.clone(), col.data_type().clone()))
                        }
                    }));

                // For subset/superset check, we need to add visible connector additional columns defined by INCLUDE in the original table to new_columns
                // This includes both _rw columns and user-defined INCLUDE columns (e.g., INCLUDE TIMESTAMP AS xxx)
                for col in &table.columns {
                    let col = ColumnCatalog::from(col.clone());
                    if col.is_connector_additional_column()
                        && !col.is_hidden()
                        && !col.is_generated()
                    {
                        new_columns.insert((col.column_desc.name.clone(), col.data_type().clone()));
                    }
                }

                // Reject changes that are neither pure additions nor pure
                // removals of columns.
                if !(original_columns.is_subset(&new_columns)
                    || original_columns.is_superset(&new_columns))
                {
                    tracing::warn!(target: "auto_schema_change",
                                    table_id = %table.id,
                                    cdc_table_id = table.cdc_table_id,
                                    upstraem_ddl = table_change.upstream_ddl,
                                    original_columns = ?original_columns,
                                    new_columns = ?new_columns,
                                    "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");

                    let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
                    add_auto_schema_change_fail_event_log(
                        &self.meta_metrics,
                        table.id,
                        table.name.clone(),
                        table_change.cdc_table_id.clone(),
                        table_change.upstream_ddl.clone(),
                        &self.env.event_log_manager_ref(),
                        fail_info,
                    );

                    return Err(Status::invalid_argument(
                        "New columns should be a subset or superset of the original columns (including hidden columns)",
                    ));
                }
                // skip the schema change if there is no change to original columns
                if original_columns == new_columns {
                    tracing::warn!(target: "auto_schema_change",
                                   table_id = %table.id,
                                   cdc_table_id = table.cdc_table_id,
                                   upstraem_ddl = table_change.upstream_ddl,
                                    original_columns = ?original_columns,
                                    new_columns = ?new_columns,
                                   "No change to columns, skipping the schema change");
                    continue;
                }

                // Time the whole plan-fetch + replace for this table.
                let latency_timer = self
                    .meta_metrics
                    .auto_schema_change_latency
                    .with_guarded_label_values(&[&table.id.to_string(), &table.name])
                    .start_timer();
                // send a request to the frontend to get the ReplaceJobPlan
                // will retry with exponential backoff if the request fails
                let resp = client
                    .get_table_replace_plan(GetTableReplacePlanRequest {
                        database_id: table.database_id,
                        table_id: table.id,
                        cdc_table_change: Some(table_change.clone()),
                    })
                    .await;

                match resp {
                    Ok(resp) => {
                        let resp = resp.into_inner();
                        if let Some(plan) = resp.replace_plan {
                            let plan = Self::extract_replace_table_info(plan);
                            plan.streaming_job.table().inspect(|t| {
                                tracing::info!(
                                    target: "auto_schema_change",
                                    table_id = %t.id,
                                    cdc_table_id = t.cdc_table_id,
                                    upstraem_ddl = table_change.upstream_ddl,
                                    "Start the replace config change")
                            });
                            // start the schema change procedure
                            let replace_res = self
                                .ddl_controller
                                .run_command(DdlCommand::ReplaceStreamJob(plan))
                                .await;

                            match replace_res {
                                Ok(_) => {
                                    tracing::info!(
                                        target: "auto_schema_change",
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        "Table replaced success");

                                    self.meta_metrics
                                        .auto_schema_change_success_cnt
                                        .with_guarded_label_values(&[
                                            &table.id.to_string(),
                                            &table.name,
                                        ])
                                        .inc();
                                    latency_timer.observe_duration();
                                }
                                Err(e) => {
                                    // Replace failed: record it but keep
                                    // processing the remaining tables.
                                    tracing::error!(
                                        target: "auto_schema_change",
                                        error = %e.as_report(),
                                        table_id = %table.id,
                                        cdc_table_id = table.cdc_table_id,
                                        upstraem_ddl = table_change.upstream_ddl,
                                        "failed to replace the table",
                                    );
                                    let fail_info =
                                        format!("failed to replace the table: {}", e.as_report());
                                    add_auto_schema_change_fail_event_log(
                                        &self.meta_metrics,
                                        table.id,
                                        table.name.clone(),
                                        table_change.cdc_table_id.clone(),
                                        table_change.upstream_ddl.clone(),
                                        &self.env.event_log_manager_ref(),
                                        fail_info,
                                    );
                                }
                            };
                        }
                    }
                    Err(e) => {
                        // Plan fetch failed: record it but keep processing the
                        // remaining tables.
                        tracing::error!(
                            target: "auto_schema_change",
                            error = %e.as_report(),
                            table_id = %table.id,
                            cdc_table_id = table.cdc_table_id,
                            "failed to get replace table plan",
                        );
                        let fail_info =
                            format!("failed to get replace table plan: {}", e.as_report());
                        add_auto_schema_change_fail_event_log(
                            &self.meta_metrics,
                            table.id,
                            table.name.clone(),
                            table_change.cdc_table_id.clone(),
                            table_change.upstream_ddl.clone(),
                            &self.env.event_log_manager_ref(),
                            fail_info,
                        );
                    }
                };
            }
        }

        Ok(Response::new(AutoSchemaChangeResponse {}))
    }
1465
1466    async fn alter_swap_rename(
1467        &self,
1468        request: Request<AlterSwapRenameRequest>,
1469    ) -> Result<Response<AlterSwapRenameResponse>, Status> {
1470        let req = request.into_inner();
1471
1472        let version = self
1473            .ddl_controller
1474            .run_command(DdlCommand::AlterSwapRename(req.object.unwrap()))
1475            .await?;
1476
1477        Ok(Response::new(AlterSwapRenameResponse {
1478            status: None,
1479            version,
1480        }))
1481    }
1482
1483    async fn alter_resource_group(
1484        &self,
1485        request: Request<AlterResourceGroupRequest>,
1486    ) -> Result<Response<AlterResourceGroupResponse>, Status> {
1487        let req = request.into_inner();
1488
1489        let table_id = req.get_table_id();
1490        let deferred = req.get_deferred();
1491        let resource_group = req.resource_group;
1492
1493        self.ddl_controller
1494            .reschedule_streaming_job(
1495                table_id.as_job_id(),
1496                ReschedulePolicy::ResourceGroup(ResourceGroupPolicy { resource_group }),
1497                deferred,
1498            )
1499            .await?;
1500
1501        Ok(Response::new(AlterResourceGroupResponse {}))
1502    }
1503
1504    async fn alter_database_param(
1505        &self,
1506        request: Request<AlterDatabaseParamRequest>,
1507    ) -> Result<Response<AlterDatabaseParamResponse>, Status> {
1508        let req = request.into_inner();
1509        let database_id = req.database_id;
1510
1511        let param = match req.param.unwrap() {
1512            alter_database_param_request::Param::BarrierIntervalMs(value) => {
1513                AlterDatabaseParam::BarrierIntervalMs(value.value)
1514            }
1515            alter_database_param_request::Param::CheckpointFrequency(value) => {
1516                AlterDatabaseParam::CheckpointFrequency(value.value)
1517            }
1518        };
1519        let version = self
1520            .ddl_controller
1521            .run_command(DdlCommand::AlterDatabaseParam(database_id, param))
1522            .await?;
1523
1524        return Ok(Response::new(AlterDatabaseParamResponse {
1525            status: None,
1526            version,
1527        }));
1528    }
1529
1530    async fn compact_iceberg_table(
1531        &self,
1532        request: Request<CompactIcebergTableRequest>,
1533    ) -> Result<Response<CompactIcebergTableResponse>, Status> {
1534        let req = request.into_inner();
1535        let sink_id = req.sink_id;
1536
1537        // Trigger manual compaction directly using the sink ID
1538        let task_id = self
1539            .iceberg_compaction_manager
1540            .trigger_manual_compaction(sink_id)
1541            .await
1542            .map_err(|e| {
1543                Status::internal(format!("Failed to trigger compaction: {}", e.as_report()))
1544            })?;
1545
1546        Ok(Response::new(CompactIcebergTableResponse {
1547            status: None,
1548            task_id,
1549        }))
1550    }
1551
1552    async fn expire_iceberg_table_snapshots(
1553        &self,
1554        request: Request<ExpireIcebergTableSnapshotsRequest>,
1555    ) -> Result<Response<ExpireIcebergTableSnapshotsResponse>, Status> {
1556        let req = request.into_inner();
1557        let sink_id = req.sink_id;
1558
1559        // Trigger manual snapshot expiration directly using the sink ID
1560        self.iceberg_compaction_manager
1561            .check_and_expire_snapshots(sink_id)
1562            .await
1563            .map_err(|e| {
1564                Status::internal(format!("Failed to expire snapshots: {}", e.as_report()))
1565            })?;
1566
1567        Ok(Response::new(ExpireIcebergTableSnapshotsResponse {
1568            status: None,
1569        }))
1570    }
1571
    /// Creates an "iceberg table" as one combined DDL operation consisting of
    /// three catalog objects: an internal RisingWave table, an iceberg sink
    /// streaming from that table, and a non-shared iceberg source.
    ///
    /// Steps (with compensating cleanup on failure):
    /// 1. Create the table job (marked `Background`, with any source rate
    ///    limit forced to 0 so no data flows before the sink exists).
    /// 2. Create the iceberg sink job reading from the new table; on failure,
    ///    cascade-drop the table and propagate the error.
    /// 3. Restore the user-requested source rate limit via a throttle barrier
    ///    command.
    /// 4. Create the iceberg source; on failure, cascade-drop the table
    ///    (presumably removing the dependent sink too — verify) before
    ///    propagating the error via `res?` in the response.
    async fn create_iceberg_table(
        &self,
        request: Request<CreateIcebergTableRequest>,
    ) -> Result<Response<CreateIcebergTableResponse>, Status> {
        let req = request.into_inner();
        let CreateIcebergTableRequest {
            table_info,
            sink_info,
            iceberg_source,
            if_not_exists,
        } = req;

        // 1. create table job
        // NOTE(review): the `unwrap()`s on request fields below assume the
        // frontend always populates them; a malformed request would panic the
        // meta node — confirm whether `Status::invalid_argument` is preferable.
        let PbTableJobInfo {
            source,
            table,
            fragment_graph,
            job_type,
        } = table_info.unwrap();
        let mut table = table.unwrap();
        let mut fragment_graph = fragment_graph.unwrap();
        let database_id = table.get_database_id();
        let schema_id = table.get_schema_id();
        let table_name = table.get_name().to_owned();

        // Mark table as background creation, so that it won't block sink creation.
        table.create_type = PbCreateType::Background as _;

        // Set the source rate limit to 0 and reset it back after the iceberg sink is backfilling.
        // `source.rate_limit` is itself an `Option`, so the remembered value is
        // `Option<Option<_>>`: outer `None` means "table has no source",
        // inner value is the user-requested limit to restore in step 3.
        let source_rate_limit = if let Some(source) = &source {
            for fragment in fragment_graph.fragments.values_mut() {
                stream_graph_visitor::visit_fragment_mut(fragment, |node| {
                    if let NodeBody::Source(source_node) = node
                        && let Some(inner) = &mut source_node.source_inner
                    {
                        inner.rate_limit = Some(0);
                    }
                });
            }
            Some(source.rate_limit)
        } else {
            None
        };

        let stream_job =
            StreamingJob::Table(source, table, PbTableJobType::try_from(job_type).unwrap());
        let _ = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies: HashSet::new(),
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await?;

        // Re-read the table from the catalog to learn the id assigned during
        // creation; needed to rewire the sink's placeholder references below.
        let table_catalog = self
            .metadata_manager
            .catalog_controller
            .get_table_catalog_by_name(database_id, schema_id, &table_name)
            .await?
            .ok_or(Status::not_found("Internal error: table not found"))?;

        // 2. create iceberg sink job
        let PbSinkJobInfo {
            sink,
            fragment_graph,
        } = sink_info.unwrap();
        let mut sink = sink.unwrap();

        // Mark sink as background creation, so that it won't block source creation.
        sink.create_type = PbCreateType::Background as _;
        sink.auto_refresh_schema_from_table = Some(table_catalog.id);

        let mut fragment_graph = fragment_graph.unwrap();

        // The sink graph was planned by the frontend before the table existed,
        // so every reference to the upstream table is a placeholder id. Assert
        // that invariant, then patch in the real table id (and vnode count)
        // everywhere: dependent-table list, stream scans, and batch plans.
        assert_eq!(fragment_graph.dependent_table_ids.len(), 1);
        assert!(
            risingwave_common::catalog::TableId::from(fragment_graph.dependent_table_ids[0])
                .is_placeholder()
        );
        fragment_graph.dependent_table_ids[0] = table_catalog.id;
        for fragment in fragment_graph.fragments.values_mut() {
            stream_graph_visitor::visit_fragment_mut(fragment, |node| match node {
                NodeBody::StreamScan(scan) => {
                    scan.table_id = table_catalog.id;
                    if let Some(table_desc) = &mut scan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                    if let Some(table) = &mut scan.arrangement_table {
                        assert!(
                            risingwave_common::catalog::TableId::from(table.id).is_placeholder()
                        );
                        *table = table_catalog.clone();
                    }
                }
                NodeBody::BatchPlan(plan) => {
                    if let Some(table_desc) = &mut plan.table_desc {
                        assert!(
                            risingwave_common::catalog::TableId::from(table_desc.table_id)
                                .is_placeholder()
                        );
                        table_desc.table_id = table_catalog.id;
                        table_desc.maybe_vnode_count = table_catalog.maybe_vnode_count;
                    }
                }
                _ => {}
            });
        }

        let table_id = table_catalog.id;
        let dependencies = HashSet::from_iter([table_id.into(), schema_id.into()]);
        let stream_job = StreamingJob::Sink(sink);
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateStreamingJob {
                stream_job,
                fragment_graph,
                dependencies,
                resource_type: Self::default_streaming_job_resource_type(),
                if_not_exists,
            })
            .await;

        // Compensating cleanup: if the sink failed, drop the half-created
        // table (cascade) and then propagate the original sink error. A
        // failure of the cleanup itself is only logged, not returned.
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(error = %err.as_report(),
                        "Failed to clean up table after iceberg sink creation failure",
                    );
                });
            res?;
        }

        // 3. reset source rate limit back to normal after sink creation
        // Skipped when the user explicitly requested a limit of 0 — the graph
        // is already throttled to 0 from step 1, so nothing needs restoring.
        if let Some(source_rate_limit) = source_rate_limit
            && source_rate_limit != Some(0)
        {
            // `source_rate_limit` being `Some` implies the table was created
            // with an associated source, so this catalog field must be set.
            // The pattern is irrefutable: the enum has a single variant.
            let OptionalAssociatedSourceId::AssociatedSourceId(source_id) =
                table_catalog.optional_associated_source_id.unwrap();
            let (jobs, fragments) = self
                .metadata_manager
                .update_source_rate_limit_by_source_id(source_id, source_rate_limit)
                .await?;
            let throttle_config = ThrottleConfig {
                throttle_type: risingwave_pb::common::ThrottleType::Source.into(),
                rate_limit: source_rate_limit,
            };
            // Apply the new limit to the running graph via a throttle barrier.
            // NOTE(review): if this fails, the error propagates but the table
            // and sink are NOT cleaned up — confirm this is intentional.
            let _ = self
                .barrier_scheduler
                .run_command(
                    database_id,
                    Command::Throttle {
                        jobs,
                        config: fragments
                            .into_iter()
                            .map(|fragment_id| (fragment_id, throttle_config))
                            .collect(),
                    },
                )
                .await?;
        }

        // 4. create iceberg source
        let iceberg_source = iceberg_source.unwrap();
        let res = self
            .ddl_controller
            .run_command(DdlCommand::CreateNonSharedSource(iceberg_source))
            .await;
        // Same compensating cleanup as step 2; the source-creation error (if
        // any) is propagated below through `version: res?`.
        if res.is_err() {
            let _ = self
                .ddl_controller
                .run_command(DdlCommand::DropStreamingJob {
                    job_id: StreamingJobId::Table(None, table_id),
                    drop_mode: DropMode::Cascade,
                })
                .await
                .inspect_err(|err| {
                    tracing::error!(
                        error = %err.as_report(),
                        "Failed to clean up table after iceberg source creation failure",
                    );
                });
        }

        Ok(Response::new(CreateIcebergTableResponse {
            status: None,
            version: res?,
        }))
    }
1774}
1775
1776fn add_auto_schema_change_fail_event_log(
1777    meta_metrics: &Arc<MetaMetrics>,
1778    table_id: TableId,
1779    table_name: String,
1780    cdc_table_id: String,
1781    upstream_ddl: String,
1782    event_log_manager: &EventLogManagerRef,
1783    fail_info: String,
1784) {
1785    meta_metrics
1786        .auto_schema_change_failure_cnt
1787        .with_guarded_label_values(&[&table_id.to_string(), &table_name])
1788        .inc();
1789    let event = event_log::EventAutoSchemaChangeFail {
1790        table_id,
1791        table_name,
1792        cdc_table_id,
1793        upstream_ddl,
1794        fail_info,
1795    };
1796    event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
1797}