risingwave_frontend/rpc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pgwire::pg_server::{BoxedError, Session, SessionManager};
17use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
18use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
19use risingwave_pb::frontend_service::{
20    CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
21    GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
22};
23use risingwave_rpc_client::error::ToTonicStatus;
24use risingwave_sqlparser::ast::ObjectName;
25use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
26
27use crate::error::RwError;
28use crate::handler::create_source::SqlColumnStrategy;
29use crate::handler::kill_process::handle_kill_local;
30use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
31use crate::session::{SESSION_MANAGER, SessionMapRef};
32
33#[derive(thiserror::Error, Debug)]
34pub enum AutoSchemaChangeError {
35    #[error("frontend error")]
36    FrontendError(
37        #[from]
38        #[backtrace]
39        RwError,
40    ),
41}
42
43impl From<BoxedError> for AutoSchemaChangeError {
44    fn from(err: BoxedError) -> Self {
45        AutoSchemaChangeError::FrontendError(RwError::from(err))
46    }
47}
48
49impl From<AutoSchemaChangeError> for tonic::Status {
50    fn from(err: AutoSchemaChangeError) -> Self {
51        err.to_status(tonic::Code::Internal, "frontend")
52    }
53}
54
55#[derive(Default)]
56pub struct FrontendServiceImpl {
57    session_map: SessionMapRef,
58}
59
60impl FrontendServiceImpl {
61    pub fn new(session_map: SessionMapRef) -> Self {
62        Self { session_map }
63    }
64}
65
66#[async_trait::async_trait]
67impl FrontendService for FrontendServiceImpl {
68    async fn get_table_replace_plan(
69        &self,
70        request: RpcRequest<GetTableReplacePlanRequest>,
71    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
72        let req = request.into_inner();
73        tracing::info!("get_table_replace_plan for table {}", req.table_name);
74
75        let table_change = req.table_change.expect("schema change message is required");
76        let replace_plan =
77            get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?;
78
79        Ok(RpcResponse::new(GetTableReplacePlanResponse {
80            replace_plan: Some(replace_plan),
81        }))
82    }
83
84    async fn get_running_sqls(
85        &self,
86        _request: RpcRequest<GetRunningSqlsRequest>,
87    ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
88        let running_sqls = self
89            .session_map
90            .read()
91            .values()
92            .map(|s| RunningSql {
93                process_id: s.id().0,
94                user_name: s.user_name(),
95                peer_addr: format!("{}", s.peer_addr()),
96                database: s.database(),
97                elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
98                sql: s.running_sql().map(|sql| format!("{}", sql)),
99            })
100            .collect();
101        Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
102    }
103
104    async fn cancel_running_sql(
105        &self,
106        request: RpcRequest<CancelRunningSqlRequest>,
107    ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
108        let process_id = request.into_inner().process_id;
109        handle_kill_local(self.session_map.clone(), process_id).await?;
110        Ok(RpcResponse::new(CancelRunningSqlResponse {}))
111    }
112}
113
114/// Get the new table plan for the given table schema change
115async fn get_new_table_plan(
116    table_change: TableSchemaChange,
117    table_name: String,
118    database_id: u32,
119    owner: u32,
120) -> Result<ReplaceJobPlan, AutoSchemaChangeError> {
121    tracing::info!("get_new_table_plan for table {}", table_name);
122
123    let session_mgr = SESSION_MANAGER
124        .get()
125        .expect("session manager has been initialized");
126
127    // get a session object for the corresponding user and database
128    let session = session_mgr.create_dummy_session(database_id, owner)?;
129
130    let new_version_columns = table_change
131        .columns
132        .into_iter()
133        .map(|c| c.into())
134        .collect_vec();
135    let table_name = ObjectName::from(vec![table_name.as_str().into()]);
136
137    let (new_table_definition, original_catalog) =
138        get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
139            .await?;
140    let (_, table, graph, job_type) = get_replace_table_plan(
141        &session,
142        table_name,
143        new_table_definition,
144        &original_catalog,
145        SqlColumnStrategy::FollowUnchecked, // not used
146    )
147    .await?;
148
149    Ok(ReplaceJobPlan {
150        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
151            replace_job_plan::ReplaceTable {
152                table: Some(table),
153                source: None, // none for cdc table
154                job_type: job_type as _,
155            },
156        )),
157        fragment_graph: Some(graph),
158    })
159}