risingwave_frontend/handler/
fetch_cursor.rs1use pgwire::pg_field_descriptor::PgFieldDescriptor;
16use pgwire::pg_response::{PgResponse, StatementType};
17use pgwire::types::{Format, Row};
18use risingwave_common::bail_not_implemented;
19use risingwave_common::catalog::Schema;
20use risingwave_common::types::DataType;
21use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
22
23use super::RwPgResponse;
24use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
25use super::query::BoundResult;
26use super::util::convert_interval_to_u64_seconds;
27use crate::binder::BoundStatement;
28use crate::error::Result;
29use crate::handler::HandlerArgs;
30use crate::{Binder, PgResponseStream, WithOptions};
31
32pub async fn handle_fetch_cursor_execute(
33 handler_args: HandlerArgs,
34 portal_result: PortalResult,
35) -> Result<RwPgResponse> {
36 if let PortalResult {
37 statement: Statement::FetchCursor { stmt },
38 bound_result:
39 BoundResult {
40 bound: BoundStatement::FetchCursor(fetch_cursor),
41 ..
42 },
43 result_formats,
44 ..
45 } = portal_result
46 {
47 match fetch_cursor.returning_schema {
48 Some(_) => handle_fetch_cursor(handler_args, stmt, &result_formats).await,
49 None => Ok(build_fetch_cursor_response(vec![], vec![])),
50 }
51 } else {
52 bail_not_implemented!("unsupported portal {}", portal_result)
53 }
54}
55pub async fn handle_fetch_cursor(
56 handler_args: HandlerArgs,
57 stmt: FetchCursorStatement,
58 formats: &Vec<Format>,
59) -> Result<RwPgResponse> {
60 let session = handler_args.session.clone();
61 let cursor_name = stmt.cursor_name.real_value();
62 let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
63
64 if with_options.len() > 1 {
65 bail_not_implemented!("only `timeout` is supported in with options")
66 }
67
68 let timeout_seconds = with_options
69 .get("timeout")
70 .map(convert_interval_to_u64_seconds)
71 .transpose()?;
72
73 if with_options.len() == 1 && timeout_seconds.is_none() {
74 bail_not_implemented!("only `timeout` is supported in with options")
75 }
76
77 let cursor_manager = session.get_cursor_manager();
78
79 let (rows, pg_descs) = cursor_manager
80 .get_rows_with_cursor(
81 &cursor_name,
82 stmt.count,
83 handler_args,
84 formats,
85 timeout_seconds,
86 )
87 .await?;
88 Ok(build_fetch_cursor_response(rows, pg_descs))
89}
90
91fn build_fetch_cursor_response(rows: Vec<Row>, pg_descs: Vec<PgFieldDescriptor>) -> RwPgResponse {
92 PgResponse::builder(StatementType::FETCH_CURSOR)
93 .row_cnt_opt(Some(rows.len() as i32))
94 .values(PgResponseStream::from(rows), pg_descs)
95 .into()
96}
97
98pub async fn handle_parse(
99 handler_args: HandlerArgs,
100 statement: Statement,
101 specific_param_types: Vec<Option<DataType>>,
102) -> Result<PrepareStatement> {
103 if let Statement::FetchCursor { stmt } = &statement {
104 let session = handler_args.session.clone();
105 let cursor_name = stmt.cursor_name.real_value();
106 let fields = session
107 .get_cursor_manager()
108 .get_fields_with_cursor(&cursor_name)
109 .await?;
110
111 let mut binder = Binder::new_with_param_types(&session, specific_param_types);
112 let schema = Some(Schema::new(fields));
113
114 let bound = binder.bind_fetch_cursor(cursor_name, stmt.count, schema)?;
115 let bound_result = BoundResult {
116 stmt_type: StatementType::FETCH_CURSOR,
117 must_dist: false,
118 bound: BoundStatement::FetchCursor(Box::new(bound)),
119 param_types: binder.export_param_types()?,
120 parsed_params: None,
121 dependent_relations: binder.included_relations().clone(),
122 dependent_udfs: binder.included_udfs().clone(),
123 };
124 let result = PreparedResult {
125 statement,
126 bound_result,
127 };
128 Ok(PrepareStatement::Prepared(result))
129 } else {
130 bail_not_implemented!("unsupported statement {:?}", statement)
131 }
132}