1mod monitor_service;
16
17use itertools::Itertools;
18pub use monitor_service::MonitorServiceImpl;
19use pgwire::pg_server::{Session, SessionManager};
20use risingwave_common::id::{DatabaseId, TableId};
21use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
22use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
23use risingwave_pb::frontend_service::{
24 AllCursors, CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest,
25 GetAllCursorsResponse, GetAllSubCursorsRequest, GetAllSubCursorsResponse,
26 GetRunningSqlsRequest, GetRunningSqlsResponse, GetTableReplacePlanRequest,
27 GetTableReplacePlanResponse, RunningSql, SubscriptionCursor,
28};
29use risingwave_sqlparser::ast::ObjectName;
30use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
31
32use crate::error::RwError;
33use crate::handler::create_source::SqlColumnStrategy;
34use crate::handler::kill_process::handle_kill_local;
35use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
36use crate::session::{SESSION_MANAGER, SessionMapRef};
37
38#[derive(Default)]
39pub struct FrontendServiceImpl {
40 session_map: SessionMapRef,
41}
42
43impl FrontendServiceImpl {
44 pub fn new(session_map: SessionMapRef) -> Self {
45 Self { session_map }
46 }
47}
48
49#[async_trait::async_trait]
50impl FrontendService for FrontendServiceImpl {
51 async fn get_table_replace_plan(
52 &self,
53 request: RpcRequest<GetTableReplacePlanRequest>,
54 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
55 let req = request.into_inner();
56
57 let replace_plan =
58 get_new_table_plan(req.table_id, req.database_id, req.cdc_table_change).await?;
59
60 Ok(RpcResponse::new(GetTableReplacePlanResponse {
61 replace_plan: Some(replace_plan),
62 }))
63 }
64
65 async fn get_all_cursors(
66 &self,
67 _request: RpcRequest<GetAllCursorsRequest>,
68 ) -> Result<RpcResponse<GetAllCursorsResponse>, Status> {
69 let sessions = self.session_map.read().values().cloned().collect_vec();
70 let mut all_cursors = vec![];
71 for s in sessions {
72 let mut cursor_names = vec![];
73 s.get_cursor_manager()
74 .iter_query_cursors(|cursor_name: &String, _| {
75 cursor_names.push(cursor_name.clone());
76 })
77 .await;
78 all_cursors.push(AllCursors {
79 session_id: s.id().0,
80 user_name: s.user_name(),
81 peer_addr: format!("{}", s.peer_addr()),
82 database: s.database(),
83 cursor_names,
84 });
85 }
86 Ok(RpcResponse::new(GetAllCursorsResponse { all_cursors }))
87 }
88
89 async fn get_all_sub_cursors(
90 &self,
91 _request: RpcRequest<GetAllSubCursorsRequest>,
92 ) -> Result<RpcResponse<GetAllSubCursorsResponse>, Status> {
93 let sessions = self.session_map.read().values().cloned().collect_vec();
94 let mut subscription_cursors = vec![];
95 for s in sessions {
96 let mut sub_cursor_names = vec![];
97 let mut cursor_names = vec![];
98 let mut states = vec![];
99 let mut idle_durations = vec![];
100 s.get_cursor_manager()
101 .iter_subscription_cursors(|cursor_name, sub_cursor| {
102 cursor_names.push(cursor_name.clone());
103 sub_cursor_names.push(sub_cursor.subscription_name().to_owned());
104 states.push(sub_cursor.state_info_string());
105 idle_durations.push(sub_cursor.idle_duration().as_secs());
106 })
107 .await;
108 subscription_cursors.push(SubscriptionCursor {
109 session_id: s.id().0,
110 user_name: s.user_name(),
111 peer_addr: format!("{}", s.peer_addr()),
112 database: s.database(),
113 states,
114 idle_durations,
115 cursor_names,
116 subscription_names: sub_cursor_names,
117 });
118 }
119 Ok(RpcResponse::new(GetAllSubCursorsResponse {
120 subscription_cursors,
121 }))
122 }
123
124 async fn get_running_sqls(
125 &self,
126 _request: RpcRequest<GetRunningSqlsRequest>,
127 ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
128 let running_sqls = self
129 .session_map
130 .read()
131 .values()
132 .map(|s| RunningSql {
133 process_id: s.id().0,
134 user_name: s.user_name(),
135 peer_addr: format!("{}", s.peer_addr()),
136 database: s.database(),
137 elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
138 sql: s.running_sql().map(|sql| format!("{}", sql)),
139 })
140 .collect();
141 Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
142 }
143
144 async fn cancel_running_sql(
145 &self,
146 request: RpcRequest<CancelRunningSqlRequest>,
147 ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
148 let process_id = request.into_inner().process_id;
149 handle_kill_local(self.session_map.clone(), process_id).await?;
150 Ok(RpcResponse::new(CancelRunningSqlResponse {}))
151 }
152}
153
154async fn get_new_table_plan(
156 table_id: TableId,
157 database_id: DatabaseId,
158 cdc_table_change: Option<TableSchemaChange>,
159) -> Result<ReplaceJobPlan, RwError> {
160 tracing::info!("get_new_table_plan for table {}", table_id);
161
162 let session_mgr = SESSION_MANAGER
163 .get()
164 .expect("session manager has been initialized");
165
166 let session = session_mgr.create_dummy_session(database_id)?;
168
169 let _guard = scopeguard::guard((), |_| {
170 session_mgr.end_session(&session);
171 });
172
173 let table_catalog = {
174 let reader = session.env().catalog_reader().read_guard();
175 reader.get_any_table_by_id(table_id)?.clone()
176 };
177 let original_owner = table_catalog.owner;
178
179 let schema_name = {
180 let reader = session.env().catalog_reader().read_guard();
181 let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
182 schema.name.clone()
183 };
184 let table_name = ObjectName::from(vec![
185 schema_name.as_str().into(),
186 table_catalog.name.as_str().into(),
187 ]);
188
189 let definition = if let Some(cdc_table_change) = cdc_table_change {
190 let new_version_columns = cdc_table_change
191 .columns
192 .into_iter()
193 .map(|c| c.into())
194 .collect_vec();
195 get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
196 } else {
197 table_catalog.create_sql_ast_purified()?
198 };
199
200 let (mut source, mut table, graph, job_type) = get_replace_table_plan(
201 &session,
202 table_name,
203 definition,
204 &table_catalog,
205 SqlColumnStrategy::FollowUnchecked,
206 )
207 .await?;
208
209 table.owner = original_owner;
212 if let Some(source) = &mut source {
213 source.owner = original_owner;
214 }
215
216 Ok(ReplaceJobPlan {
217 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
218 replace_job_plan::ReplaceTable {
219 table: Some(table.to_prost()),
220 source: source.map(|s| s.to_prost()),
221 job_type: job_type as _,
222 },
223 )),
224 fragment_graph: Some(graph),
225 })
226}