risingwave_frontend/rpc/
mod.rs1use itertools::Itertools;
16use pgwire::pg_server::{BoxedError, SessionManager};
17use risingwave_pb::ddl_service::{ReplaceJobPlan, TableSchemaChange, replace_job_plan};
18use risingwave_pb::frontend_service::frontend_service_server::FrontendService;
19use risingwave_pb::frontend_service::{GetTableReplacePlanRequest, GetTableReplacePlanResponse};
20use risingwave_rpc_client::error::ToTonicStatus;
21use risingwave_sqlparser::ast::ObjectName;
22use tonic::{Request as RpcRequest, Response as RpcResponse, Status};
23
24use crate::error::RwError;
25use crate::handler::create_source::SqlColumnStrategy;
26use crate::handler::{get_new_table_definition_for_cdc_table, get_replace_table_plan};
27use crate::session::SESSION_MANAGER;
28
29#[derive(thiserror::Error, Debug)]
30pub enum AutoSchemaChangeError {
31 #[error("frontend error")]
32 FrontendError(
33 #[from]
34 #[backtrace]
35 RwError,
36 ),
37}
38
39impl From<BoxedError> for AutoSchemaChangeError {
40 fn from(err: BoxedError) -> Self {
41 AutoSchemaChangeError::FrontendError(RwError::from(err))
42 }
43}
44
45impl From<AutoSchemaChangeError> for tonic::Status {
46 fn from(err: AutoSchemaChangeError) -> Self {
47 err.to_status(tonic::Code::Internal, "frontend")
48 }
49}
50
/// Stateless implementation of the `FrontendService` gRPC service.
///
/// The struct has no fields; it can be built with `Default::default()`.
#[derive(Default)]
pub struct FrontendServiceImpl {}
53
54impl FrontendServiceImpl {
55 pub fn new() -> Self {
56 Self {}
57 }
58}
59
60#[async_trait::async_trait]
61impl FrontendService for FrontendServiceImpl {
62 async fn get_table_replace_plan(
63 &self,
64 request: RpcRequest<GetTableReplacePlanRequest>,
65 ) -> Result<RpcResponse<GetTableReplacePlanResponse>, Status> {
66 let req = request.into_inner();
67 tracing::info!("get_table_replace_plan for table {}", req.table_name);
68
69 let table_change = req.table_change.expect("schema change message is required");
70 let replace_plan =
71 get_new_table_plan(table_change, req.table_name, req.database_id, req.owner).await?;
72
73 Ok(RpcResponse::new(GetTableReplacePlanResponse {
74 replace_plan: Some(replace_plan),
75 }))
76 }
77}
78
79async fn get_new_table_plan(
81 table_change: TableSchemaChange,
82 table_name: String,
83 database_id: u32,
84 owner: u32,
85) -> Result<ReplaceJobPlan, AutoSchemaChangeError> {
86 tracing::info!("get_new_table_plan for table {}", table_name);
87
88 let session_mgr = SESSION_MANAGER
89 .get()
90 .expect("session manager has been initialized");
91
92 let session = session_mgr.create_dummy_session(database_id, owner)?;
94
95 let new_version_columns = table_change
96 .columns
97 .into_iter()
98 .map(|c| c.into())
99 .collect_vec();
100 let table_name = ObjectName::from(vec![table_name.as_str().into()]);
101
102 let (new_table_definition, original_catalog) =
103 get_new_table_definition_for_cdc_table(&session, table_name.clone(), &new_version_columns)
104 .await?;
105 let (_, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
106 &session,
107 table_name,
108 new_table_definition,
109 &original_catalog,
110 SqlColumnStrategy::FollowUnchecked, )
112 .await?;
113
114 Ok(ReplaceJobPlan {
115 replace_job: Some(replace_job_plan::ReplaceJob::ReplaceTable(
116 replace_job_plan::ReplaceTable {
117 table: Some(table),
118 source: None, job_type: job_type as _,
120 },
121 )),
122 fragment_graph: Some(graph),
123 table_col_index_mapping: Some(col_index_mapping.to_protobuf()),
124 })
125}