risingwave_frontend/session/cursor_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::mem;
use core::time::Duration;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use futures::StreamExt;
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_hummock_sdk::HummockVersionId;

use super::SessionImpl;
use crate::catalog::TableId;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
use crate::handler::HandlerArgs;
use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
use crate::handler::query::{BatchQueryPlanResult, gen_batch_plan_fragmenter};
use crate::handler::util::{
    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
    pg_value_format, to_pg_field,
};
use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
use crate::optimizer::PlanRoot;
use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
use crate::optimizer::property::{Order, RequiredDist};
use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
use crate::utils::Condition;
use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};

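/// A cursor's underlying stream of data chunks. A stream starts out as
/// `LocalDataChunk` or `DistributedDataChunk` (depending on the query mode) and
/// is converted into a `PgResponse` row stream by `init_row_stream` before any
/// rows can be fetched.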
pub enum CursorDataChunkStream {
    LocalDataChunk(Option<LocalQueryStream>),
    DistributedDataChunk(Option<DistributedQueryStream>),
    PgResponse(PgResponseStream),
}

impl CursorDataChunkStream {
    pub fn init_row_stream(
        &mut self,
        fields: &Vec<Field>,
        formats: &Vec<Format>,
        session: Arc<SessionImpl>,
    ) {
        let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
        match self {
            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            _ => {}
        }
    }

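    // Call-order sketch (illustrative): `init_row_stream` must run before `next`,
    // since only the `PgResponse` variant yields rows.
    //
    //     stream.init_row_stream(&fields, &formats, session.clone());
    //     while let Some(rows) = stream.next().await? {
    //         let rows = rows?; // surface any `BoxedError` from the stream
    //         // ... convert/forward rows ...
    //     }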
    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
        match self {
            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
            _ => Err(ErrorCode::InternalError(
                "`next` can only be called after `init_row_stream` has converted the stream into `PgResponse`".to_owned(),
            )
            .into()),
        }
    }
}

pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}

impl Cursor {
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        match self {
            Cursor::Subscription(cursor) => cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
            Cursor::Query(cursor) => {
                cursor
                    .next(count, formats, handler_args, timeout_seconds)
                    .await
            }
        }
    }

    pub fn get_fields(&mut self) -> Vec<Field> {
        match self {
            Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
            Cursor::Query(cursor) => cursor.fields.clone(),
        }
    }
}

pub struct QueryCursor {
    chunk_stream: CursorDataChunkStream,
    fields: Vec<Field>,
    remaining_rows: VecDeque<Row>,
}

impl QueryCursor {
    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
        Ok(Self {
            chunk_stream,
            fields,
            remaining_rows: VecDeque::<Row>::new(),
        })
    }

    pub async fn next_once(&mut self) -> Result<Option<Row>> {
        while self.remaining_rows.is_empty() {
            let rows = self.chunk_stream.next().await?;
            let rows = match rows {
                None => return Ok(None),
                Some(row) => row?,
            };
            self.remaining_rows = rows.into_iter().collect();
        }
        let row = self.remaining_rows.pop_front().unwrap();
        Ok(Some(row))
    }

    pub async fn next(
        &mut self,
        count: u32,
        formats: &Vec<Format>,
        handler_args: HandlerArgs,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        // `FETCH NEXT` is equivalent to `FETCH 1`.
        // Cap the initial allocation at 100 rows to avoid allocating too much memory at once.
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        let session = handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        let desc = self.fields.iter().map(to_pg_field).collect();
        self.chunk_stream
            .init_row_stream(&self.fields, formats, session);
        while cur < count
            && let Some(row) = self.next_once().await?
        {
            cur += 1;
            ans.push(row);
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        Ok((ans, desc))
    }
}

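// State machine of a subscription cursor (sketch derived from `next_row`):
//
//   InitLogStoreQuery --(next epoch resolved, query initiated)--> Fetch
//   Fetch --(current query exhausted)--> InitLogStoreQuery (seek the following epoch)
//   InitLogStoreQuery --(error while resolving the epoch)--> Invalid
//
// `Invalid` is terminal: the cursor must be closed and re-declared.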
enum State {
    InitLogStoreQuery {
        // The rw_timestamp used to initiate the query to read from the subscription log store.
        seek_timestamp: u64,

        // If specified, the next rw_timestamp must exactly match `expected_timestamp`.
        expected_timestamp: Option<u64>,
    },
    Fetch {
        // Whether the query is reading from a snapshot:
        // true: read from the upstream table snapshot
        // false: read from the subscription log store
        from_snapshot: bool,

        // The rw_timestamp used to initiate the query to read from the subscription log store.
        rw_timestamp: u64,

        // The row stream returned by the batch query execution.
        chunk_stream: CursorDataChunkStream,

        // A cache for the remaining rows from the row stream.
        remaining_rows: VecDeque<Row>,

        expected_timestamp: Option<u64>,

        init_query_timer: Instant,
    },
    Invalid,
}

impl Display for State {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            State::InitLogStoreQuery {
                seek_timestamp,
                expected_timestamp,
            } => write!(
                f,
                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
                seek_timestamp, expected_timestamp
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows,
                init_query_timer,
                ..
            } => write!(
                f,
                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query initiated {}ms ago }}",
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows.len(),
                init_query_timer.elapsed().as_millis()
            ),
            State::Invalid => write!(f, "Invalid"),
        }
    }
}

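// Layout example (illustrative): for an upstream table with a visible pk column
// `id`, a visible column `v1`, and a hidden non-pk column, the hidden column is
// skipped and the layout becomes:
//   row_fields               = [id, v1, op, rw_timestamp]
//   row_pk_indices           = [0]
//   stream_chunk_row_indices = [0, 1]
//   op_index                 = 2
//   row_output_col_indices   = [0, 1, 2, 3]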
struct FieldsManager {
    columns_catalog: Vec<ColumnCatalog>,
    // All row fields: the pk columns (including hidden ones) and all non-hidden
    // columns of the upstream table, plus the trailing `op` and `rw_timestamp` columns.
    row_fields: Vec<Field>,
    // Output column indices into `row_fields`.
    row_output_col_indices: Vec<usize>,
    // Pk indices into `row_fields`.
    row_pk_indices: Vec<usize>,
    // Stream chunk row indices into `row_fields`.
    stream_chunk_row_indices: Vec<usize>,
    // The index of the `op` column in `row_fields`.
    op_index: usize,
}

impl FieldsManager {
    pub fn new(catalog: &TableCatalog) -> Self {
        let mut row_fields = Vec::new();
        let mut row_output_col_indices = Vec::new();
        let mut row_pk_indices = Vec::new();
        let mut stream_chunk_row_indices = Vec::new();
        let mut output_idx = 0_usize;
        let pk_set: HashSet<usize> = catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index)
            .collect();

        for (index, v) in catalog.columns.iter().enumerate() {
            if pk_set.contains(&index) {
                row_pk_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                if !v.is_hidden {
                    row_output_col_indices.push(output_idx);
                }
                output_idx += 1;
            } else if !v.is_hidden {
                row_output_col_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                output_idx += 1;
            }
        }

        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
        row_output_col_indices.push(output_idx);
        let op_index = output_idx;
        output_idx += 1;
        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
        row_output_col_indices.push(output_idx);
        Self {
            columns_catalog: catalog.columns.clone(),
            row_fields,
            row_output_col_indices,
            row_pk_indices,
            stream_chunk_row_indices,
            op_index,
        }
    }

    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
        if self.columns_catalog.ne(&catalog.columns) {
            *self = Self::new(catalog);
            true
        } else {
            false
        }
    }

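    // Project fetched rows down to the user-visible columns, and also return the
    // pk projection of the last row so the caller can remember it as
    // `seek_pk_row` (used to resume a regenerated batch plan after that row).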
    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
        let last_row = rows.last_mut().map(|row| {
            let mut row = row.clone();
            row.project(&self.row_pk_indices)
        });
        let rows = rows
            .iter_mut()
            .map(|row| row.project(&self.row_output_col_indices))
            .collect();
        (rows, last_row)
    }

    pub fn get_output_fields(&self) -> Vec<Field> {
        self.row_output_col_indices
            .iter()
            .map(|&idx| self.row_fields[idx].clone())
            .collect()
    }

    // When the cursor is declared, `formats` is empty and acts as a placeholder;
    // on each fetch it is filled with the real formats returned from the pg client.
    pub fn get_row_stream_fields_and_formats(
        &self,
        formats: &Vec<Format>,
        from_snapshot: bool,
    ) -> (Vec<Field>, Vec<Format>) {
        let mut fields = Vec::new();
        let need_format = !(formats.is_empty() || formats.len() == 1);
        let mut new_formats = formats.clone();
        let stream_chunk_row_indices_iter = if from_snapshot {
            self.stream_chunk_row_indices.iter().chain(None)
        } else {
            self.stream_chunk_row_indices
                .iter()
                .chain(Some(&self.op_index))
        };
        for index in stream_chunk_row_indices_iter {
            fields.push(self.row_fields[*index].clone());
            if need_format && !self.row_output_col_indices.contains(index) {
                new_formats.insert(*index, Format::Text);
            }
        }
        (fields, new_formats)
    }
}

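/// A cursor declared on a subscription. Typical usage from a pg client
/// (illustrative, assuming a subscription `sub` already exists):
///
///     DECLARE cur SUBSCRIPTION CURSOR FOR sub;
///     FETCH 10 FROM cur;
///
/// When declared without a start timestamp, the first fetch reads from the
/// table snapshot; subsequent fetches replay changes from the subscription's
/// log store.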
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    dependent_table_id: TableId,
    cursor_need_drop_time: Instant,
    state: State,
    // The fields are captured from the table's catalog when the cursor is created,
    // and refreshed each time a new chunk_stream is created, so that catalog
    // changes caused by ALTER are picked up.
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    last_fetch: Instant,
    seek_pk_row: Option<Row>,
}

impl SubscriptionCursor {
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            // The query stream needs to be initiated on cursor creation to make sure
            // future fetches on the cursor start from the snapshot taken when the
            // cursor is declared.
            //
            // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?;
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} does not exist")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch Cursor can't start from a specified query epoch. Consider running `set query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch Cursor doesn't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }

    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    let from_snapshot = false;

                    // Initiate a new batch query to continue fetching.
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Transition to the Fetch state.
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Try to refill the remaining rows.
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        // 1. Fetch the next row.
                        if from_snapshot {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        // 2. Reached EOF for the current query.
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    // TODO: auto close invalid cursor?
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }

    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        if Instant::now() > self.cursor_need_drop_time {
            return Err(ErrorCode::InternalError(
                "The cursor has exceeded its maximum lifetime; please recreate it (close, then declare the cursor again).".to_owned(),
            )
            .into());
        }

        let session = &handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        if let State::Fetch {
            from_snapshot,
            chunk_stream,
            ..
        } = &mut self.state
        {
            let (fields, formats) = self
                .fields_manager
                .get_row_stream_fields_and_formats(formats, *from_snapshot);
            chunk_stream.init_row_stream(&fields, &formats, session.clone());
        }
        while cur < count {
            let fetch_cursor_timer = Instant::now();
            let row = self.next_row(&handler_args, formats).await?;
            self.cursor_metrics
                .subscription_cursor_fetch_duration
                .with_label_values(&[&self.subscription.name])
                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
            match row {
                Some(row) => {
                    cur += 1;
                    ans.push(row);
                }
                None => {
                    let timeout_seconds = timeout_seconds.unwrap_or(0);
                    if cur > 0 || timeout_seconds == 0 {
                        break;
                    }
                    // Block only when there is no data yet. The wait runs at most once:
                    // it either times out or wakes up and fetches the new rows via
                    // `next_row` in the next loop iteration.
                    match tokio::time::timeout(
                        Duration::from_secs(timeout_seconds),
                        session
                            .env
                            .hummock_snapshot_manager()
                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
                    )
                    .await
                    {
                        Ok(result) => result?,
                        Err(_) => {
                            tracing::debug!("Cursor wait next epoch timeout");
                            break;
                        }
                    }
                }
            }
            // Timeout: return with the rows fetched so far.
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        self.last_fetch = Instant::now();
        let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
        if let Some(seek_pk_row) = seek_pk_row {
            self.seek_pk_row = Some(seek_pk_row);
        }
        let desc = self
            .fields_manager
            .get_output_fields()
            .iter()
            .map(to_pg_field)
            .collect();

        Ok((rows, desc))
    }

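    // Resolve the next change-log epoch at or after `seek_timestamp`. Returns
    // `(epoch_to_read, next_epoch)`; the second element, if present, becomes the
    // `expected_timestamp` of the follow-up query.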
    fn get_next_rw_timestamp(
        seek_timestamp: u64,
        table_id: &TableId,
        expected_timestamp: Option<u64>,
        handler_args: HandlerArgs,
        dependent_subscription: &SubscriptionCatalog,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let session = handler_args.session;
        // Check that the subscription still exists.
        session.get_subscription_by_schema_id_name(
            dependent_subscription.schema_id,
            &dependent_subscription.name,
        )?;

        // The epochs must be pulled every time; otherwise there will be cache consistency issues.
        let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
        if let Some(expected_timestamp) = expected_timestamp
            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
        {
            return Err(ErrorCode::CatalogError(
                format!(
                    "No data found for rw_timestamp {:?}; the data may have been recycled, please recreate the cursor",
                    convert_logstore_u64_to_unix_millis(expected_timestamp)
                )
                .into(),
            )
            .into());
        }
        Ok((new_epochs.first().cloned(), new_epochs.get(1).cloned()))
    }

    pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
        match self.state {
            // Only used to return the generated plan, so the rw_timestamp is meaningless.
            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
                Some(0),
                &self.dependent_table_id,
                handler_args,
                self.seek_pk_row.clone(),
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                ..
            } => {
                if from_snapshot {
                    Self::init_batch_plan_for_subscription_cursor(
                        None,
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                } else {
                    Self::init_batch_plan_for_subscription_cursor(
                        Some(rw_timestamp),
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                }
            }
            State::Invalid => Err(ErrorCode::InternalError(
                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
            )
            .into()),
        }
    }

    fn init_batch_plan_for_subscription_cursor(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let context = OptimizerContext::from_handler_args(handler_args.clone());
        let version_id = {
            let version = session.env.hummock_snapshot_manager.acquire();
            let version = version.version();
            if !version
                .state_table_info
                .info()
                .contains_key(dependent_table_id)
            {
                return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
            }
            version.id
        };
        Self::create_batch_plan_for_cursor(
            table_catalog,
            &session,
            context.into(),
            rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
            version_id,
            seek_pk_row,
        )
    }

    async fn initiate_query(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
        let init_query_timer = Instant::now();
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let plan_result = Self::init_batch_plan_for_subscription_cursor(
            rw_timestamp,
            dependent_table_id,
            handler_args.clone(),
            seek_pk_row,
        )?;
        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
        let (chunk_stream, _) =
            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
        Ok((chunk_stream, init_query_timer, table_catalog))
    }

    async fn try_refill_remaining_rows(
        chunk_stream: &mut CursorDataChunkStream,
        remaining_rows: &mut VecDeque<Row>,
    ) -> Result<()> {
        if remaining_rows.is_empty()
            && let Some(row_set) = chunk_stream.next().await?
        {
            remaining_rows.extend(row_set?);
        }
        Ok(())
    }

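    // Row-layout sketch (illustrative): log-store rows already carry an `op`
    // column, so only the formatted `rw_timestamp` is appended; snapshot rows
    // carry neither, so `op = "Insert"` and a NULL `rw_timestamp` are appended.
    //
    //     snapshot:  [c1, .., cn]     -> [c1, .., cn, "Insert", NULL]
    //     log store: [c1, .., cn, op] -> [c1, .., cn, op, rw_ts_millis]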
    pub fn build_row(
        mut row: Vec<Option<Bytes>>,
        rw_timestamp: Option<u64>,
        formats: &Vec<Format>,
        session_data: &StaticSessionData,
    ) -> Result<Row> {
        let row_len = row.len();
        let new_row = if let Some(rw_timestamp) = rw_timestamp {
            let rw_timestamp_format = formats.get(row_len).unwrap_or(&Format::Text);
            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
            let rw_timestamp = pg_value_format(
                &DataType::Int64,
                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
                *rw_timestamp_format,
                session_data,
            )?;
            vec![Some(rw_timestamp)]
        } else {
            let op_format = formats.get(row_len).unwrap_or(&Format::Text);
            let op = pg_value_format(
                &DataType::Varchar,
                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
                *op_format,
                session_data,
            )?;
            vec![Some(op), None]
        };
        row.extend(new_row);
        Ok(Row::new(row))
    }

    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
        if from_snapshot {
            descs.push(Field::with_name(DataType::Varchar, "op"));
        }
        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
        descs
    }

    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        // Output columns: the pk plus all non-hidden columns.
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
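
        // Seek-predicate sketch (illustrative): given pk columns `(id1, id2)` and
        // a remembered last row `('10', 'a')`, build the row comparison
        //     ROW(id1, id2) > ROW('10', 'a')
        // and let `split_to_scan_ranges` turn it into at most one scan range, so
        // the regenerated plan resumes right after the last fetched row.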
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::unnamed(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(&table_catalog, max_split_range_gap)?;
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            let core = generic::TableScan::new(
                output_col_idx,
                table_catalog.clone(),
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        // The scan is ordered by pk, so no extra sort is needed.
        let order = Order::new(table_catalog.pk().to_vec());

        // We only need a `plan_root` to call the plan-generation methods; only
        // `out_fields` and `out_names` are actually used.
        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema().clone();
        let (batch_plan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(BatchQueryPlanResult {
            plan: batch_plan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: vec![],
            read_storage_tables: HashSet::from_iter([table_catalog.id]),
        })
    }

    pub fn idle_duration(&self) -> Duration {
        self.last_fetch.elapsed()
    }

    pub fn subscription_name(&self) -> &str {
        self.subscription.name.as_str()
    }

    pub fn state_info_string(&self) -> String {
        format!("{}", self.state)
    }
}

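/// Per-session registry of named cursors, keyed by cursor name. The map is
/// guarded by an async mutex since operations like `get_rows_with_cursor` keep
/// an entry borrowed across `.await` points.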
pub struct CursorManager {
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}

impl CursorManager {
    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
        Self {
            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
            cursor_metrics,
        }
    }

    pub async fn add_subscription_cursor(
        &self,
        cursor_name: String,
        start_timestamp: Option<u64>,
        dependent_table_id: TableId,
        subscription: Arc<SubscriptionCatalog>,
        handler_args: &HandlerArgs,
    ) -> Result<()> {
        let create_cursor_timer = Instant::now();
        let subscription_name = subscription.name.clone();
        let cursor = SubscriptionCursor::new(
            cursor_name,
            start_timestamp,
            subscription,
            dependent_table_id,
            handler_args,
            self.cursor_metrics.clone(),
        )
        .await?;
        let mut cursor_map = self.cursor_map.lock().await;
        self.cursor_metrics
            .subscription_cursor_declare_duration
            .with_label_values(&[&subscription_name])
            .observe(create_cursor_timer.elapsed().as_millis() as _);

        cursor_map.retain(|_, v| {
            if let Cursor::Subscription(cursor) = v
                && matches!(cursor.state, State::Invalid)
            {
                false
            } else {
                true
            }
        });

        cursor_map
            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;
        Ok(())
    }

    pub async fn add_query_cursor(
        &self,
        cursor_name: String,
        chunk_stream: CursorDataChunkStream,
        fields: Vec<Field>,
    ) -> Result<()> {
        let cursor = QueryCursor::new(chunk_stream, fields)?;
        self.cursor_map
            .lock()
            .await
            .try_insert(cursor_name, Cursor::Query(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;

        Ok(())
    }

    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
        self.cursor_map
            .lock()
            .await
            .remove(cursor_name)
            .ok_or_else(|| {
                ErrorCode::CatalogError(format!("cursor `{}` does not exist", cursor_name).into())
            })?;
        Ok(())
    }

    pub async fn remove_all_cursor(&self) {
        self.cursor_map.lock().await.clear();
    }

    pub async fn remove_all_query_cursor(&self) {
        self.cursor_map
            .lock()
            .await
            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
    }

    pub async fn get_rows_with_cursor(
        &self,
        cursor_name: &str,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            Ok(cursor.get_fields())
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
        let mut subsription_cursor_nums = 0;
        let mut invalid_subsription_cursor_nums = 0;
        let mut subscription_cursor_last_fetch_duration = HashMap::new();
        for (_, cursor) in self.cursor_map.lock().await.iter() {
            if let Cursor::Subscription(subscription_cursor) = cursor {
                subsription_cursor_nums += 1;
                if matches!(subscription_cursor.state, State::Invalid) {
                    invalid_subsription_cursor_nums += 1;
                } else {
                    let fetch_duration =
                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
                    subscription_cursor_last_fetch_duration.insert(
                        subscription_cursor.subscription.name.clone(),
                        fetch_duration,
                    );
                }
            }
        }
        PeriodicCursorMetrics {
            subsription_cursor_nums,
            invalid_subsription_cursor_nums,
            subscription_cursor_last_fetch_duration,
        }
    }

    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Query(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Subscription(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn gen_batch_plan_with_subscription_cursor(
        &self,
        cursor_name: &str,
        handler_args: HandlerArgs,
    ) -> Result<BatchQueryPlanResult> {
        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
        })? {
            Cursor::Subscription(cursor) => cursor.gen_batch_plan_result(handler_args.clone()),
            Cursor::Query(_) => Err(ErrorCode::InternalError(
                "The plan of a query cursor is the same as the query statement it was created with."
                    .to_owned(),
            )
            .into()),
        }
    }
}
1211}