risingwave_frontend/handler/
extended_handle.rs1use std::fmt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use pgwire::types::Format;
21use risingwave_common::bail_not_implemented;
22use risingwave_common::types::DataType;
23use risingwave_sqlparser::ast::{CreateSink, Query, Statement};
24
25use super::query::BoundResult;
26use super::{HandlerArgs, RwPgResponse, fetch_cursor, handle, query};
27use crate::error::Result;
28use crate::session::SessionImpl;
29
30#[derive(Clone)]
38pub enum PrepareStatement {
39 Empty,
40 Prepared(PreparedResult),
41 PureStatement(Statement),
42}
43
44#[derive(Clone)]
45pub struct PreparedResult {
46 pub statement: Statement,
47 pub bound_result: BoundResult,
48}
49
50#[expect(clippy::enum_variant_names)]
62#[derive(Clone)]
63pub enum Portal {
64 Empty,
65 Portal(PortalResult),
66 PureStatement(Statement),
67}
68
69impl std::fmt::Display for Portal {
70 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
71 match &self {
72 Portal::Empty => write!(f, "Empty"),
73 Portal::Portal(portal) => portal.fmt(f),
74 Portal::PureStatement(stmt) => write!(f, "{}", stmt),
75 }
76 }
77}
78
79#[derive(Clone)]
81pub struct PortalResult {
82 pub statement: Statement,
83 pub bound_result: BoundResult,
84 pub result_formats: Vec<Format>,
85}
86
87impl std::fmt::Display for PortalResult {
88 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89 write!(
90 f,
91 "{}, params = {:?}",
92 self.statement, self.bound_result.parsed_params
93 )
94 }
95}
96
97pub async fn handle_parse(
98 session: Arc<SessionImpl>,
99 statement: Statement,
100 specified_param_types: Vec<Option<DataType>>,
101) -> Result<PrepareStatement> {
102 session.clear_cancel_query_flag();
103 let sql: Arc<str> = Arc::from(statement.to_string());
104 let handler_args = HandlerArgs::new(session, &statement, sql)?;
105 match &statement {
106 Statement::Query(_)
107 | Statement::Insert { .. }
108 | Statement::Delete { .. }
109 | Statement::Update { .. } => {
110 query::handle_parse_for_batch(handler_args, statement, specified_param_types)
111 }
112 Statement::FetchCursor { .. } => {
113 fetch_cursor::handle_parse(handler_args, statement, specified_param_types).await
114 }
115 Statement::DeclareCursor { .. } => {
116 query::handle_parse_for_batch(handler_args, statement, specified_param_types)
117 }
118 Statement::CreateView {
119 query,
120 materialized,
121 ..
122 } => {
123 if *materialized {
124 return query::handle_parse_for_stream(
125 handler_args,
126 statement,
127 specified_param_types,
128 );
129 }
130 if have_parameter_in_query(query) {
131 bail_not_implemented!("CREATE VIEW with parameters");
132 }
133 Ok(PrepareStatement::PureStatement(statement))
134 }
135 Statement::CreateTable { query, .. } => {
136 if let Some(query) = query
137 && have_parameter_in_query(query)
138 {
139 bail_not_implemented!("CREATE TABLE AS SELECT with parameters");
140 } else {
141 Ok(PrepareStatement::PureStatement(statement))
142 }
143 }
144 Statement::CreateSink { stmt } => {
145 if let CreateSink::AsQuery(query) = &stmt.sink_from
146 && have_parameter_in_query(query)
147 {
148 bail_not_implemented!("CREATE SINK AS SELECT with parameters");
149 } else {
150 Ok(PrepareStatement::PureStatement(statement))
151 }
152 }
153 _ => Ok(PrepareStatement::PureStatement(statement)),
154 }
155}
156
157pub fn handle_bind(
158 prepare_statement: PrepareStatement,
159 params: Vec<Option<Bytes>>,
160 param_formats: Vec<Format>,
161 result_formats: Vec<Format>,
162) -> Result<Portal> {
163 match prepare_statement {
164 PrepareStatement::Empty => Ok(Portal::Empty),
165 PrepareStatement::Prepared(prepared_result) => {
166 let PreparedResult {
167 bound_result,
168 statement,
169 } = prepared_result;
170 let BoundResult {
171 stmt_type,
172 must_dist,
173 bound,
174 param_types,
175 dependent_relations,
176 dependent_udfs,
177 ..
178 } = bound_result;
179
180 let (new_bound, parsed_params) = bound.bind_parameter(params, param_formats)?;
181 let new_bound_result = BoundResult {
182 stmt_type,
183 must_dist,
184 param_types,
185 parsed_params: Some(parsed_params),
186 dependent_relations,
187 dependent_udfs,
188 bound: new_bound,
189 };
190 Ok(Portal::Portal(PortalResult {
191 bound_result: new_bound_result,
192 result_formats,
193 statement,
194 }))
195 }
196 PrepareStatement::PureStatement(stmt) => {
197 Ok(Portal::PureStatement(stmt))
199 }
200 }
201}
202
203pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result<RwPgResponse> {
204 match portal {
205 Portal::Empty => Ok(RwPgResponse::empty_result(
206 pgwire::pg_response::StatementType::EMPTY,
207 )),
208 Portal::Portal(portal) => {
209 session.clear_cancel_query_flag();
210 let _guard = session.txn_begin_implicit(); let sql: Arc<str> = Arc::from(portal.statement.to_string());
212 let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
213 if let Statement::FetchCursor { .. } = &portal.statement {
214 fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await
215 } else {
216 query::handle_execute(handler_args, portal).await
217 }
218 }
219 Portal::PureStatement(stmt) => {
220 let sql: Arc<str> = Arc::from(stmt.to_string());
221 handle(session, stmt, sql, vec![]).await
222 }
223 }
224}
225
226fn have_parameter_in_query(query: &Query) -> bool {
228 query.to_string().contains("$1")
229}