risingwave_frontend/handler/
declare_cursor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::catalog::Field;
19use risingwave_common::session_config::QueryMode;
20use risingwave_common::util::epoch::Epoch;
21use risingwave_sqlparser::ast::{
22    DeclareCursorStatement, Ident, ObjectName, Query, Since, Statement,
23};
24
25use super::RwPgResponse;
26use super::query::{
27    BatchPlanFragmenterResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
28};
29use super::util::convert_unix_millis_to_logstore_u64;
30use crate::error::{ErrorCode, Result};
31use crate::handler::HandlerArgs;
32use crate::handler::query::{distribute_execute, local_execute};
33use crate::session::SessionImpl;
34use crate::session::cursor_manager::CursorDataChunkStream;
35use crate::{Binder, OptimizerContext};
36
37pub async fn handle_declare_cursor(
38    handler_args: HandlerArgs,
39    stmt: DeclareCursorStatement,
40) -> Result<RwPgResponse> {
41    match stmt.declare_cursor {
42        risingwave_sqlparser::ast::DeclareCursor::Query(query) => {
43            handle_declare_query_cursor(handler_args, stmt.cursor_name, query).await
44        }
45        risingwave_sqlparser::ast::DeclareCursor::Subscription(sub_name, rw_timestamp) => {
46            handle_declare_subscription_cursor(
47                handler_args,
48                sub_name,
49                stmt.cursor_name,
50                rw_timestamp,
51            )
52            .await
53        }
54    }
55}
56pub async fn handle_declare_subscription_cursor(
57    handler_args: HandlerArgs,
58    sub_name: ObjectName,
59    cursor_name: Ident,
60    rw_timestamp: Since,
61) -> Result<RwPgResponse> {
62    let session = handler_args.session.clone();
63    let subscription = {
64        let db_name = &session.database();
65        let (sub_schema_name, sub_name) =
66            Binder::resolve_schema_qualified_name(db_name, &sub_name)?;
67        session.get_subscription_by_name(sub_schema_name, &sub_name)?
68    };
69    // Start the first query of cursor, which includes querying the table and querying the subscription's logstore
70    let start_rw_timestamp = match rw_timestamp {
71        risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp) => {
72            check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
73            Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
74        }
75        risingwave_sqlparser::ast::Since::ProcessTime => Some(Epoch::now().0),
76        risingwave_sqlparser::ast::Since::Begin => {
77            let min_unix_millis =
78                Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
79            let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
80            let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
81            Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
82        }
83        risingwave_sqlparser::ast::Since::Full => None,
84    };
85    // Create cursor based on the response
86    if let Err(e) = session
87        .get_cursor_manager()
88        .add_subscription_cursor(
89            cursor_name.real_value(),
90            start_rw_timestamp,
91            subscription.dependent_table_id,
92            subscription,
93            &handler_args,
94        )
95        .await
96    {
97        session
98            .env()
99            .cursor_metrics
100            .subscription_cursor_error_count
101            .inc();
102        return Err(e);
103    }
104
105    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
106}
107
108fn check_cursor_unix_millis(unix_millis: u64, retention_seconds: u64) -> Result<()> {
109    let now = Epoch::now().as_unix_millis();
110    let min_unix_millis = now - retention_seconds * 1000;
111    if unix_millis > now {
112        return Err(ErrorCode::CatalogError(
113            "rw_timestamp is too large, need to be less than the current unix_millis"
114                .to_owned()
115                .into(),
116        )
117        .into());
118    }
119    if unix_millis < min_unix_millis {
120        return Err(ErrorCode::CatalogError("rw_timestamp is too small, need to be large than the current unix_millis - subscription's retention time".to_owned().into()).into());
121    }
122    Ok(())
123}
124
125async fn handle_declare_query_cursor(
126    handler_args: HandlerArgs,
127    cursor_name: Ident,
128    query: Box<Query>,
129) -> Result<RwPgResponse> {
130    let (chunk_stream, fields) =
131        create_stream_for_cursor_stmt(handler_args.clone(), Statement::Query(query)).await?;
132    handler_args
133        .session
134        .get_cursor_manager()
135        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
136        .await?;
137    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
138}
139
140pub async fn handle_bound_declare_query_cursor(
141    handler_args: HandlerArgs,
142    cursor_name: Ident,
143    plan_fragmenter_result: BatchPlanFragmenterResult,
144) -> Result<RwPgResponse> {
145    let session = handler_args.session.clone();
146    let (chunk_stream, fields) =
147        create_chunk_stream_for_cursor(session, plan_fragmenter_result).await?;
148
149    handler_args
150        .session
151        .get_cursor_manager()
152        .add_query_cursor(cursor_name.real_value(), chunk_stream, fields)
153        .await?;
154    Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
155}
156
157pub async fn create_stream_for_cursor_stmt(
158    handler_args: HandlerArgs,
159    stmt: Statement,
160) -> Result<(CursorDataChunkStream, Vec<Field>)> {
161    let session = handler_args.session.clone();
162    let plan_fragmenter_result = {
163        let context = OptimizerContext::from_handler_args(handler_args);
164        let plan_result = gen_batch_plan_by_statement(&session, context.into(), stmt)?;
165        gen_batch_plan_fragmenter(&session, plan_result)?
166    };
167    create_chunk_stream_for_cursor(session, plan_fragmenter_result).await
168}
169
170pub async fn create_chunk_stream_for_cursor(
171    session: Arc<SessionImpl>,
172    plan_fragmenter_result: BatchPlanFragmenterResult,
173) -> Result<(CursorDataChunkStream, Vec<Field>)> {
174    let BatchPlanFragmenterResult {
175        plan_fragmenter,
176        query_mode,
177        schema,
178        read_storage_tables,
179        ..
180    } = plan_fragmenter_result;
181
182    let can_timeout_cancel = true;
183
184    let query = plan_fragmenter.generate_complete_query().await?;
185    tracing::trace!("Generated query after plan fragmenter: {:?}", &query);
186
187    Ok((
188        match query_mode {
189            QueryMode::Auto => unreachable!(),
190            QueryMode::Local => CursorDataChunkStream::LocalDataChunk(Some(
191                local_execute(
192                    session.clone(),
193                    query,
194                    can_timeout_cancel,
195                    &read_storage_tables,
196                )
197                .await?,
198            )),
199            QueryMode::Distributed => CursorDataChunkStream::DistributedDataChunk(Some(
200                distribute_execute(
201                    session.clone(),
202                    query,
203                    can_timeout_cancel,
204                    read_storage_tables,
205                )
206                .await?,
207            )),
208        },
209        schema.fields.clone(),
210    ))
211}