risingwave_frontend/rpc/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use pgwire::pg_server::{Session, SessionManager};
17use risingwave_common::id::{DatabaseId, TableId};
18use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
19use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
20use risingwave_pb::frontend_service::{
21    CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
22    GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
23};
24use risingwave_sqlparser::ast::ObjectName;
25use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
26
27use crate::error::RwError;
28use crate::handler::create_source::SqlColumnStrategy;
29use crate::handler::kill_process::handle_kill_local;
30use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
31use crate::session::{SESSION_MANAGER, SessionMapRef};
32
33#[derive(Default)]
34pub struct FrontendServiceImpl {
35    session_map: SessionMapRef,
36}
37
38impl FrontendServiceImpl {
39    pub fn new(session_map: SessionMapRef) -> Self {
40        Self { session_map }
41    }
42}
43
44#[async_trait::async_trait]
45impl FrontendService for FrontendServiceImpl {
46    async fn get_table_replace_plan(
47        &self,
48        request: RpcRequest<GetTableReplacePlanRequest>,
49    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
50        let req = request.into_inner();
51
52        let replace_plan = get_new_table_plan(
53            req.table_id,
54            req.database_id,
55            req.owner,
56            req.cdc_table_change,
57        )
58        .await?;
59
60        Ok(RpcResponse::new(GetTableReplacePlanResponse {
61            replace_plan: Some(replace_plan),
62        }))
63    }
64
65    async fn get_running_sqls(
66        &self,
67        _request: RpcRequest<GetRunningSqlsRequest>,
68    ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
69        let running_sqls = self
70            .session_map
71            .read()
72            .values()
73            .map(|s| RunningSql {
74                process_id: s.id().0,
75                user_name: s.user_name(),
76                peer_addr: format!("{}", s.peer_addr()),
77                database: s.database(),
78                elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
79                sql: s.running_sql().map(|sql| format!("{}", sql)),
80            })
81            .collect();
82        Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
83    }
84
85    async fn cancel_running_sql(
86        &self,
87        request: RpcRequest<CancelRunningSqlRequest>,
88    ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
89        let process_id = request.into_inner().process_id;
90        handle_kill_local(self.session_map.clone(), process_id).await?;
91        Ok(RpcResponse::new(CancelRunningSqlResponse {}))
92    }
93}
94
95/// Rebuild the table's streaming plan, possibly with cdc column changes.
96async fn get_new_table_plan(
97    table_id: TableId,
98    database_id: DatabaseId,
99    owner: u32,
100    cdc_table_change: Option<TableSchemaChange>,
101) -> Result<ReplaceJobPlan, RwError> {
102    tracing::info!("get_new_table_plan for table {}", table_id);
103
104    let session_mgr = SESSION_MANAGER
105        .get()
106        .expect("session manager has been initialized");
107
108    // get a session object for the corresponding user and database
109    let session = session_mgr.create_dummy_session(database_id, owner)?;
110
111    let _guard = scopeguard::guard((), |_| {
112        session_mgr.end_session(&session);
113    });
114
115    let table_catalog = {
116        let reader = session.env().catalog_reader().read_guard();
117        reader.get_any_table_by_id(table_id)?.clone()
118    };
119
120    let schema_name = {
121        let reader = session.env().catalog_reader().read_guard();
122        let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
123        schema.name.clone()
124    };
125    let table_name = ObjectName::from(vec![
126        schema_name.as_str().into(),
127        table_catalog.name.as_str().into(),
128    ]);
129
130    let definition = if let Some(cdc_table_change) = cdc_table_change {
131        let new_version_columns = cdc_table_change
132            .columns
133            .into_iter()
134            .map(|c| c.into())
135            .collect_vec();
136        get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
137    } else {
138        table_catalog.create_sql_ast_purified()?
139    };
140
141    let (source, table, graph, job_type) = get_replace_table_plan(
142        &session,
143        table_name,
144        definition,
145        &table_catalog,
146        SqlColumnStrategy::FollowUnchecked,
147    )
148    .await?;
149
150    Ok(ReplaceJobPlan {
151        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
152            replace_job_plan::ReplaceTable {
153                table: Some(table.to_prost()),
154                source: source.map(|s| s.to_prost()),
155                job_type: job_type as _,
156            },
157        )),
158        fragment_graph: Some(graph),
159    })
160}