risingwave_frontend/rpc/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod monitor_service;
16
17use itertools::Itertools;
18pub use monitor_service::MonitorServiceImpl;
19use pgwire::pg_server::{Session, SessionManager};
20use risingwave_common::id::{DatabaseId, TableId};
21use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
22use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
23use risingwave_pb::frontend_service::{
24    AllCursors, CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest,
25    GetAllCursorsResponse, GetAllSubCursorsRequest, GetAllSubCursorsResponse,
26    GetRunningSqlsRequest, GetRunningSqlsResponse, GetTableReplacePlanRequest,
27    GetTableReplacePlanResponse, RunningSql, SubscriptionCursor,
28};
29use risingwave_sqlparser::ast::ObjectName;
30use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
31
32use crate::error::RwError;
33use crate::handler::create_source::SqlColumnStrategy;
34use crate::handler::kill_process::handle_kill_local;
35use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
36use crate::session::{SESSION_MANAGER, SessionMapRef};
37
38#[derive(Default)]
39pub struct FrontendServiceImpl {
40    session_map: SessionMapRef,
41}
42
43impl FrontendServiceImpl {
44    pub fn new(session_map: SessionMapRef) -> Self {
45        Self { session_map }
46    }
47}
48
49#[async_trait::async_trait]
50impl FrontendService for FrontendServiceImpl {
51    async fn get_table_replace_plan(
52        &self,
53        request: RpcRequest<GetTableReplacePlanRequest>,
54    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
55        let req = request.into_inner();
56
57        let replace_plan = Box::pin(get_new_table_plan(
58            req.table_id,
59            req.database_id,
60            req.cdc_table_change,
61        ))
62        .await?;
63
64        Ok(RpcResponse::new(GetTableReplacePlanResponse {
65            replace_plan: Some(replace_plan),
66        }))
67    }
68
69    async fn get_all_cursors(
70        &self,
71        _request: RpcRequest<GetAllCursorsRequest>,
72    ) -> Result<RpcResponse<GetAllCursorsResponse>, Status> {
73        let sessions = self.session_map.read().values().cloned().collect_vec();
74        let mut all_cursors = vec![];
75        for s in sessions {
76            let mut cursor_names = vec![];
77            s.get_cursor_manager()
78                .iter_query_cursors(|cursor_name: &String, _| {
79                    cursor_names.push(cursor_name.clone());
80                })
81                .await;
82            all_cursors.push(AllCursors {
83                session_id: s.id().0,
84                user_name: s.user_name(),
85                peer_addr: format!("{}", s.peer_addr()),
86                database: s.database(),
87                cursor_names,
88            });
89        }
90        Ok(RpcResponse::new(GetAllCursorsResponse { all_cursors }))
91    }
92
93    async fn get_all_sub_cursors(
94        &self,
95        _request: RpcRequest<GetAllSubCursorsRequest>,
96    ) -> Result<RpcResponse<GetAllSubCursorsResponse>, Status> {
97        let sessions = self.session_map.read().values().cloned().collect_vec();
98        let mut subscription_cursors = vec![];
99        for s in sessions {
100            let mut sub_cursor_names = vec![];
101            let mut cursor_names = vec![];
102            let mut states = vec![];
103            let mut idle_durations = vec![];
104            s.get_cursor_manager()
105                .iter_subscription_cursors(|cursor_name, sub_cursor| {
106                    cursor_names.push(cursor_name.clone());
107                    sub_cursor_names.push(sub_cursor.subscription_name().to_owned());
108                    states.push(sub_cursor.state_info_string());
109                    idle_durations.push(sub_cursor.idle_duration().as_secs());
110                })
111                .await;
112            subscription_cursors.push(SubscriptionCursor {
113                session_id: s.id().0,
114                user_name: s.user_name(),
115                peer_addr: format!("{}", s.peer_addr()),
116                database: s.database(),
117                states,
118                idle_durations,
119                cursor_names,
120                subscription_names: sub_cursor_names,
121            });
122        }
123        Ok(RpcResponse::new(GetAllSubCursorsResponse {
124            subscription_cursors,
125        }))
126    }
127
128    async fn get_running_sqls(
129        &self,
130        _request: RpcRequest<GetRunningSqlsRequest>,
131    ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
132        let running_sqls = self
133            .session_map
134            .read()
135            .values()
136            .map(|s| RunningSql {
137                process_id: s.id().0,
138                user_name: s.user_name(),
139                peer_addr: format!("{}", s.peer_addr()),
140                database: s.database(),
141                elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
142                sql: s.running_sql().map(|sql| format!("{}", sql)),
143            })
144            .collect();
145        Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
146    }
147
148    async fn cancel_running_sql(
149        &self,
150        request: RpcRequest<CancelRunningSqlRequest>,
151    ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
152        let process_id = request.into_inner().process_id;
153        handle_kill_local(self.session_map.clone(), process_id).await?;
154        Ok(RpcResponse::new(CancelRunningSqlResponse {}))
155    }
156}
157
158/// Rebuild the table's streaming plan, possibly with cdc column changes.
159async fn get_new_table_plan(
160    table_id: TableId,
161    database_id: DatabaseId,
162    cdc_table_change: Option<TableSchemaChange>,
163) -> Result<ReplaceJobPlan, RwError> {
164    tracing::info!("get_new_table_plan for table {}", table_id);
165
166    let session_mgr = SESSION_MANAGER
167        .get()
168        .expect("session manager has been initialized");
169
170    // get a session object for the corresponding user and database
171    let session = session_mgr.create_dummy_session(database_id)?;
172
173    let _guard = scopeguard::guard((), |_| {
174        session_mgr.end_session(&session);
175    });
176
177    let table_catalog = {
178        let reader = session.env().catalog_reader().read_guard();
179        reader.get_any_table_by_id(table_id)?.clone()
180    };
181    let original_owner = table_catalog.owner;
182
183    let schema_name = {
184        let reader = session.env().catalog_reader().read_guard();
185        let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
186        schema.name.clone()
187    };
188    let table_name = ObjectName::from(vec![
189        schema_name.as_str().into(),
190        table_catalog.name.as_str().into(),
191    ]);
192
193    let definition = if let Some(cdc_table_change) = cdc_table_change {
194        let new_version_columns = cdc_table_change
195            .columns
196            .into_iter()
197            .map(|c| c.into())
198            .collect_vec();
199        get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
200    } else {
201        table_catalog.create_sql_ast_purified()?
202    };
203
204    let (mut source, mut table, graph, job_type) = Box::pin(get_replace_table_plan(
205        &session,
206        table_name,
207        definition,
208        &table_catalog,
209        SqlColumnStrategy::FollowUnchecked,
210    ))
211    .await?;
212
213    // The dummy session may be created with a fixed super user, which can cause the generated
214    // plan to carry an incorrect table owner. Restore it to the original owner.
215    table.owner = original_owner;
216    if let Some(source) = &mut source {
217        source.owner = original_owner;
218    }
219
220    Ok(ReplaceJobPlan {
221        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
222            replace_job_plan::ReplaceTable {
223                table: Some(table.to_prost()),
224                source: source.map(|s| s.to_prost()),
225                job_type: job_type as _,
226            },
227        )),
228        fragment_graph: Some(graph),
229    })
230}