risingwave_frontend/rpc/
mod.rs1use itertools::Itertools;
16use pgwire::pg_server::{BoxedError, Session, SessionManager};
17use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
18use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
19use risingwave_pb::frontend_service::{
20 CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
21 GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
22};
23use risingwave_rpc_client::error::ToTonicStatus;
24use risingwave_sqlparser::ast::ObjectName;
25use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
26
27use crate::error::RwError;
28use crate::handler::create_source::SqlColumnStrategy;
29use crate::handler::kill_process::handle_kill_local;
30use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
31use crate::session::{SESSION_MANAGER, SessionMapRef};
32
33#[derive(thiserror::Error, Debug)]
34pub enum AutoSchemaChangeError {
35 #[error("frontend error")]
36 FrontendError(
37 #[from]
38 #[backtrace]
39 RwError,
40 ),
41}
42
43impl From<BoxedError> for AutoSchemaChangeError {
44 fn from(err: BoxedError) -> Self {
45 AutoSchemaChangeError::FrontendError(RwError::from(err))
46 }
47}
48
49impl From<AutoSchemaChangeError> for tonic::Status {
50 fn from(err: AutoSchemaChangeError) -> Self {
51 err.to_status(tonic::Code::Internal, "frontend")
52 }
53}
54
55#[derive(Default)]
56pub struct FrontendServiceImpl {
57 session_map: SessionMapRef,
58}
59
60impl FrontendServiceImpl {
61 pub fn new(session_map: SessionMapRef) -> Self {
62 Self { session_map }
63 }
64}
65
66#[async_trait::async_trait]
67impl FrontendService for FrontendServiceImpl {
68 async fn get_table_replace_plan(
69 &self,
70 request: RpcRequest<GetTableReplacePlanRequest>,
71 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
72 let req = request.into_inner();
73 tracing::info!("get_table_replace_plan for table {}", req.table_name);
74
75 let table_change = req.table_change.expect("schema change message is required");
76 let replace_plan =
77 get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?;
78
79 Ok(RpcResponse::new(GetTableReplacePlanResponse {
80 replace_plan: Some(replace_plan),
81 }))
82 }
83
84 async fn get_running_sqls(
85 &self,
86 _request: RpcRequest<GetRunningSqlsRequest>,
87 ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
88 let running_sqls = self
89 .session_map
90 .read()
91 .values()
92 .map(|s| RunningSql {
93 process_id: s.id().0,
94 user_name: s.user_name(),
95 peer_addr: format!("{}", s.peer_addr()),
96 database: s.database(),
97 elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
98 sql: s.running_sql().map(|sql| format!("{}", sql)),
99 })
100 .collect();
101 Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
102 }
103
104 async fn cancel_running_sql(
105 &self,
106 request: RpcRequest<CancelRunningSqlRequest>,
107 ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
108 let process_id = request.into_inner().process_id;
109 handle_kill_local(self.session_map.clone(), process_id).await?;
110 Ok(RpcResponse::new(CancelRunningSqlResponse {}))
111 }
112}
113
114async fn get_new_table_plan(
116 table_change: TableSchemaChange,
117 table_name: String,
118 database_id: u32,
119 owner: u32,
120) -> Result<ReplaceJobPlan, AutoSchemaChangeError> {
121 tracing::info!("get_new_table_plan for table {}", table_name);
122
123 let session_mgr = SESSION_MANAGER
124 .get()
125 .expect("session manager has been initialized");
126
127 let session = session_mgr.create_dummy_session(database_id, owner)?;
129
130 let new_version_columns = table_change
131 .columns
132 .into_iter()
133 .map(|c| c.into())
134 .collect_vec();
135 let table_name = ObjectName::from(vec![table_name.as_str().into()]);
136
137 let (new_table_definition, original_catalog) =
138 get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
139 .await?;
140 let (_, table, graph, job_type) = get_replace_table_plan(
141 &session,
142 table_name,
143 new_table_definition,
144 &original_catalog,
145 SqlColumnStrategy::FollowUnchecked, )
147 .await?;
148
149 Ok(ReplaceJobPlan {
150 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
151 replace_job_plan::ReplaceTable {
152 table: Some(table),
153 source: None, job_type: job_type as _,
155 },
156 )),
157 fragment_graph: Some(graph),
158 })
159}