risingwave_frontend/rpc/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod monitor_service;
16
17use itertools::Itertools;
18pub use monitor_service::MonitorServiceImpl;
19use pgwire::pg_server::{Session, SessionManager};
20use risingwave_common::id::{DatabaseId, TableId};
21use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
22use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
23use risingwave_pb::frontend_service::{
24    AllCursors, CancelRunningSqlRequest, CancelRunningSqlResponse, GetAllCursorsRequest,
25    GetAllCursorsResponse, GetAllSubCursorsRequest, GetAllSubCursorsResponse,
26    GetRunningSqlsRequest, GetRunningSqlsResponse, GetTableReplacePlanRequest,
27    GetTableReplacePlanResponse, RunningSql, SubscriptionCursor,
28};
29use risingwave_sqlparser::ast::ObjectName;
30use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
31
32use crate::error::RwError;
33use crate::handler::create_source::SqlColumnStrategy;
34use crate::handler::kill_process::handle_kill_local;
35use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
36use crate::session::{SESSION_MANAGER, SessionMapRef};
37
/// Implementation of the [`FrontendService`] RPC interface for this frontend node.
///
/// All session-related handlers read from the session map handed to [`FrontendServiceImpl::new`].
#[derive(Default)]
pub struct FrontendServiceImpl {
    /// Handle to the map of sessions that the RPC handlers enumerate or act upon.
    session_map: SessionMapRef,
}
42
43impl FrontendServiceImpl {
44    pub fn new(session_map: SessionMapRef) -> Self {
45        Self { session_map }
46    }
47}
48
49#[async_trait::async_trait]
50impl FrontendService for FrontendServiceImpl {
51    async fn get_table_replace_plan(
52        &self,
53        request: RpcRequest<GetTableReplacePlanRequest>,
54    ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
55        let req = request.into_inner();
56
57        let replace_plan =
58            get_new_table_plan(req.table_id, req.database_id, req.cdc_table_change).await?;
59
60        Ok(RpcResponse::new(GetTableReplacePlanResponse {
61            replace_plan: Some(replace_plan),
62        }))
63    }
64
65    async fn get_all_cursors(
66        &self,
67        _request: RpcRequest<GetAllCursorsRequest>,
68    ) -> Result<RpcResponse<GetAllCursorsResponse>, Status> {
69        let sessions = self.session_map.read().values().cloned().collect_vec();
70        let mut all_cursors = vec![];
71        for s in sessions {
72            let mut cursor_names = vec![];
73            s.get_cursor_manager()
74                .iter_query_cursors(|cursor_name: &String, _| {
75                    cursor_names.push(cursor_name.clone());
76                })
77                .await;
78            all_cursors.push(AllCursors {
79                session_id: s.id().0,
80                user_name: s.user_name(),
81                peer_addr: format!("{}", s.peer_addr()),
82                database: s.database(),
83                cursor_names,
84            });
85        }
86        Ok(RpcResponse::new(GetAllCursorsResponse { all_cursors }))
87    }
88
89    async fn get_all_sub_cursors(
90        &self,
91        _request: RpcRequest<GetAllSubCursorsRequest>,
92    ) -> Result<RpcResponse<GetAllSubCursorsResponse>, Status> {
93        let sessions = self.session_map.read().values().cloned().collect_vec();
94        let mut subscription_cursors = vec![];
95        for s in sessions {
96            let mut sub_cursor_names = vec![];
97            let mut cursor_names = vec![];
98            let mut states = vec![];
99            let mut idle_durations = vec![];
100            s.get_cursor_manager()
101                .iter_subscription_cursors(|cursor_name, sub_cursor| {
102                    cursor_names.push(cursor_name.clone());
103                    sub_cursor_names.push(sub_cursor.subscription_name().to_owned());
104                    states.push(sub_cursor.state_info_string());
105                    idle_durations.push(sub_cursor.idle_duration().as_secs());
106                })
107                .await;
108            subscription_cursors.push(SubscriptionCursor {
109                session_id: s.id().0,
110                user_name: s.user_name(),
111                peer_addr: format!("{}", s.peer_addr()),
112                database: s.database(),
113                states,
114                idle_durations,
115                cursor_names,
116                subscription_names: sub_cursor_names,
117            });
118        }
119        Ok(RpcResponse::new(GetAllSubCursorsResponse {
120            subscription_cursors,
121        }))
122    }
123
124    async fn get_running_sqls(
125        &self,
126        _request: RpcRequest<GetRunningSqlsRequest>,
127    ) -> Result<RpcResponse<GetRunningSqlsResponse>, Status> {
128        let running_sqls = self
129            .session_map
130            .read()
131            .values()
132            .map(|s| RunningSql {
133                process_id: s.id().0,
134                user_name: s.user_name(),
135                peer_addr: format!("{}", s.peer_addr()),
136                database: s.database(),
137                elapsed_millis: s.elapse_since_running_sql().and_then(|e| e.try_into().ok()),
138                sql: s.running_sql().map(|sql| format!("{}", sql)),
139            })
140            .collect();
141        Ok(RpcResponse::new(GetRunningSqlsResponse { running_sqls }))
142    }
143
144    async fn cancel_running_sql(
145        &self,
146        request: RpcRequest<CancelRunningSqlRequest>,
147    ) -> Result<RpcResponse<CancelRunningSqlResponse>, Status> {
148        let process_id = request.into_inner().process_id;
149        handle_kill_local(self.session_map.clone(), process_id).await?;
150        Ok(RpcResponse::new(CancelRunningSqlResponse {}))
151    }
152}
153
154/// Rebuild the table's streaming plan, possibly with cdc column changes.
155async fn get_new_table_plan(
156    table_id: TableId,
157    database_id: DatabaseId,
158    cdc_table_change: Option<TableSchemaChange>,
159) -> Result<ReplaceJobPlan, RwError> {
160    tracing::info!("get_new_table_plan for table {}", table_id);
161
162    let session_mgr = SESSION_MANAGER
163        .get()
164        .expect("session manager has been initialized");
165
166    // get a session object for the corresponding user and database
167    let session = session_mgr.create_dummy_session(database_id)?;
168
169    let _guard = scopeguard::guard((), |_| {
170        session_mgr.end_session(&session);
171    });
172
173    let table_catalog = {
174        let reader = session.env().catalog_reader().read_guard();
175        reader.get_any_table_by_id(table_id)?.clone()
176    };
177    let original_owner = table_catalog.owner;
178
179    let schema_name = {
180        let reader = session.env().catalog_reader().read_guard();
181        let schema = reader.get_schema_by_id(table_catalog.database_id, table_catalog.schema_id)?;
182        schema.name.clone()
183    };
184    let table_name = ObjectName::from(vec![
185        schema_name.as_str().into(),
186        table_catalog.name.as_str().into(),
187    ]);
188
189    let definition = if let Some(cdc_table_change) = cdc_table_change {
190        let new_version_columns = cdc_table_change
191            .columns
192            .into_iter()
193            .map(|c| c.into())
194            .collect_vec();
195        get_new_table_definition_for_cdc_table(table_catalog.clone(), &new_version_columns).await?
196    } else {
197        table_catalog.create_sql_ast_purified()?
198    };
199
200    let (mut source, mut table, graph, job_type) = get_replace_table_plan(
201        &session,
202        table_name,
203        definition,
204        &table_catalog,
205        SqlColumnStrategy::FollowUnchecked,
206    )
207    .await?;
208
209    // The dummy session may be created with a fixed super user, which can cause the generated
210    // plan to carry an incorrect table owner. Restore it to the original owner.
211    table.owner = original_owner;
212    if let Some(source) = &mut source {
213        source.owner = original_owner;
214    }
215
216    Ok(ReplaceJobPlan {
217        replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
218            replace_job_plan::ReplaceTable {
219                table: Some(table.to_prost()),
220                source: source.map(|s| s.to_prost()),
221                job_type: job_type as _,
222            },
223        )),
224        fragment_graph: Some(graph),
225    })
226}