risingwave_frontend/rpc/
mod.rs1use itertools::Itertools;
16use pgwire::pg_server::{Session, SessionManager};
17use risingwave_common::id::{DatabaseId, TableId};
18use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
19use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
20use risingwave_pb::frontend_service::{
21 CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
22 GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
23};
24use risingwave_sqlparser::ast::ObjectName;
25use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
26
27use crate::error::RwError;
28use crate::handler::create_source::SqlColumnStrategy;
29use crate::handler::kill_process::handle_kill_local;
30use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
31use crate::session::{SESSION_MANAGER, SessionMapRef};
32
33#[derive(Default)]
34pub struct FrontendServiceImpl {
35 session_map: SessionMapRef,
36}
37
38impl FrontendServiceImpl {
39 pub fn new(session_map: SessionMapRef) -> Self {
40 Self { session_map }
41 }
42}
43
44#[async_trait::async_trait]
45impl FrontendService for FrontendServiceImpl {
46 async fn get_table_replace_plan(
47 &self,
48 request: RpcRequest<GetTableReplacePlanRequest>,
49 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
50 let req = request.into_inner();
51
52 let replace_plan =
53 get_new_table_plan(req.table_id, req.database_id, req.cdc_table_change).await?;
54
55 Ok(RpcResponse::new(GetTableReplacePlanResponse {
56 replace_plan: Some(replace_plan),
57 }))
58 }
59
60 async fn get_running_sqls(
61 &self,
62 _request: RpcRequest<GetRunningSqlsRequest>,
63 ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
64 let running_sqls = self
65 .session_map
66 .read()
67 .values()
68 .map(|s| RunningSql {
69 process_id: s.id().0,
70 user_name: s.user_name(),
71 peer_addr: format!("{}", s.peer_addr()),
72 database: s.database(),
73 elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
74 sql: s.running_sql().map(|sql| format!("{}", sql)),
75 })
76 .collect();
77 Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
78 }
79
80 async fn cancel_running_sql(
81 &self,
82 request: RpcRequest<CancelRunningSqlRequest>,
83 ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
84 let process_id = request.into_inner().process_id;
85 handle_kill_local(self.session_map.clone(), process_id).await?;
86 Ok(RpcResponse::new(CancelRunningSqlResponse {}))
87 }
88}
89
90async fn get_new_table_plan(
92 table_id: TableId,
93 database_id: DatabaseId,
94 cdc_table_change: Option<TableSchemaChange>,
95) -> Result<ReplaceJobPlan, RwError> {
96 tracing::info!("get_new_table_plan for table {}", table_id);
97
98 let session_mgr = SESSION_MANAGER
99 .get()
100 .expect("session manager has been initialized");
101
102 let session = session_mgr.create_dummy_session(database_id)?;
104
105 let _guard = scopeguard::guard((), |_| {
106 session_mgr.end_session(&session);
107 });
108
109 let table_catalog = {
110 let reader = session.env().catalog_reader().read_guard();
111 reader.get_any_table_by_id(table_id)?.clone()
112 };
113 let original_owner = table_catalog.owner;
114
115 let schema_name = {
116 let reader = session.env().catalog_reader().read_guard();
117 let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
118 schema.name.clone()
119 };
120 let table_name = ObjectName::from(vec![
121 schema_name.as_str().into(),
122 table_catalog.name.as_str().into(),
123 ]);
124
125 let definition = if let Some(cdc_table_change) = cdc_table_change {
126 let new_version_columns = cdc_table_change
127 .columns
128 .into_iter()
129 .map(|c| c.into())
130 .collect_vec();
131 get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
132 } else {
133 table_catalog.create_sql_ast_purified()?
134 };
135
136 let (mut source, mut table, graph, job_type) = get_replace_table_plan(
137 &session,
138 table_name,
139 definition,
140 &table_catalog,
141 SqlColumnStrategy::FollowUnchecked,
142 )
143 .await?;
144
145 table.owner = original_owner;
148 if let Some(source) = &mut source {
149 source.owner = original_owner;
150 }
151
152 Ok(ReplaceJobPlan {
153 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
154 replace_job_plan::ReplaceTable {
155 table: Some(table.to_prost()),
156 source: source.map(|s| s.to_prost()),
157 job_type: job_type as _,
158 },
159 )),
160 fragment_graph: Some(graph),
161 })
162}