risingwave_frontend/handler/
extended_handle.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use pgwire::types::Format;
21use risingwave_common::bail_not_implemented;
22use risingwave_common::types::DataType;
23use risingwave_sqlparser::ast::{CreateSink, Query, Statement};
24
25use super::query::BoundResult;
26use super::{HandlerArgs, RwPgResponse, fetch_cursor, handle, query};
27use crate::error::Result;
28use crate::session::SessionImpl;
29
30/// Except for Query,Insert,Delete,Update statement, we store other statement as `PureStatement`.
31/// We separate them because `PureStatement` don't have query and parameters (except
32/// create-table-as, create-view-as, create-sink-as), so we don't need to do extra work(infer and
33/// bind parameter) for them.
34/// For create-table-as, create-view-as, create-sink-as with query parameters, we can't
35/// support them. If we find that there are parameter in their query, we return a error otherwise we
36/// store them as `PureStatement`.
37#[derive(Clone)]
38pub enum PrepareStatement {
39    Empty,
40    Prepared(PreparedResult),
41    PureStatement(Statement),
42}
43
44#[derive(Clone)]
45pub struct PreparedResult {
46    pub statement: Statement,
47    pub bound_result: BoundResult,
48}
49
50/// Partial was a concept in the PostgreSQL protocol.
51///
52/// In the extended-query protocol, execution of SQL commands is divided into multiple steps.
53/// The state retained between steps is represented by two types of objects: prepared statements
54/// and portals. A prepared statement represents the result of parsing and semantic analysis of a
55/// textual query string. A prepared statement is not in itself ready to execute, because it might
56/// lack specific values for parameters.
57/// A portal represents a ready-to-execute or already-partially-executed statement,
58/// with any missing parameter values filled in.
59///
60/// Reference: <https://www.postgresql.org/docs/current/protocol-overview.html#PROTOCOL-QUERY-CONCEPTS>
61#[expect(clippy::enum_variant_names)]
62#[derive(Clone)]
63pub enum Portal {
64    Empty,
65    Portal(PortalResult),
66    PureStatement(Statement),
67}
68
69impl std::fmt::Display for Portal {
70    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
71        match &self {
72            Portal::Empty => write!(f, "Empty"),
73            Portal::Portal(portal) => portal.fmt(f),
74            Portal::PureStatement(stmt) => write!(f, "{}", stmt),
75        }
76    }
77}
78
79/// See the docs of [`Portal`].
80#[derive(Clone)]
81pub struct PortalResult {
82    pub statement: Statement,
83    pub bound_result: BoundResult,
84    pub result_formats: Vec<Format>,
85}
86
87impl std::fmt::Display for PortalResult {
88    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}, params = {:?}",
92            self.statement, self.bound_result.parsed_params
93        )
94    }
95}
96
97pub async fn handle_parse(
98    session: Arc<SessionImpl>,
99    statement: Statement,
100    specified_param_types: Vec<Option<DataType>>,
101) -> Result<PrepareStatement> {
102    session.clear_cancel_query_flag();
103    let sql: Arc<str> = Arc::from(statement.to_string());
104    let handler_args = HandlerArgs::new(session, &statement, sql)?;
105    match &statement {
106        Statement::Query(_)
107        | Statement::Insert { .. }
108        | Statement::Delete { .. }
109        | Statement::Update { .. } => {
110            query::handle_parse_for_batch(handler_args, statement, specified_param_types)
111        }
112        Statement::FetchCursor { .. } => {
113            fetch_cursor::handle_parse(handler_args, statement, specified_param_types).await
114        }
115        Statement::DeclareCursor { .. } => {
116            query::handle_parse_for_batch(handler_args, statement, specified_param_types)
117        }
118        Statement::CreateView {
119            query,
120            materialized,
121            ..
122        } => {
123            if *materialized {
124                return query::handle_parse_for_stream(
125                    handler_args,
126                    statement,
127                    specified_param_types,
128                );
129            }
130            if have_parameter_in_query(query) {
131                bail_not_implemented!("CREATE VIEW with parameters");
132            }
133            Ok(PrepareStatement::PureStatement(statement))
134        }
135        Statement::CreateTable { query, .. } => {
136            if let Some(query) = query
137                && have_parameter_in_query(query)
138            {
139                bail_not_implemented!("CREATE TABLE AS SELECT with parameters");
140            } else {
141                Ok(PrepareStatement::PureStatement(statement))
142            }
143        }
144        Statement::CreateSink { stmt } => {
145            if let CreateSink::AsQuery(query) = &stmt.sink_from
146                && have_parameter_in_query(query)
147            {
148                bail_not_implemented!("CREATE SINK AS SELECT with parameters");
149            } else {
150                Ok(PrepareStatement::PureStatement(statement))
151            }
152        }
153        _ => Ok(PrepareStatement::PureStatement(statement)),
154    }
155}
156
157pub fn handle_bind(
158    prepare_statement: PrepareStatement,
159    params: Vec<Option<Bytes>>,
160    param_formats: Vec<Format>,
161    result_formats: Vec<Format>,
162) -> Result<Portal> {
163    match prepare_statement {
164        PrepareStatement::Empty => Ok(Portal::Empty),
165        PrepareStatement::Prepared(prepared_result) => {
166            let PreparedResult {
167                bound_result,
168                statement,
169            } = prepared_result;
170            let BoundResult {
171                stmt_type,
172                must_dist,
173                bound,
174                param_types,
175                dependent_relations,
176                dependent_udfs,
177                ..
178            } = bound_result;
179
180            let (new_bound, parsed_params) = bound.bind_parameter(params, param_formats)?;
181            let new_bound_result = BoundResult {
182                stmt_type,
183                must_dist,
184                param_types,
185                parsed_params: Some(parsed_params),
186                dependent_relations,
187                dependent_udfs,
188                bound: new_bound,
189            };
190            Ok(Portal::Portal(PortalResult {
191                bound_result: new_bound_result,
192                result_formats,
193                statement,
194            }))
195        }
196        PrepareStatement::PureStatement(stmt) => {
197            // Jdbc might send set statements in a prepare statement, so params could be not empty.
198            Ok(Portal::PureStatement(stmt))
199        }
200    }
201}
202
203pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result<RwPgResponse> {
204    match portal {
205        Portal::Empty => Ok(RwPgResponse::empty_result(
206            pgwire::pg_response::StatementType::EMPTY,
207        )),
208        Portal::Portal(portal) => {
209            session.clear_cancel_query_flag();
210            let _guard = session.txn_begin_implicit(); // TODO(bugen): is this behavior correct?
211            let sql: Arc<str> = Arc::from(portal.statement.to_string());
212            let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
213            if let Statement::FetchCursor { .. } = &portal.statement {
214                fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await
215            } else {
216                query::handle_execute(handler_args, portal).await
217            }
218        }
219        Portal::PureStatement(stmt) => {
220            let sql: Arc<str> = Arc::from(stmt.to_string());
221            handle(session, stmt, sql, vec![]).await
222        }
223    }
224}
225
226/// A quick way to check if a query contains parameters.
227fn have_parameter_in_query(query: &Query) -> bool {
228    query.to_string().contains("$1")
229}