risingwave_frontend/handler/declare_cursor.rs

use std::sync::Arc;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::Field;
use risingwave_common::session_config::QueryMode;
use risingwave_common::util::epoch::Epoch;
use risingwave_sqlparser::ast::{
    DeclareCursorStatement, Ident, ObjectName, Query, Since, Statement,
};

use super::RwPgResponse;
use super::query::{
    BatchPlanFragmenterResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
};
use super::util::convert_unix_millis_to_logstore_u64;
use crate::error::{ErrorCode, Result};
use crate::handler::HandlerArgs;
use crate::handler::query::{distribute_execute, local_execute};
use crate::session::SessionImpl;
use crate::session::cursor_manager::CursorDataChunkStream;
use crate::{Binder, OptimizerContext};

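/// Entry point for `DECLARE ... CURSOR` statements: dispatches to the plain query-cursor
/// handler or the subscription-cursor handler, depending on which `DeclareCursor` variant
/// the parser produced.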
pub async fn handle_declare_cursor(
    handler_args: HandlerArgs,
    stmt: DeclareCursorStatement,
) -> Result<RwPgResponse> {
    match stmt.declare_cursor {
        risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
            handle_declare_query_cursor(handler_args, stmt.cursor_name, query).await
        }
        risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
            handle_declare_subscription_cursor(
                handler_args,
                sub_name,
                stmt.cursor_name,
                rw_timestamp,
            )
            .await
        }
    }
}

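/// Handles `DECLARE <cursor> SUBSCRIPTION CURSOR FOR <subscription> [SINCE ...]`
/// (syntax shown for illustration; see [`DeclareCursorStatement`] in the sqlparser crate
/// for the authoritative grammar).
///
/// The `SINCE` clause is converted into an optional log-store epoch and the cursor is
/// registered with the session's cursor manager; failures bump the
/// `subscription_cursor_error_count` metric before the error is returned.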
pub async fn handle_declare_subscription_cursor(
    handler_args: HandlerArgs,
    sub_name: ObjectName,
    cursor_name: Ident,
    rw_timestamp: Since,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let subscription = {
        let db_name = &session.database();
        let (sub_schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, sub_name)?;
        session.get_subscription_by_name(sub_schema_name, &sub_name)?
    };
    // Translate the `SINCE` clause into the starting log-store epoch for the cursor.
    let start_rw_timestamp = match rw_timestamp {
        // An explicit unix-millisecond timestamp: validate it against the subscription's
        // retention window, then convert it to a log-store epoch.
        risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => {
            check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
            Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
        }
        // Start from the current processing time.
        risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0),
        // Start from the earliest data still retained: the later of
        // (now - retention window) and the subscription's creation time.
        risingwave_sqlparser::ast::Since::Begin => {
            let min_unix_millis =
                Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
            let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
            let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
            Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
        }
        // `FULL` carries no starting epoch.
        risingwave_sqlparser::ast::Since::Full => None,
    };
    if let Err(e) = session
        .get_cursor_manager()
        .add_subscription_cursor(
            cursor_name.real_value(),
            start_rw_timestamp,
            subscription.dependent_table_id,
            subscription,
            &handler_args,
        )
        .await
    {
        session
            .env()
            .cursor_metrics
            .subscription_cursor_error_count
            .inc();
        return Err(e);
    }

    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
}

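/// Validates a user-supplied `SINCE <unix_millis>` value: it must not lie in the future
/// and must not fall outside the subscription's retention window.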
fn check_cursor_unix_millis(unix_millis: u64, retention_seconds: u64) -> Result<()> {
    let now = Epoch::now().as_unix_millis();
    let min_unix_millis = now - retention_seconds * 1000;
    if unix_millis > now {
        return Err(ErrorCode::CatalogError(
            "rw_timestamp is too large, needs to be less than the current unix_millis"
                .to_owned()
                .into(),
        )
        .into());
    }
    if unix_millis < min_unix_millis {
        return Err(ErrorCode::CatalogError("rw_timestamp is too small, needs to be larger than the current unix_millis - subscription's retention time".to_owned().into()).into());
    }
    Ok(())
}

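/// Handles `DECLARE <cursor> CURSOR FOR <query>`: plans and executes the query, then
/// registers the resulting chunk stream as a query cursor on the session.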
async fn handle_declare_query_cursor(
    handler_args: HandlerArgs,
    cursor_name: Ident,
    query: Box<Query>,
) -> Result<RwPgResponse> {
    let (chunk_stream, fields) =
        create_stream_for_cursor_stmt(handler_args.clone(), Statement::Query(query)).await?;
    handler_args
        .session
        .get_cursor_manager()
        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
        .await?;
    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
}

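/// Like [`handle_declare_query_cursor`], but for a statement that has already been bound
/// and planned, so it starts directly from a [`BatchPlanFragmenterResult`].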
pub async fn handle_bound_declare_query_cursor(
    handler_args: HandlerArgs,
    cursor_name: Ident,
    plan_fragmenter_result: BatchPlanFragmenterResult,
) -> Result<RwPgResponse> {
    let session = handler_args.session.clone();
    let (chunk_stream, fields) =
        create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?;

    handler_args
        .session
        .get_cursor_manager()
        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
        .await?;
    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
}

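/// Plans the given statement into batch plan fragments and opens a data-chunk stream for
/// it, returning the stream together with the output schema fields.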
pub async fn create_stream_for_cursor_stmt(
    handler_args: HandlerArgs,
    stmt: Statement,
) -> Result<(CursorDataChunkStream, Vec<Field>)> {
    let session = handler_args.session.clone();
    let plan_fragmenter_result = {
        let context = OptimizerContext::from_handler_args(handler_args);
        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
        gen_batch_plan_fragmenter(&session, plan_result)?
    };
    create_chunk_stream_for_cursor(session, plan_fragmenter_result).await
}

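/// Executes a planned batch query in either local or distributed mode and wraps the
/// result in the matching [`CursorDataChunkStream`] variant, along with the schema fields.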
pub async fn create_chunk_stream_for_cursor(
    session: Arc<SessionImpl>,
    plan_fragmenter_result: BatchPlanFragmenterResult,
) -> Result<(CursorDataChunkStream, Vec<Field>)> {
    let BatchPlanFragmenterResult {
        plan_fragmenter,
        query_mode,
        schema,
        read_storage_tables,
        ..
    } = plan_fragmenter_result;

    let can_timeout_cancel = true;

    let query = plan_fragmenter.generate_complete_query().await?;
    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);

    Ok((
        match query_mode {
            // `Auto` should already have been resolved to `Local` or `Distributed` by the
            // time the plan fragmenter result is produced.
            QueryMode::Auto => unreachable!(),
            QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some(
                local_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    &read_storage_tables,
                )
                .await?,
            )),
            QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some(
                distribute_execute(
                    session.clone(),
                    query,
                    can_timeout_cancel,
                    read_storage_tables,
                )
                .await?,
            )),
        },
        schema.fields.clone(),
    ))
}