// risingwave_frontend/handler/declare_cursor.rs
use std::sync::Arc;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::catalog::Field;
19use risingwave_common::session_config::QueryMode;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_sqlparser::ast::{
22 DeclareCursorStatement, Ident, ObjectName, Query, Since, Statement,
23};
24
25use super::RwPgResponse;
26use super::query::{
27 BatchPlanFragmenterResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
28};
29use super::util::convert_unix_millis_to_logstore_u64;
30use crate::error::{ErrorCode, Result};
31use crate::handler::HandlerArgs;
32use crate::handler::query::{distribute_execute, local_execute};
33use crate::session::SessionImpl;
34use crate::session::cursor_manager::CursorDataChunkStream;
35use crate::{Binder, OptimizerContext};
36
37pub async fn handle_declare_cursor(
38 handler_args: HandlerArgs,
39 stmt: DeclareCursorStatement,
40) -> Result<RwPgResponse> {
41 match stmt.declare_cursor {
42 risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
43 handle_declare_query_cursor(handler_args, stmt.cursor_name, query).await
44 }
45 risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
46 handle_declare_subscription_cursor(
47 handler_args,
48 sub_name,
49 stmt.cursor_name,
50 rw_timestamp,
51 )
52 .await
53 }
54 }
55}
56pub async fn handle_declare_subscription_cursor(
57 handler_args: HandlerArgs,
58 sub_name: ObjectName,
59 cursor_name: Ident,
60 rw_timestamp: Since,
61) -> Result<RwPgResponse> {
62 let session = handler_args.session.clone();
63 let subscription = {
64 let db_name = &session.database();
65 let (sub_schema_name, sub_name) =
66 Binder::resolve_schema_qualified_name(db_name, &sub_name)?;
67 session.get_subscription_by_name(sub_schema_name, &sub_name)?
68 };
69 let start_rw_timestamp = match rw_timestamp {
71 risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => {
72 check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
73 Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
74 }
75 risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0),
76 risingwave_sqlparser::ast::Since::Begin => {
77 let min_unix_millis =
78 Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
79 let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
80 let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
81 Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
82 }
83 risingwave_sqlparser::ast::Since::Full => None,
84 };
85 if let Err(e) = session
87 .get_cursor_manager()
88 .add_subscription_cursor(
89 cursor_name.real_value(),
90 start_rw_timestamp,
91 subscription.dependent_table_id,
92 subscription,
93 &handler_args,
94 )
95 .await
96 {
97 session
98 .env()
99 .cursor_metrics
100 .subscription_cursor_error_count
101 .inc();
102 return Err(e);
103 }
104
105 Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
106}
107
108fn check_cursor_unix_millis(unix_millis: u64, retention_seconds: u64) -> Result<()> {
109 let now = Epoch::now().as_unix_millis();
110 let min_unix_millis = now - retention_seconds * 1000;
111 if unix_millis > now {
112 return Err(ErrorCode::CatalogError(
113 "rw_timestamp is too large, need to be less than the current unix_millis"
114 .to_owned()
115 .into(),
116 )
117 .into());
118 }
119 if unix_millis < min_unix_millis {
120 return Err(ErrorCode::CatalogError("rw_timestamp is too small, need to be large than the current unix_millis - subscription's retention time".to_owned().into()).into());
121 }
122 Ok(())
123}
124
125async fn handle_declare_query_cursor(
126 handler_args: HandlerArgs,
127 cursor_name: Ident,
128 query: Box<Query>,
129) -> Result<RwPgResponse> {
130 let (chunk_stream, fields) =
131 create_stream_for_cursor_stmt(handler_args.clone(), Statement::Query(query)).await?;
132 handler_args
133 .session
134 .get_cursor_manager()
135 .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
136 .await?;
137 Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
138}
139
140pub async fn handle_bound_declare_query_cursor(
141 handler_args: HandlerArgs,
142 cursor_name: Ident,
143 plan_fragmenter_result: BatchPlanFragmenterResult,
144) -> Result<RwPgResponse> {
145 let session = handler_args.session.clone();
146 let (chunk_stream, fields) =
147 create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?;
148
149 handler_args
150 .session
151 .get_cursor_manager()
152 .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
153 .await?;
154 Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
155}
156
157pub async fn create_stream_for_cursor_stmt(
158 handler_args: HandlerArgs,
159 stmt: Statement,
160) -> Result<(CursorDataChunkStream, Vec<Field>)> {
161 let session = handler_args.session.clone();
162 let plan_fragmenter_result = {
163 let context = OptimizerContext::from_handler_args(handler_args);
164 let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
165 gen_batch_plan_fragmenter(&session, plan_result)?
166 };
167 create_chunk_stream_for_cursor(session, plan_fragmenter_result).await
168}
169
170pub async fn create_chunk_stream_for_cursor(
171 session: Arc<SessionImpl>,
172 plan_fragmenter_result: BatchPlanFragmenterResult,
173) -> Result<(CursorDataChunkStream, Vec<Field>)> {
174 let BatchPlanFragmenterResult {
175 plan_fragmenter,
176 query_mode,
177 schema,
178 read_storage_tables,
179 ..
180 } = plan_fragmenter_result;
181
182 let can_timeout_cancel = true;
183
184 let query = plan_fragmenter.generate_complete_query().await?;
185 tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
186
187 Ok((
188 match query_mode {
189 QueryMode::Auto => unreachable!(),
190 QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some(
191 local_execute(
192 session.clone(),
193 query,
194 can_timeout_cancel,
195 &read_storage_tables,
196 )
197 .await?,
198 )),
199 QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some(
200 distribute_execute(
201 session.clone(),
202 query,
203 can_timeout_cancel,
204 read_storage_tables,
205 )
206 .await?,
207 )),
208 },
209 schema.fields.clone(),
210 ))
211}