risingwave_frontend/rpc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pgwire::pg_server::{BoxedError, SessionManager};
17use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
18use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
19use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_sqlparser::ast::ObjectName;
22use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
23
24use crate::error::RwError;
25use crate::handler::create_source::SqlColumnStrategy;
26use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
27use crate::session::SESSION_MANAGER;
28
29#[derive(thiserror::Error, Debug)]
30pub enum AutoSchemaChangeError {
31    #[error("frontend error")]
32    FrontendError(
33        #[from]
34        #[backtrace]
35        RwError,
36    ),
37}
38
39impl From<BoxedError> for AutoSchemaChangeError {
40    fn from(err: BoxedError) -> Self {
41        AutoSchemaChangeError::FrontendError(RwError::from(err))
42    }
43}
44
45impl From<AutoSchemaChangeError> for tonic::Status {
46    fn from(err: AutoSchemaChangeError) -> Self {
47        err.to_status(tonic::Code::Internal, "frontend")
48    }
49}
50
51#[derive(Default)]
52pub struct FrontendServiceImpl {}
53
54impl FrontendServiceImpl {
55    pub fn new() -> Self {
56        Self {}
57    }
58}
59
60#[async_trait::async_trait]
61impl FrontendService for FrontendServiceImpl {
62    async fn get_table_replace_plan(
63        &self,
64        request: RpcRequest<GetTableReplacePlanRequest>,
65    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
66        let req = request.into_inner();
67        tracing::info!("get_table_replace_plan for table {}", req.table_name);
68
69        let table_change = req.table_change.expect("schema change message is required");
70        let replace_plan =
71            get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?;
72
73        Ok(RpcResponse::new(GetTableReplacePlanResponse {
74            replace_plan: Some(replace_plan),
75        }))
76    }
77}
78
79/// Get the new table plan for the given table schema change
80async fn get_new_table_plan(
81    table_change: TableSchemaChange,
82    table_name: String,
83    database_id: u32,
84    owner: u32,
85) -> Result<ReplaceJobPlan, AutoSchemaChangeError> {
86    tracing::info!("get_new_table_plan for table {}", table_name);
87
88    let session_mgr = SESSION_MANAGER
89        .get()
90        .expect("session manager has been initialized");
91
92    // get a session object for the corresponding user and database
93    let session = session_mgr.create_dummy_session(database_id, owner)?;
94
95    let new_version_columns = table_change
96        .columns
97        .into_iter()
98        .map(|c| c.into())
99        .collect_vec();
100    let table_name = ObjectName::from(vec![table_name.as_str().into()]);
101
102    let (new_table_definition, original_catalog) =
103        get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
104            .await?;
105    let (_, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
106        &session,
107        table_name,
108        new_table_definition,
109        &original_catalog,
110        SqlColumnStrategy::FollowUnchecked, // not used
111    )
112    .await?;
113
114    Ok(ReplaceJobPlan {
115        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
116            replace_job_plan::ReplaceTable {
117                table: Some(table),
118                source: None, // none for cdc table
119                job_type: job_type as _,
120            },
121        )),
122        fragment_graph: Some(graph),
123        table_col_index_mapping: Some(col_index_mapping.to_protobuf()),
124    })
125}