risingwave_frontend/rpc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pgwire::pg_server::{Session, SessionManager};
17use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
18use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
19use risingwave_pb::frontend_service::{
20    CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
21    GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
22};
23use risingwave_sqlparser::ast::ObjectName;
24use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
25
26use crate::error::RwError;
27use crate::handler::create_source::SqlColumnStrategy;
28use crate::handler::kill_process::handle_kill_local;
29use crate::handler::{
30    fetch_table_catalog_for_alter, get_new_table_definition_for_cdc_table, get_replace_table_plan,
31};
32use crate::session::{SESSION_MANAGER, SessionMapRef};
33
34#[derive(Default)]
35pub struct FrontendServiceImpl {
36    session_map: SessionMapRef,
37}
38
39impl FrontendServiceImpl {
40    pub fn new(session_map: SessionMapRef) -> Self {
41        Self { session_map }
42    }
43}
44
45#[async_trait::async_trait]
46impl FrontendService for FrontendServiceImpl {
47    async fn get_table_replace_plan(
48        &self,
49        request: RpcRequest<GetTableReplacePlanRequest>,
50    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
51        let req = request.into_inner();
52
53        let replace_plan = get_new_table_plan(
54            req.table_name,
55            req.database_id,
56            req.owner,
57            req.cdc_table_change,
58        )
59        .await?;
60
61        Ok(RpcResponse::new(GetTableReplacePlanResponse {
62            replace_plan: Some(replace_plan),
63        }))
64    }
65
66    async fn get_running_sqls(
67        &self,
68        _request: RpcRequest<GetRunningSqlsRequest>,
69    ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
70        let running_sqls = self
71            .session_map
72            .read()
73            .values()
74            .map(|s| RunningSql {
75                process_id: s.id().0,
76                user_name: s.user_name(),
77                peer_addr: format!("{}", s.peer_addr()),
78                database: s.database(),
79                elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
80                sql: s.running_sql().map(|sql| format!("{}", sql)),
81            })
82            .collect();
83        Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
84    }
85
86    async fn cancel_running_sql(
87        &self,
88        request: RpcRequest<CancelRunningSqlRequest>,
89    ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
90        let process_id = request.into_inner().process_id;
91        handle_kill_local(self.session_map.clone(), process_id).await?;
92        Ok(RpcResponse::new(CancelRunningSqlResponse {}))
93    }
94}
95
96/// Rebuild the table's streaming plan, possibly with cdc column changes.
97async fn get_new_table_plan(
98    table_name: String,
99    database_id: u32,
100    owner: u32,
101    cdc_table_change: Option<TableSchemaChange>,
102) -> Result<ReplaceJobPlan, RwError> {
103    tracing::info!("get_new_table_plan for table {}", table_name);
104
105    let session_mgr = SESSION_MANAGER
106        .get()
107        .expect("session manager has been initialized");
108
109    // get a session object for the corresponding user and database
110    let session = session_mgr.create_dummy_session(database_id, owner)?;
111
112    let table_name = ObjectName::from(vec![table_name.as_str().into()]);
113    let (original_catalog, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
114
115    let definition = if let Some(cdc_table_change) = cdc_table_change {
116        let new_version_columns = cdc_table_change
117            .columns
118            .into_iter()
119            .map(|c| c.into())
120            .collect_vec();
121        get_new_table_definition_for_cdc_table(original_catalog.clone(), &new_version_columns)
122            .await?
123    } else {
124        original_catalog.create_sql_ast_purified()?
125    };
126
127    let (source, table, graph, job_type) = get_replace_table_plan(
128        &session,
129        table_name,
130        definition,
131        &original_catalog,
132        SqlColumnStrategy::FollowUnchecked,
133    )
134    .await?;
135
136    Ok(ReplaceJobPlan {
137        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
138            replace_job_plan::ReplaceTable {
139                table: Some(table.to_prost()),
140                source: source.map(|s| s.to_prost()),
141                job_type: job_type as _,
142            },
143        )),
144        fragment_graph: Some(graph),
145    })
146}