risingwave_frontend/handler/
extended_handle.rs1use std::fmt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use pgwire::types::Format;
21use risingwave_common::bail_not_implemented;
22use risingwave_common::types::DataType;
23use risingwave_sqlparser::ast::{CreateSink, Query, Statement};
24
25use super::query::BoundResult;
26use super::{HandlerArgs, RwPgResponse, fetch_cursor, handle, query};
27use crate::error::Result;
28use crate::session::SessionImpl;
29
30#[derive(Clone)]
38pub enum PrepareStatement {
39 Empty,
40 Prepared(PreparedResult),
41 PureStatement(Statement),
42}
43
44#[derive(Clone)]
45pub struct PreparedResult {
46 pub statement: Statement,
47 pub bound_result: BoundResult,
48}
49
50#[expect(clippy::enum_variant_names)]
62#[derive(Clone)]
63pub enum Portal {
64 Empty,
65 Portal(PortalResult),
66 PureStatement(Statement),
67}
68
69impl std::fmt::Display for Portal {
70 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
71 match &self {
72 Portal::Empty => write!(f, "Empty"),
73 Portal::Portal(portal) => portal.fmt(f),
74 Portal::PureStatement(stmt) => write!(f, "{}", stmt),
75 }
76 }
77}
78
79#[derive(Clone)]
81pub struct PortalResult {
82 pub statement: Statement,
83 pub bound_result: BoundResult,
84 pub result_formats: Vec<Format>,
85}
86
87impl std::fmt::Display for PortalResult {
88 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}, params = {:?}",
92 self.statement, self.bound_result.parsed_params
93 )
94 }
95}
96
97pub async fn handle_parse(
98 session: Arc<SessionImpl>,
99 statement: Statement,
100 specific_param_types: Vec<Option<DataType>>,
101) -> Result<PrepareStatement> {
102 session.clear_cancel_query_flag();
103 let sql: Arc<str> = Arc::from(statement.to_string());
104 let handler_args = HandlerArgs::new(session, &statement, sql)?;
105 match &statement {
106 Statement::Query(_)
107 | Statement::Insert { .. }
108 | Statement::Delete { .. }
109 | Statement::Update { .. } => {
110 query::handle_parse(handler_args, statement, specific_param_types)
111 }
112 Statement::FetchCursor { .. } => {
113 fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await
114 }
115 Statement::DeclareCursor { .. } => {
116 query::handle_parse(handler_args, statement, specific_param_types)
117 }
118 Statement::CreateView {
119 query,
120 materialized,
121 ..
122 } => {
123 if *materialized {
124 return query::handle_parse(handler_args, statement, specific_param_types);
125 }
126 if have_parameter_in_query(query) {
127 bail_not_implemented!("CREATE VIEW with parameters");
128 }
129 Ok(PrepareStatement::PureStatement(statement))
130 }
131 Statement::CreateTable { query, .. } => {
132 if let Some(query) = query
133 && have_parameter_in_query(query)
134 {
135 bail_not_implemented!("CREATE TABLE AS SELECT with parameters");
136 } else {
137 Ok(PrepareStatement::PureStatement(statement))
138 }
139 }
140 Statement::CreateSink { stmt } => {
141 if let CreateSink::AsQuery(query) = &stmt.sink_from
142 && have_parameter_in_query(query)
143 {
144 bail_not_implemented!("CREATE SINK AS SELECT with parameters");
145 } else {
146 Ok(PrepareStatement::PureStatement(statement))
147 }
148 }
149 _ => Ok(PrepareStatement::PureStatement(statement)),
150 }
151}
152
153pub fn handle_bind(
154 prepare_statement: PrepareStatement,
155 params: Vec<Option<Bytes>>,
156 param_formats: Vec<Format>,
157 result_formats: Vec<Format>,
158) -> Result<Portal> {
159 match prepare_statement {
160 PrepareStatement::Empty => Ok(Portal::Empty),
161 PrepareStatement::Prepared(prepared_result) => {
162 let PreparedResult {
163 bound_result,
164 statement,
165 } = prepared_result;
166 let BoundResult {
167 stmt_type,
168 must_dist,
169 bound,
170 param_types,
171 dependent_relations,
172 dependent_udfs,
173 ..
174 } = bound_result;
175
176 let (new_bound, parsed_params) = bound.bind_parameter(params, param_formats)?;
177 let new_bound_result = BoundResult {
178 stmt_type,
179 must_dist,
180 param_types,
181 parsed_params: Some(parsed_params),
182 dependent_relations,
183 dependent_udfs,
184 bound: new_bound,
185 };
186 Ok(Portal::Portal(PortalResult {
187 bound_result: new_bound_result,
188 result_formats,
189 statement,
190 }))
191 }
192 PrepareStatement::PureStatement(stmt) => {
193 Ok(Portal::PureStatement(stmt))
195 }
196 }
197}
198
199pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result<RwPgResponse> {
200 match portal {
201 Portal::Empty => Ok(RwPgResponse::empty_result(
202 pgwire::pg_response::StatementType::EMPTY,
203 )),
204 Portal::Portal(portal) => {
205 session.clear_cancel_query_flag();
206 let _guard = session.txn_begin_implicit(); let sql: Arc<str> = Arc::from(portal.statement.to_string());
208 let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
209 if let Statement::FetchCursor { .. } = &portal.statement {
210 fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await
211 } else {
212 query::handle_execute(handler_args, portal).await
213 }
214 }
215 Portal::PureStatement(stmt) => {
216 let sql: Arc<str> = Arc::from(stmt.to_string());
217 handle(session, stmt, sql, vec![]).await
218 }
219 }
220}
221
222fn have_parameter_in_query(query: &Query) -> bool {
224 query.to_string().contains("$1")
225}