risingwave_frontend/handler/
extended_handle.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use pgwire::types::Format;
21use risingwave_common::bail_not_implemented;
22use risingwave_common::types::DataType;
23use risingwave_sqlparser::ast::{CreateSink, Query, Statement};
24
25use super::query::BoundResult;
26use super::{HandlerArgs, RwPgResponse, fetch_cursor, handle, query};
27use crate::error::Result;
28use crate::session::SessionImpl;
29
30/// Except for Query,Insert,Delete,Update statement, we store other statement as `PureStatement`.
31/// We separate them because `PureStatement` don't have query and parameters (except
32/// create-table-as, create-view-as, create-sink-as), so we don't need to do extra work(infer and
33/// bind parameter) for them.
34/// For create-table-as, create-view-as, create-sink-as with query parameters, we can't
35/// support them. If we find that there are parameter in their query, we return a error otherwise we
36/// store them as `PureStatement`.
37#[derive(Clone)]
38pub enum PrepareStatement {
39    Empty,
40    Prepared(PreparedResult),
41    PureStatement(Statement),
42}
43
44#[derive(Clone)]
45pub struct PreparedResult {
46    pub statement: Statement,
47    pub bound_result: BoundResult,
48}
49
50/// Partial was a concept in the PostgreSQL protocol.
51///
52/// In the extended-query protocol, execution of SQL commands is divided into multiple steps.
53/// The state retained between steps is represented by two types of objects: prepared statements
54/// and portals. A prepared statement represents the result of parsing and semantic analysis of a
55/// textual query string. A prepared statement is not in itself ready to execute, because it might
56/// lack specific values for parameters.
57/// A portal represents a ready-to-execute or already-partially-executed statement,
58/// with any missing parameter values filled in.
59///
60/// Reference: <https://www.postgresql.org/docs/current/protocol-overview.html#PROTOCOL-QUERY-CONCEPTS>
61#[expect(clippy::enum_variant_names)]
62#[derive(Clone)]
63pub enum Portal {
64    Empty,
65    Portal(PortalResult),
66    PureStatement(Statement),
67}
68
69impl std::fmt::Display for Portal {
70    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
71        match &self {
72            Portal::Empty => write!(f, "Empty"),
73            Portal::Portal(portal) => portal.fmt(f),
74            Portal::PureStatement(stmt) => write!(f, "{}", stmt),
75        }
76    }
77}
78
79/// See the docs of [`Portal`].
80#[derive(Clone)]
81pub struct PortalResult {
82    pub statement: Statement,
83    pub bound_result: BoundResult,
84    pub result_formats: Vec<Format>,
85}
86
87impl std::fmt::Display for PortalResult {
88    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
89        write!(
90            f,
91            "{}, params = {:?}",
92            self.statement, self.bound_result.parsed_params
93        )
94    }
95}
96
97pub async fn handle_parse(
98    session: Arc<SessionImpl>,
99    statement: Statement,
100    specific_param_types: Vec<Option<DataType>>,
101) -> Result<PrepareStatement> {
102    session.clear_cancel_query_flag();
103    let sql: Arc<str> = Arc::from(statement.to_string());
104    let handler_args = HandlerArgs::new(session, &statement, sql)?;
105    match &statement {
106        Statement::Query(_)
107        | Statement::Insert { .. }
108        | Statement::Delete { .. }
109        | Statement::Update { .. } => {
110            query::handle_parse(handler_args, statement, specific_param_types)
111        }
112        Statement::FetchCursor { .. } => {
113            fetch_cursor::handle_parse(handler_args, statement, specific_param_types).await
114        }
115        Statement::DeclareCursor { .. } => {
116            query::handle_parse(handler_args, statement, specific_param_types)
117        }
118        Statement::CreateView {
119            query,
120            materialized,
121            ..
122        } => {
123            if *materialized {
124                return query::handle_parse(handler_args, statement, specific_param_types);
125            }
126            if have_parameter_in_query(query) {
127                bail_not_implemented!("CREATE VIEW with parameters");
128            }
129            Ok(PrepareStatement::PureStatement(statement))
130        }
131        Statement::CreateTable { query, .. } => {
132            if let Some(query) = query
133                && have_parameter_in_query(query)
134            {
135                bail_not_implemented!("CREATE TABLE AS SELECT with parameters");
136            } else {
137                Ok(PrepareStatement::PureStatement(statement))
138            }
139        }
140        Statement::CreateSink { stmt } => {
141            if let CreateSink::AsQuery(query) = &stmt.sink_from
142                && have_parameter_in_query(query)
143            {
144                bail_not_implemented!("CREATE SINK AS SELECT with parameters");
145            } else {
146                Ok(PrepareStatement::PureStatement(statement))
147            }
148        }
149        _ => Ok(PrepareStatement::PureStatement(statement)),
150    }
151}
152
153pub fn handle_bind(
154    prepare_statement: PrepareStatement,
155    params: Vec<Option<Bytes>>,
156    param_formats: Vec<Format>,
157    result_formats: Vec<Format>,
158) -> Result<Portal> {
159    match prepare_statement {
160        PrepareStatement::Empty => Ok(Portal::Empty),
161        PrepareStatement::Prepared(prepared_result) => {
162            let PreparedResult {
163                bound_result,
164                statement,
165            } = prepared_result;
166            let BoundResult {
167                stmt_type,
168                must_dist,
169                bound,
170                param_types,
171                dependent_relations,
172                dependent_udfs,
173                ..
174            } = bound_result;
175
176            let (new_bound, parsed_params) = bound.bind_parameter(params, param_formats)?;
177            let new_bound_result = BoundResult {
178                stmt_type,
179                must_dist,
180                param_types,
181                parsed_params: Some(parsed_params),
182                dependent_relations,
183                dependent_udfs,
184                bound: new_bound,
185            };
186            Ok(Portal::Portal(PortalResult {
187                bound_result: new_bound_result,
188                result_formats,
189                statement,
190            }))
191        }
192        PrepareStatement::PureStatement(stmt) => {
193            // Jdbc might send set statements in a prepare statement, so params could be not empty.
194            Ok(Portal::PureStatement(stmt))
195        }
196    }
197}
198
199pub async fn handle_execute(session: Arc<SessionImpl>, portal: Portal) -> Result<RwPgResponse> {
200    match portal {
201        Portal::Empty => Ok(RwPgResponse::empty_result(
202            pgwire::pg_response::StatementType::EMPTY,
203        )),
204        Portal::Portal(portal) => {
205            session.clear_cancel_query_flag();
206            let _guard = session.txn_begin_implicit(); // TODO(bugen): is this behavior correct?
207            let sql: Arc<str> = Arc::from(portal.statement.to_string());
208            let handler_args = HandlerArgs::new(session, &portal.statement, sql)?;
209            if let Statement::FetchCursor { .. } = &portal.statement {
210                fetch_cursor::handle_fetch_cursor_execute(handler_args, portal).await
211            } else {
212                query::handle_execute(handler_args, portal).await
213            }
214        }
215        Portal::PureStatement(stmt) => {
216            let sql: Arc<str> = Arc::from(stmt.to_string());
217            handle(session, stmt, sql, vec![]).await
218        }
219    }
220}
221
222/// A quick way to check if a query contains parameters.
223fn have_parameter_in_query(query: &Query) -> bool {
224    query.to_string().contains("$1")
225}