risingwave_frontend/rpc/
mod.rs1mod monitor_service;
16
17use itertools::Itertools;
18pub use monitor_service::MonitorServiceImpl;
19use pgwire::pg_server::{Session, SessionManager};
20use risingwave_common::id::{DatabaseId, TableId};
21use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
22use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
23use risingwave_pb::frontend_service::{
24 CancelRunningSqlRequest, CancelRunningSqlResponse, GetRunningSqlsRequest,
25 GetRunningSqlsResponse, GetTableReplacePlanRequest, GetTableReplacePlanResponse, RunningSql,
26};
27use risingwave_sqlparser::ast::ObjectName;
28use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
29
30use crate::error::RwError;
31use crate::handler::create_source::SqlColumnStrategy;
32use crate::handler::kill_process::handle_kill_local;
33use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
34use crate::session::{SESSION_MANAGER, SessionMapRef};
35
36#[derive(Default)]
37pub struct FrontendServiceImpl {
38 session_map: SessionMapRef,
39}
40
41impl FrontendServiceImpl {
42 pub fn new(session_map: SessionMapRef) -> Self {
43 Self { session_map }
44 }
45}
46
47#[async_trait::async_trait]
48impl FrontendService for FrontendServiceImpl {
49 async fn get_table_replace_plan(
50 &self,
51 request: RpcRequest<GetTableReplacePlanRequest>,
52 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
53 let req = request.into_inner();
54
55 let replace_plan =
56 get_new_table_plan(req.table_id, req.database_id, req.cdc_table_change).await?;
57
58 Ok(RpcResponse::new(GetTableReplacePlanResponse {
59 replace_plan: Some(replace_plan),
60 }))
61 }
62
63 async fn get_running_sqls(
64 &self,
65 _request: RpcRequest<GetRunningSqlsRequest>,
66 ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
67 let running_sqls = self
68 .session_map
69 .read()
70 .values()
71 .map(|s| RunningSql {
72 process_id: s.id().0,
73 user_name: s.user_name(),
74 peer_addr: format!("{}", s.peer_addr()),
75 database: s.database(),
76 elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
77 sql: s.running_sql().map(|sql| format!("{}", sql)),
78 })
79 .collect();
80 Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
81 }
82
83 async fn cancel_running_sql(
84 &self,
85 request: RpcRequest<CancelRunningSqlRequest>,
86 ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
87 let process_id = request.into_inner().process_id;
88 handle_kill_local(self.session_map.clone(), process_id).await?;
89 Ok(RpcResponse::new(CancelRunningSqlResponse {}))
90 }
91}
92
93async fn get_new_table_plan(
95 table_id: TableId,
96 database_id: DatabaseId,
97 cdc_table_change: Option<TableSchemaChange>,
98) -> Result<ReplaceJobPlan, RwError> {
99 tracing::info!("get_new_table_plan for table {}", table_id);
100
101 let session_mgr = SESSION_MANAGER
102 .get()
103 .expect("session manager has been initialized");
104
105 let session = session_mgr.create_dummy_session(database_id)?;
107
108 let _guard = scopeguard::guard((), |_| {
109 session_mgr.end_session(&session);
110 });
111
112 let table_catalog = {
113 let reader = session.env().catalog_reader().read_guard();
114 reader.get_any_table_by_id(table_id)?.clone()
115 };
116 let original_owner = table_catalog.owner;
117
118 let schema_name = {
119 let reader = session.env().catalog_reader().read_guard();
120 let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
121 schema.name.clone()
122 };
123 let table_name = ObjectName::from(vec![
124 schema_name.as_str().into(),
125 table_catalog.name.as_str().into(),
126 ]);
127
128 let definition = if let Some(cdc_table_change) = cdc_table_change {
129 let new_version_columns = cdc_table_change
130 .columns
131 .into_iter()
132 .map(|c| c.into())
133 .collect_vec();
134 get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
135 } else {
136 table_catalog.create_sql_ast_purified()?
137 };
138
139 let (mut source, mut table, graph, job_type) = get_replace_table_plan(
140 &session,
141 table_name,
142 definition,
143 &table_catalog,
144 SqlColumnStrategy::FollowUnchecked,
145 )
146 .await?;
147
148 table.owner = original_owner;
151 if let Some(source) = &mut source {
152 source.owner = original_owner;
153 }
154
155 Ok(ReplaceJobPlan {
156 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
157 replace_job_plan::ReplaceTable {
158 table: Some(table.to_prost()),
159 source: source.map(|s| s.to_prost()),
160 job_type: job_type as _,
161 },
162 )),
163 fragment_graph: Some(graph),
164 })
165}