risingwave_frontend/handler/
fetch_cursor.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use pgwire::pg_field_descriptor::PgFieldDescriptor;
16use pgwire::pg_response::{PgResponse, StatementType};
17use pgwire::types::{Format, Row};
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
22
23use super::RwPgResponse;
24use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
25use super::query::BoundResult;
26use super::util::convert_interval_to_u64_seconds;
27use crate::binder::BoundStatement;
28use crate::error::Result;
29use crate::handler::HandlerArgs;
30use crate::{Binder, PgResponseStream, WithOptions};
31
32pub async fn handle_fetch_cursor_execute(
33    handler_args: HandlerArgs,
34    portal_result: PortalResult,
35) -> Result<RwPgResponse> {
36    if let PortalResult {
37        statement: Statement::FetchCursor { stmt },
38        bound_result:
39            BoundResult {
40                bound: BoundStatement::FetchCursor(fetch_cursor),
41                ..
42            },
43        result_formats,
44        ..
45    } = portal_result
46    {
47        match fetch_cursor.returning_schema {
48            Some(_) => handle_fetch_cursor(handler_args, stmt, &result_formats).await,
49            None => Ok(build_fetch_cursor_response(vec![], vec![])),
50        }
51    } else {
52        bail_not_implemented!("unsupported portal {}", portal_result)
53    }
54}
55pub async fn handle_fetch_cursor(
56    handler_args: HandlerArgs,
57    stmt: FetchCursorStatement,
58    formats: &Vec<Format>,
59) -> Result<RwPgResponse> {
60    let session = handler_args.session.clone();
61    let cursor_name = stmt.cursor_name.real_value();
62    let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
63
64    if with_options.len() > 1 {
65        bail_not_implemented!("only `timeout` is supported in with options")
66    }
67
68    let timeout_seconds = with_options
69        .get("timeout")
70        .map(convert_interval_to_u64_seconds)
71        .transpose()?;
72
73    if with_options.len() == 1 && timeout_seconds.is_none() {
74        bail_not_implemented!("only `timeout` is supported in with options")
75    }
76
77    let cursor_manager = session.get_cursor_manager();
78
79    let (rows, pg_descs) = cursor_manager
80        .get_rows_with_cursor(
81            &cursor_name,
82            stmt.count,
83            handler_args,
84            formats,
85            timeout_seconds,
86        )
87        .await?;
88    Ok(build_fetch_cursor_response(rows, pg_descs))
89}
90
91fn build_fetch_cursor_response(rows: Vec<Row>, pg_descs: Vec<PgFieldDescriptor>) -> RwPgResponse {
92    PgResponse::builder(StatementType::FETCH_CURSOR)
93        .row_cnt_opt(Some(rows.len() as i32))
94        .values(PgResponseStream::from(rows), pg_descs)
95        .into()
96}
97
98pub async fn handle_parse(
99    handler_args: HandlerArgs,
100    statement: Statement,
101    specific_param_types: Vec<Option<DataType>>,
102) -> Result<PrepareStatement> {
103    if let Statement::FetchCursor { stmt } = &statement {
104        let session = handler_args.session.clone();
105        let cursor_name = stmt.cursor_name.real_value();
106        let fields = session
107            .get_cursor_manager()
108            .get_fields_with_cursor(&cursor_name)
109            .await?;
110
111        let mut binder = Binder::new_with_param_types(&session, specific_param_types);
112        let schema = Some(Schema::new(fields));
113
114        let bound = binder.bind_fetch_cursor(cursor_name, stmt.count, schema)?;
115        let bound_result = BoundResult {
116            stmt_type: StatementType::FETCH_CURSOR,
117            must_dist: false,
118            bound: BoundStatement::FetchCursor(Box::new(bound)),
119            param_types: binder.export_param_types()?,
120            parsed_params: None,
121            dependent_relations: binder.included_relations().clone(),
122            dependent_udfs: binder.included_udfs().clone(),
123        };
124        let result = PreparedResult {
125            statement,
126            bound_result,
127        };
128        Ok(PrepareStatement::Prepared(result))
129    } else {
130        bail_not_implemented!("unsupported statement {:?}", statement)
131    }
132}