1mod monitor_service;
16
17use itertools::Itertools;
18pub use monitor_service::MonitorServiceImpl;
19use pgwire::pg_server::{Session, SessionManager};
20use risingwave_common::id::{DatabaseId, TableId};
21use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
22use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
23use risingwave_pb::frontend_service::{
24 AllCursors, CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest,
25 GetAllCursorsResponse, GetAllSubCursorsRequest, GetAllSubCursorsResponse,
26 GetRunningSqlsRequest, GetRunningSqlsResponse, GetTableReplacePlanRequest,
27 GetTableReplacePlanResponse, RunningSql, SubscriptionCursor,
28};
29use risingwave_sqlparser::ast::ObjectName;
30use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
31
32use crate::error::RwError;
33use crate::handler::create_source::SqlColumnStrategy;
34use crate::handler::kill_process::handle_kill_local;
35use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
36use crate::session::{SESSION_MANAGER, SessionMapRef};
37
38#[derive(Default)]
39pub struct FrontendServiceImpl {
40 session_map: SessionMapRef,
41}
42
43impl FrontendServiceImpl {
44 pub fn new(session_map: SessionMapRef) -> Self {
45 Self { session_map }
46 }
47}
48
49#[async_trait::async_trait]
50impl FrontendService for FrontendServiceImpl {
51 async fn get_table_replace_plan(
52 &self,
53 request: RpcRequest<GetTableReplacePlanRequest>,
54 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
55 let req = request.into_inner();
56
57 let replace_plan = Box::pin(get_new_table_plan(
58 req.table_id,
59 req.database_id,
60 req.cdc_table_change,
61 ))
62 .await?;
63
64 Ok(RpcResponse::new(GetTableReplacePlanResponse {
65 replace_plan: Some(replace_plan),
66 }))
67 }
68
69 async fn get_all_cursors(
70 &self,
71 _request: RpcRequest<GetAllCursorsRequest>,
72 ) -> Result<RpcResponse<GetAllCursorsResponse>, Status> {
73 let sessions = self.session_map.read().values().cloned().collect_vec();
74 let mut all_cursors = vec![];
75 for s in sessions {
76 let mut cursor_names = vec![];
77 s.get_cursor_manager()
78 .iter_query_cursors(|cursor_name: &String, _| {
79 cursor_names.push(cursor_name.clone());
80 })
81 .await;
82 all_cursors.push(AllCursors {
83 session_id: s.id().0,
84 user_name: s.user_name(),
85 peer_addr: format!("{}", s.peer_addr()),
86 database: s.database(),
87 cursor_names,
88 });
89 }
90 Ok(RpcResponse::new(GetAllCursorsResponse { all_cursors }))
91 }
92
93 async fn get_all_sub_cursors(
94 &self,
95 _request: RpcRequest<GetAllSubCursorsRequest>,
96 ) -> Result<RpcResponse<GetAllSubCursorsResponse>, Status> {
97 let sessions = self.session_map.read().values().cloned().collect_vec();
98 let mut subscription_cursors = vec![];
99 for s in sessions {
100 let mut sub_cursor_names = vec![];
101 let mut cursor_names = vec![];
102 let mut states = vec![];
103 let mut idle_durations = vec![];
104 s.get_cursor_manager()
105 .iter_subscription_cursors(|cursor_name, sub_cursor| {
106 cursor_names.push(cursor_name.clone());
107 sub_cursor_names.push(sub_cursor.subscription_name().to_owned());
108 states.push(sub_cursor.state_info_string());
109 idle_durations.push(sub_cursor.idle_duration().as_secs());
110 })
111 .await;
112 subscription_cursors.push(SubscriptionCursor {
113 session_id: s.id().0,
114 user_name: s.user_name(),
115 peer_addr: format!("{}", s.peer_addr()),
116 database: s.database(),
117 states,
118 idle_durations,
119 cursor_names,
120 subscription_names: sub_cursor_names,
121 });
122 }
123 Ok(RpcResponse::new(GetAllSubCursorsResponse {
124 subscription_cursors,
125 }))
126 }
127
128 async fn get_running_sqls(
129 &self,
130 _request: RpcRequest<GetRunningSqlsRequest>,
131 ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
132 let running_sqls = self
133 .session_map
134 .read()
135 .values()
136 .map(|s| RunningSql {
137 process_id: s.id().0,
138 user_name: s.user_name(),
139 peer_addr: format!("{}", s.peer_addr()),
140 database: s.database(),
141 elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
142 sql: s.running_sql().map(|sql| format!("{}", sql)),
143 })
144 .collect();
145 Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
146 }
147
148 async fn cancel_running_sql(
149 &self,
150 request: RpcRequest<CancelRunningSqlRequest>,
151 ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
152 let process_id = request.into_inner().process_id;
153 handle_kill_local(self.session_map.clone(), process_id).await?;
154 Ok(RpcResponse::new(CancelRunningSqlResponse {}))
155 }
156}
157
158async fn get_new_table_plan(
160 table_id: TableId,
161 database_id: DatabaseId,
162 cdc_table_change: Option<TableSchemaChange>,
163) -> Result<ReplaceJobPlan, RwError> {
164 tracing::info!("get_new_table_plan for table {}", table_id);
165
166 let session_mgr = SESSION_MANAGER
167 .get()
168 .expect("session manager has been initialized");
169
170 let session = session_mgr.create_dummy_session(database_id)?;
172
173 let _guard = scopeguard::guard((), |_| {
174 session_mgr.end_session(&session);
175 });
176
177 let table_catalog = {
178 let reader = session.env().catalog_reader().read_guard();
179 reader.get_any_table_by_id(table_id)?.clone()
180 };
181 let original_owner = table_catalog.owner;
182
183 let schema_name = {
184 let reader = session.env().catalog_reader().read_guard();
185 let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
186 schema.name.clone()
187 };
188 let table_name = ObjectName::from(vec![
189 schema_name.as_str().into(),
190 table_catalog.name.as_str().into(),
191 ]);
192
193 let definition = if let Some(cdc_table_change) = cdc_table_change {
194 let new_version_columns = cdc_table_change
195 .columns
196 .into_iter()
197 .map(|c| c.into())
198 .collect_vec();
199 get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
200 } else {
201 table_catalog.create_sql_ast_purified()?
202 };
203
204 let (mut source, mut table, graph, job_type) = Box::pin(get_replace_table_plan(
205 &session,
206 table_name,
207 definition,
208 &table_catalog,
209 SqlColumnStrategy::FollowUnchecked,
210 ))
211 .await?;
212
213 table.owner = original_owner;
216 if let Some(source) = &mut source {
217 source.owner = original_owner;
218 }
219
220 Ok(ReplaceJobPlan {
221 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
222 replace_job_plan::ReplaceTable {
223 table: Some(table.to_prost()),
224 source: source.map(|s| s.to_prost()),
225 job_type: job_type as _,
226 },
227 )),
228 fragment_graph: Some(graph),
229 })
230}