risingwave_frontend/handler/
declare_cursor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::catalog::Field;
19use risingwave_common::session_config::QueryMode;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_sqlparser::ast::{
22    DeclareCursorStatement, Ident, ObjectName, Query, Since, Statement,
23};
24
25use super::RwPgResponse;
26use super::query::{
27    BatchPlanFragmenterResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
28};
29use super::util::convert_unix_millis_to_logstore_u64;
30use crate::error::{ErrorCode, Result};
31use crate::handler::HandlerArgs;
32use crate::handler::query::{distribute_execute, local_execute};
33use crate::session::SessionImpl;
34use crate::session::cursor_manager::CursorDataChunkStream;
35use crate::{Binder, OptimizerContext};
36
37pub async fn handle_declare_cursor(
38    handler_args: HandlerArgs,
39    stmt: DeclareCursorStatement,
40) -> Result<RwPgResponse> {
41    match stmt.declare_cursor {
42        risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
43            handle_declare_query_cursor(handler_args, stmt.cursor_name, query).await
44        }
45        risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
46            handle_declare_subscription_cursor(
47                handler_args,
48                sub_name,
49                stmt.cursor_name,
50                rw_timestamp,
51            )
52            .await
53        }
54    }
55}
56pub async fn handle_declare_subscription_cursor(
57    handler_args: HandlerArgs,
58    sub_name: ObjectName,
59    cursor_name: Ident,
60    rw_timestamp: Since,
61) -> Result<RwPgResponse> {
62    let session = handler_args.session.clone();
63    let subscription = {
64        let db_name = &session.database();
65        let (sub_schema_name, sub_name) = Binder::resolve_schema_qualified_name(db_name, sub_name)?;
66        session.get_subscription_by_name(sub_schema_name, &sub_name)?
67    };
68    // Start the first query of cursor, which includes querying the table and querying the subscription's logstore
69    let start_rw_timestamp = match rw_timestamp {
70        risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => {
71            check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
72            Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
73        }
74        risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0),
75        risingwave_sqlparser::ast::Since::Begin => {
76            let min_unix_millis =
77                Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
78            let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
79            let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
80            Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
81        }
82        risingwave_sqlparser::ast::Since::Full => None,
83    };
84    // Create cursor based on the response
85    if let Err(e) = session
86        .get_cursor_manager()
87        .add_subscription_cursor(
88            cursor_name.real_value(),
89            start_rw_timestamp,
90            subscription.dependent_table_id,
91            subscription,
92            &handler_args,
93        )
94        .await
95    {
96        session
97            .env()
98            .cursor_metrics
99            .subscription_cursor_error_count
100            .inc();
101        return Err(e);
102    }
103
104    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
105}
106
107fn check_cursor_unix_millis(unix_millis: u64, retention_seconds: u64) -> Result<()> {
108    let now = Epoch::now().as_unix_millis();
109    let min_unix_millis = now - retention_seconds * 1000;
110    if unix_millis > now {
111        return Err(ErrorCode::CatalogError(
112            "rw_timestamp is too large, need to be less than the current unix_millis"
113                .to_owned()
114                .into(),
115        )
116        .into());
117    }
118    if unix_millis < min_unix_millis {
119        return Err(ErrorCode::CatalogError("rw_timestamp is too small, need to be large than the current unix_millis - subscription's retention time".to_owned().into()).into());
120    }
121    Ok(())
122}
123
124async fn handle_declare_query_cursor(
125    handler_args: HandlerArgs,
126    cursor_name: Ident,
127    query: Box<Query>,
128) -> Result<RwPgResponse> {
129    let (chunk_stream, fields) =
130        create_stream_for_cursor_stmt(handler_args.clone(), Statement::Query(query)).await?;
131    handler_args
132        .session
133        .get_cursor_manager()
134        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
135        .await?;
136    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
137}
138
139pub async fn handle_bound_declare_query_cursor(
140    handler_args: HandlerArgs,
141    cursor_name: Ident,
142    plan_fragmenter_result: BatchPlanFragmenterResult,
143) -> Result<RwPgResponse> {
144    let session = handler_args.session.clone();
145    let (chunk_stream, fields) =
146        create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?;
147
148    handler_args
149        .session
150        .get_cursor_manager()
151        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
152        .await?;
153    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
154}
155
156pub async fn create_stream_for_cursor_stmt(
157    handler_args: HandlerArgs,
158    stmt: Statement,
159) -> Result<(CursorDataChunkStream, Vec<Field>)> {
160    let session = handler_args.session.clone();
161    let plan_fragmenter_result = {
162        let context = OptimizerContext::from_handler_args(handler_args);
163        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
164        gen_batch_plan_fragmenter(&session, plan_result)?
165    };
166    create_chunk_stream_for_cursor(session, plan_fragmenter_result).await
167}
168
169pub async fn create_chunk_stream_for_cursor(
170    session: Arc<SessionImpl>,
171    plan_fragmenter_result: BatchPlanFragmenterResult,
172) -> Result<(CursorDataChunkStream, Vec<Field>)> {
173    let BatchPlanFragmenterResult {
174        plan_fragmenter,
175        query_mode,
176        schema,
177        read_storage_tables,
178        ..
179    } = plan_fragmenter_result;
180
181    let can_timeout_cancel = true;
182
183    let query = plan_fragmenter.generate_complete_query().await?;
184    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
185
186    Ok((
187        match query_mode {
188            QueryMode::Auto => unreachable!(),
189            QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some(
190                local_execute(
191                    session.clone(),
192                    query,
193                    can_timeout_cancel,
194                    &read_storage_tables,
195                )
196                .await?,
197            )),
198            QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some(
199                distribute_execute(
200                    session.clone(),
201                    query,
202                    can_timeout_cancel,
203                    read_storage_tables,
204                )
205                .await?,
206            )),
207        },
208        schema.fields.clone(),
209    ))
210}