risingwave_frontend/session/cursor_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::mem;
use core::time::Duration;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use futures::StreamExt;
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_hummock_sdk::HummockVersionId;

use super::SessionImpl;
use crate::catalog::TableId;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
use crate::handler::HandlerArgs;
use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
use crate::handler::query::{BatchQueryPlanResult, gen_batch_plan_fragmenter};
use crate::handler::util::{
    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
    pg_value_format, to_pg_field,
};
use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
use crate::optimizer::PlanRoot;
use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
use crate::optimizer::property::{Cardinality, Order, RequiredDist};
use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
use crate::utils::Condition;
use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};

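/// The data source behind a cursor. A stream starts out as a raw local or
/// distributed batch-query stream and is converted in place into a pg row
/// stream by `init_row_stream`; only the `PgResponse` form can yield rows.
///
/// A minimal usage sketch (hypothetical caller code, not taken from this
/// crate's call sites):
///
/// ```ignore
/// let mut stream = CursorDataChunkStream::LocalDataChunk(Some(local_query_stream));
/// stream.init_row_stream(&fields, &formats, session.clone());
/// while let Some(rows) = stream.next().await? {
///     for row in rows? {
///         // forward `row` to the client
///     }
/// }
/// ```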
pub enum CursorDataChunkStream {
    LocalDataChunk(Option<LocalQueryStream>),
    DistributedDataChunk(Option<DistributedQueryStream>),
    PgResponse(PgResponseStream),
}

impl CursorDataChunkStream {
    pub fn init_row_stream(
        &mut self,
        fields: &Vec<Field>,
        formats: &Vec<Format>,
        session: Arc<SessionImpl>,
    ) {
        let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
        match self {
            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            _ => {}
        }
    }

    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
        match self {
            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
            _ => Err(ErrorCode::InternalError(
                "`CursorDataChunkStream` must be initialized via `init_row_stream` before rows can be fetched".to_owned(),
            )
            .into()),
        }
    }
}
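
/// A named cursor owned by a session: either a `SubscriptionCursor` that
/// follows a subscription's log store across fetches, or a `QueryCursor` that
/// drains the result of a single batch query.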
pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}

impl Cursor {
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        match self {
            Cursor::Subscription(cursor) => cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
            Cursor::Query(cursor) => {
                cursor
                    .next(count, formats, handler_args, timeout_seconds)
                    .await
            }
        }
    }

    pub fn get_fields(&mut self) -> Vec<Field> {
        match self {
            Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
            Cursor::Query(cursor) => cursor.fields.clone(),
        }
    }
}

pub struct QueryCursor {
    chunk_stream: CursorDataChunkStream,
    fields: Vec<Field>,
    remaining_rows: VecDeque<Row>,
}

impl QueryCursor {
    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
        Ok(Self {
            chunk_stream,
            fields,
            remaining_rows: VecDeque::<Row>::new(),
        })
    }

    pub async fn next_once(&mut self) -> Result<Option<Row>> {
        while self.remaining_rows.is_empty() {
            let rows = self.chunk_stream.next().await?;
            let rows = match rows {
                None => return Ok(None),
                Some(row) => row?,
            };
            self.remaining_rows = rows.into_iter().collect();
        }
        let row = self.remaining_rows.pop_front().unwrap();
        Ok(Some(row))
    }

    pub async fn next(
        &mut self,
        count: u32,
        formats: &Vec<Format>,
        handler_args: HandlerArgs,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        // `FETCH NEXT` is equivalent to `FETCH 1`.
        // Cap the initial allocation at 100 rows to avoid allocating too much memory at once.
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        let session = handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        let desc = self.fields.iter().map(to_pg_field).collect();
        self.chunk_stream
            .init_row_stream(&self.fields, formats, session);
        while cur < count
            && let Some(row) = self.next_once().await?
        {
            cur += 1;
            ans.push(row);
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        Ok((ans, desc))
    }
}

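/// The state machine of a `SubscriptionCursor`. Intended transitions:
///
/// - `InitLogStoreQuery` -> `Fetch`: a log-store batch query is issued at
///   `seek_timestamp` (validated against `expected_timestamp` when present).
/// - `Fetch` -> `InitLogStoreQuery`: the current query reaches EOF and the
///   cursor seeks to the next rw_timestamp.
/// - Any state -> `Invalid`: an error occurred; the cursor must be closed and
///   re-created.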
enum State {
    InitLogStoreQuery {
        // The rw_timestamp used to initiate the query that reads from the subscription logstore.
        seek_timestamp: u64,

        // If specified, `expected_timestamp` must exactly match the next rw_timestamp.
        expected_timestamp: Option<u64>,
    },
    Fetch {
        // Whether the query is reading from snapshot
        // true: read from the upstream table snapshot
        // false: read from subscription logstore
        from_snapshot: bool,

        // The rw_timestamp used to initiate the query that reads from the subscription logstore.
        rw_timestamp: u64,

        // The row stream returned by the batch query execution.
        chunk_stream: CursorDataChunkStream,

        // A cache to store the remaining rows from the row stream.
        remaining_rows: VecDeque<Row>,

        expected_timestamp: Option<u64>,

        init_query_timer: Instant,
    },
    Invalid,
}

impl Display for State {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            State::InitLogStoreQuery {
                seek_timestamp,
                expected_timestamp,
            } => write!(
                f,
                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
                seek_timestamp, expected_timestamp
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows,
                init_query_timer,
                ..
            } => write!(
                f,
                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query initiated {}ms ago }}",
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows.len(),
                init_query_timer.elapsed().as_millis()
            ),
            State::Invalid => write!(f, "Invalid"),
        }
    }
}

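/// Tracks how the upstream table's columns map onto the rows a subscription
/// cursor produces. The full row layout is the table's pk columns and
/// non-hidden columns in catalog order, followed by the synthetic `op` and
/// `rw_timestamp` columns. As an illustrative example (a hypothetical catalog,
/// not taken from a real table): a table with hidden pk `_row_id` and columns
/// `a`, `b` yields `row_fields = [_row_id, a, b, op, rw_timestamp]`, with
/// `row_pk_indices = [0]` and `row_output_col_indices = [1, 2, 3, 4]`.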
struct FieldsManager {
    columns_catalog: Vec<ColumnCatalog>,
    // All row fields: the (possibly hidden) pk columns, all non-hidden columns of the upstream table,
    // and the synthetic op and rw_timestamp columns.
    row_fields: Vec<Field>,
    // Row output column indices based on `row_fields`.
    row_output_col_indices: Vec<usize>,
    // Row pk indices based on `row_fields`.
    row_pk_indices: Vec<usize>,
    // Stream chunk row indices based on `row_fields`.
    stream_chunk_row_indices: Vec<usize>,
    // The op index based on `row_fields`.
    op_index: usize,
}

impl FieldsManager {
    // pub const OP_FIELD: Field = Field::with_name(DataType::Varchar, "op".to_owned());
    // pub const RW_TIMESTAMP_FIELD: Field = Field::with_name(DataType::Int64, "rw_timestamp".to_owned());

    pub fn new(catalog: &TableCatalog) -> Self {
        let mut row_fields = Vec::new();
        let mut row_output_col_indices = Vec::new();
        let mut row_pk_indices = Vec::new();
        let mut stream_chunk_row_indices = Vec::new();
        let mut output_idx = 0_usize;
        let pk_set: HashSet<usize> = catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index)
            .collect();

        for (index, v) in catalog.columns.iter().enumerate() {
            if pk_set.contains(&index) {
                row_pk_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                if !v.is_hidden {
                    row_output_col_indices.push(output_idx);
                }
                output_idx += 1;
            } else if !v.is_hidden {
                row_output_col_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                output_idx += 1;
            }
        }

        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
        row_output_col_indices.push(output_idx);
        let op_index = output_idx;
        output_idx += 1;
        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
        row_output_col_indices.push(output_idx);
        Self {
            columns_catalog: catalog.columns.clone(),
            row_fields,
            row_output_col_indices,
            row_pk_indices,
            stream_chunk_row_indices,
            op_index,
        }
    }

    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
        if self.columns_catalog.ne(&catalog.columns) {
            *self = Self::new(catalog);
            true
        } else {
            false
        }
    }

    // Projects `rows` to the output columns, and additionally returns the pk
    // projection of the last row, which is stored as the next `seek_pk_row`.
    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
        let last_row = rows.last_mut().map(|row| {
            let mut row = row.clone();
            row.project(&self.row_pk_indices)
        });
        let rows = rows
            .iter_mut()
            .map(|row| row.project(&self.row_output_col_indices))
            .collect();
        (rows, last_row)
    }

    pub fn get_output_fields(&self) -> Vec<Field> {
        self.row_output_col_indices
            .iter()
            .map(|&idx| self.row_fields[idx].clone())
            .collect()
    }

    // When the cursor is declared, `formats` is only an empty placeholder; on
    // each fetch it is filled with the real formats returned by the pg client.
    pub fn get_row_stream_fields_and_formats(
        &self,
        formats: &Vec<Format>,
        from_snapshot: bool,
    ) -> (Vec<Field>, Vec<Format>) {
        let mut fields = Vec::new();
        let need_format = !(formats.is_empty() || formats.len() == 1);
        let mut new_formats = formats.clone();
        let stream_chunk_row_indices_iter = if from_snapshot {
            self.stream_chunk_row_indices.iter().chain(None)
        } else {
            self.stream_chunk_row_indices
                .iter()
                .chain(Some(&self.op_index))
        };
        for index in stream_chunk_row_indices_iter {
            fields.push(self.row_fields[*index].clone());
            if need_format && !self.row_output_col_indices.contains(index) {
                new_formats.insert(*index, Format::Text);
            }
        }
        (fields, new_formats)
    }
}

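/// A cursor declared on a subscription (`DECLARE ... SUBSCRIPTION CURSOR`).
/// Unless a start timestamp is given, the first fetches serve a snapshot of
/// the upstream table taken at declaration time; afterwards the cursor
/// incrementally serves changes from the subscription's log store, one
/// rw_timestamp at a time.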
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    dependent_table_id: TableId,
    cursor_need_drop_time: Instant,
    state: State,
    // Fields are initialized from the table's catalog when the cursor is created,
    // and refreshed each time a new chunk_stream is created, so that catalog
    // changes caused by ALTER are picked up.
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    last_fetch: Instant,
    seek_pk_row: Option<Row>,
}

impl SubscriptionCursor {
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            // The query stream needs to be initiated on cursor creation to make sure
            // future fetches on the cursor start from the snapshot taken when the cursor is declared.
            //
            // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?;
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} does not exist")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch Cursor can't start from a specified query epoch. Consider running `SET query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch Cursor doesn't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }

    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    let from_snapshot = false;

                    // Initiate a new batch query to continue fetching
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Transition to the Fetch state
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Try to refill the remaining rows
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        // 1. Fetch the next row
                        if from_snapshot {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        // 2. Reached EOF for the current query.
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    // TODO: auto close invalid cursor?
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }

    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        if Instant::now() > self.cursor_need_drop_time {
            return Err(ErrorCode::InternalError(
                "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
            )
            .into());
        }

        let session = &handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        if let State::Fetch {
            from_snapshot,
            chunk_stream,
            ..
        } = &mut self.state
        {
            let (fields, formats) = self
                .fields_manager
                .get_row_stream_fields_and_formats(formats, *from_snapshot);
            chunk_stream.init_row_stream(&fields, &formats, session.clone());
        }
        while cur < count {
            let fetch_cursor_timer = Instant::now();
            let row = self.next_row(&handler_args, formats).await?;
            self.cursor_metrics
                .subscription_cursor_fetch_duration
                .with_label_values(&[&self.subscription.name])
                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
            match row {
                Some(row) => {
                    cur += 1;
                    ans.push(row);
                }
                None => {
                    let timeout_seconds = timeout_seconds.unwrap_or(0);
                    if cur > 0 || timeout_seconds == 0 {
                        break;
                    }
                    // We only block when there is no data yet. This wait runs at most once
                    // per fetch: it either times out, or the notification lets the next
                    // loop iteration fetch rows via `next_row`.
                    match tokio::time::timeout(
                        Duration::from_secs(timeout_seconds),
                        session
                            .env
                            .hummock_snapshot_manager()
                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
                    )
                    .await
                    {
                        Ok(result) => result?,
                        Err(_) => {
                            tracing::debug!("Cursor wait next epoch timeout");
                            break;
                        }
                    }
                }
            }
            // Timeout: return with the rows fetched so far
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        self.last_fetch = Instant::now();
        let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
        if let Some(seek_pk_row) = seek_pk_row {
            self.seek_pk_row = Some(seek_pk_row);
        }
        let desc = self
            .fields_manager
            .get_output_fields()
            .iter()
            .map(to_pg_field)
            .collect();

        Ok((rows, desc))
    }

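    /// Looks up the next two change-log epochs at or after `seek_timestamp`
    /// for `table_id`, returned as a pair: the next rw_timestamp and the epoch
    /// after it. If `expected_timestamp` is set but doesn't match the first
    /// epoch found, the log data has been recycled and the cursor can no
    /// longer continue.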
    fn get_next_rw_timestamp(
        seek_timestamp: u64,
        table_id: &TableId,
        expected_timestamp: Option<u64>,
        handler_args: HandlerArgs,
        dependent_subscription: &SubscriptionCatalog,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let session = handler_args.session;
        // Check that the subscription still exists.
        session.get_subscription_by_schema_id_name(
            dependent_subscription.schema_id,
            &dependent_subscription.name,
        )?;

        // The epochs must be fetched fresh every time; otherwise there will be cache-consistency issues.
        let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
        if let Some(expected_timestamp) = expected_timestamp
            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
        {
            return Err(ErrorCode::CatalogError(
                format!(
                    "No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor",
                    convert_logstore_u64_to_unix_millis(expected_timestamp)
                )
                .into(),
            )
            .into());
        }
        Ok((new_epochs.first().cloned(), new_epochs.get(1).cloned()))
    }

    pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
        match self.state {
            // Only used to return generated plans, so the rw_timestamp is meaningless.
            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
                Some(0),
                &self.dependent_table_id,
                handler_args,
                self.seek_pk_row.clone(),
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                ..
            } => {
                if from_snapshot {
                    Self::init_batch_plan_for_subscription_cursor(
                        None,
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                } else {
                    Self::init_batch_plan_for_subscription_cursor(
                        Some(rw_timestamp),
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                }
            }
            State::Invalid => Err(ErrorCode::InternalError(
                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
            )
            .into()),
        }
    }

    fn init_batch_plan_for_subscription_cursor(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let context = OptimizerContext::from_handler_args(handler_args.clone());
        let version_id = {
            let version = session.env.hummock_snapshot_manager.acquire();
            let version = version.version();
            if !version
                .state_table_info
                .info()
                .contains_key(dependent_table_id)
            {
                return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
            }
            version.id
        };
        Self::create_batch_plan_for_cursor(
            table_catalog,
            &session,
            context.into(),
            rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
            version_id,
            seek_pk_row,
        )
    }

    async fn initiate_query(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
        let init_query_timer = Instant::now();
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let plan_result = Self::init_batch_plan_for_subscription_cursor(
            rw_timestamp,
            dependent_table_id,
            handler_args.clone(),
            seek_pk_row,
        )?;
        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
        let (chunk_stream, _) =
            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
        Ok((chunk_stream, init_query_timer, table_catalog))
    }

    async fn try_refill_remaining_rows(
        chunk_stream: &mut CursorDataChunkStream,
        remaining_rows: &mut VecDeque<Row>,
    ) -> Result<()> {
        if remaining_rows.is_empty()
            && let Some(row_set) = chunk_stream.next().await?
        {
            remaining_rows.extend(row_set?);
        }
        Ok(())
    }

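    /// Appends the synthetic trailing columns to a fetched row. A log-store
    /// row already carries an `op` column, so only the formatted
    /// `rw_timestamp` is appended; a snapshot row gets a constant `"Insert"`
    /// op plus a NULL `rw_timestamp`. A sketch of the resulting layouts
    /// (column values illustrative):
    ///
    /// ```ignore
    /// // snapshot row:  [a, b, c]     -> [a, b, c, "Insert", NULL]
    /// // log-store row: [a, b, c, op] -> [a, b, c, op, rw_timestamp_millis]
    /// ```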
    pub fn build_row(
        mut row: Vec<Option<Bytes>>,
        rw_timestamp: Option<u64>,
        formats: &Vec<Format>,
        session_data: &StaticSessionData,
    ) -> Result<Row> {
        let row_len = row.len();
        let new_row = if let Some(rw_timestamp) = rw_timestamp {
            let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
            let rw_timestamp = pg_value_format(
                &DataType::Int64,
                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
                *rw_timestamp_formats,
                session_data,
            )?;
            vec![Some(rw_timestamp)]
        } else {
            let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
            let op = pg_value_format(
                &DataType::Varchar,
                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
                *op_formats,
                session_data,
            )?;
            vec![Some(op), None]
        };
        row.extend(new_row);
        Ok(Row::new(row))
    }

    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
        if from_snapshot {
            descs.push(Field::with_name(DataType::Varchar, "op"));
        }
        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
        descs
    }

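    /// Builds the batch plan for one cursor query. With an `epoch_range` the
    /// plan scans the table's log store; otherwise it scans the table
    /// snapshot. When `seek_pk_rows` is set, the scan resumes strictly after
    /// the last returned pk via the predicate
    /// `ROW(pk_1, ..., pk_n) > ROW(v_1, ..., v_n)`, which is split into a
    /// scan range plus, if needed, a residual `BatchFilter`.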
    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        // Output the pk columns plus all non-hidden columns.
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::unnamed(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                Rc::new(table_catalog.table_desc()),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            let core = generic::TableScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
                Cardinality::default(),
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        // The output is already ordered by pk, so no explicit sort is needed.
        let order = Order::new(table_catalog.pk().to_vec());

        // We only need a plan_root to call the plan-generation methods; only out_fields and out_names are used.
        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema().clone();
        let (batch_plan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(BatchQueryPlanResult {
            plan: batch_plan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: table_catalog.dependent_relations.clone(),
            read_storage_tables: HashSet::from_iter([table_catalog.id]),
        })
    }

    pub fn idle_duration(&self) -> Duration {
        self.last_fetch.elapsed()
    }

    pub fn subscription_name(&self) -> &str {
        self.subscription.name.as_str()
    }

    pub fn state_info_string(&self) -> String {
        format!("{}", self.state)
    }
}

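/// Per-session registry of named cursors, keyed by cursor name.
///
/// A minimal lifecycle sketch (hypothetical handler code, simplified relative
/// to the real call sites):
///
/// ```ignore
/// manager
///     .add_subscription_cursor(name.clone(), None, table_id, subscription, &handler_args)
///     .await?;
/// let (rows, desc) = manager
///     .get_rows_with_cursor(&name, 10, handler_args, &formats, None)
///     .await?;
/// manager.remove_cursor(&name).await?;
/// ```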
pub struct CursorManager {
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}

impl CursorManager {
    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
        Self {
            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
            cursor_metrics,
        }
    }

    pub async fn add_subscription_cursor(
        &self,
        cursor_name: String,
        start_timestamp: Option<u64>,
        dependent_table_id: TableId,
        subscription: Arc<SubscriptionCatalog>,
        handler_args: &HandlerArgs,
    ) -> Result<()> {
        let create_cursor_timer = Instant::now();
        let subscription_name = subscription.name.clone();
        let cursor = SubscriptionCursor::new(
            cursor_name,
            start_timestamp,
            subscription,
            dependent_table_id,
            handler_args,
            self.cursor_metrics.clone(),
        )
        .await?;
        let mut cursor_map = self.cursor_map.lock().await;
        self.cursor_metrics
            .subscription_cursor_declare_duration
            .with_label_values(&[&subscription_name])
            .observe(create_cursor_timer.elapsed().as_millis() as _);

        // Drop any subscription cursors that have become invalid.
        cursor_map.retain(|_, v| {
            if let Cursor::Subscription(cursor) = v
                && matches!(cursor.state, State::Invalid)
            {
                false
            } else {
                true
            }
        });

        cursor_map
            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;
        Ok(())
    }

    pub async fn add_query_cursor(
        &self,
        cursor_name: String,
        chunk_stream: CursorDataChunkStream,
        fields: Vec<Field>,
    ) -> Result<()> {
        let cursor = QueryCursor::new(chunk_stream, fields)?;
        self.cursor_map
            .lock()
            .await
            .try_insert(cursor_name, Cursor::Query(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;

        Ok(())
    }

    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
        self.cursor_map
            .lock()
            .await
            .remove(cursor_name)
            .ok_or_else(|| {
                ErrorCode::CatalogError(format!("cursor `{}` doesn't exist", cursor_name).into())
            })?;
        Ok(())
    }

    pub async fn remove_all_cursor(&self) {
        self.cursor_map.lock().await.clear();
    }

    pub async fn remove_all_query_cursor(&self) {
        self.cursor_map
            .lock()
            .await
            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
    }

    pub async fn get_rows_with_cursor(
        &self,
        cursor_name: &str,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            Ok(cursor.get_fields())
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
        // The misspelled `subsription_*` names mirror the field names of
        // `PeriodicCursorMetrics` in the monitor module, so they are kept as-is.
        let mut subsription_cursor_nums = 0;
        let mut invalid_subsription_cursor_nums = 0;
        let mut subscription_cursor_last_fetch_duration = HashMap::new();
        for (_, cursor) in self.cursor_map.lock().await.iter() {
            if let Cursor::Subscription(subscription_cursor) = cursor {
                subsription_cursor_nums += 1;
                if matches!(subscription_cursor.state, State::Invalid) {
                    invalid_subsription_cursor_nums += 1;
                } else {
                    let fetch_duration =
                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
                    subscription_cursor_last_fetch_duration.insert(
                        subscription_cursor.subscription.name.clone(),
                        fetch_duration,
                    );
                }
            }
        }
        PeriodicCursorMetrics {
            subsription_cursor_nums,
            invalid_subsription_cursor_nums,
            subscription_cursor_last_fetch_duration,
        }
    }

    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Query(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Subscription(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn gen_batch_plan_with_subscription_cursor(
        &self,
        cursor_name: &str,
        handler_args: HandlerArgs,
    ) -> Result<BatchQueryPlanResult> {
        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
        })? {
            Cursor::Subscription(cursor) => cursor.gen_batch_plan_result(handler_args.clone()),
            Cursor::Query(_) => Err(ErrorCode::InternalError(
                "The plan of a query cursor is the same as the query statement it was created with."
                    .to_owned(),
            )
            .into()),
        }
    }
}