risingwave_frontend/session/cursor_manager.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::mem;
16use core::time::Duration;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::fmt::{Display, Formatter};
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use futures::StreamExt;
25use itertools::Itertools;
26use pgwire::pg_field_descriptor::PgFieldDescriptor;
27use pgwire::pg_response::StatementType;
28use pgwire::types::{Format, Row};
29use risingwave_common::catalog::{ColumnCatalog, Field};
30use risingwave_common::error::BoxedError;
31use risingwave_common::session_config::QueryMode;
32use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
33use risingwave_common::util::iter_util::ZipEqFast;
34use risingwave_hummock_sdk::HummockVersionId;
35
36use super::SessionImpl;
37use crate::catalog::TableId;
38use crate::catalog::subscription_catalog::SubscriptionCatalog;
39use crate::error::{ErrorCode, Result};
40use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
41use crate::handler::HandlerArgs;
42use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
43use crate::handler::query::{RwBatchQueryPlanResult, gen_batch_plan_fragmenter};
44use crate::handler::util::{
45    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
46    pg_value_format, to_pg_field,
47};
48use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
49use crate::optimizer::PlanRoot;
50use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
51use crate::optimizer::property::{Order, RequiredDist};
52use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
53use crate::utils::Condition;
54use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};
55
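/// The data chunk stream backing a cursor. It starts as a raw local or
/// distributed batch-query stream and is converted into a pg row stream by
/// `init_row_stream` before rows can be fetched via `next`.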
56pub enum CursorDataChunkStream {
57    LocalDataChunk(Option<LocalQueryStream>),
58    DistributedDataChunk(Option<DistributedQueryStream>),
59    PgResponse(PgResponseStream),
60}
61
62impl CursorDataChunkStream {
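    /// Converts the underlying local/distributed data chunk stream into a
    /// `PgResponse` row stream encoded with the given fields and formats.
    /// Calling this on a stream that is already a `PgResponse` is a no-op.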
63    pub fn init_row_stream(
64        &mut self,
65        fields: &Vec<Field>,
66        formats: &Vec<Format>,
67        session: Arc<SessionImpl>,
68    ) {
69        let columns_type = fields.iter().map(|f| f.data_type()).collect();
70        match self {
71            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
72                let data_chunk = mem::take(data_chunk).unwrap();
73                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
74                    data_chunk,
75                    columns_type,
76                    formats.clone(),
77                    session,
78                ));
79                *self = CursorDataChunkStream::PgResponse(row_stream);
80            }
81            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
82                let data_chunk = mem::take(data_chunk).unwrap();
83                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
84                    data_chunk,
85                    columns_type,
86                    formats.clone(),
87                    session,
88                ));
89                *self = CursorDataChunkStream::PgResponse(row_stream);
90            }
91            _ => {}
92        }
93    }
94
95    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
96        match self {
97            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
98            _ => Err(ErrorCode::InternalError(
99                "`next` can only be called after `init_row_stream` has converted this into a `PgResponse` row stream".to_owned(),
100            )
101            .into()),
102        }
103    }
104}
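/// A cursor registered in the session: either a subscription cursor that
/// incrementally reads a subscription's log store, or a plain query cursor
/// over a one-shot batch query.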
105pub enum Cursor {
106    Subscription(SubscriptionCursor),
107    Query(QueryCursor),
108}
109impl Cursor {
110    pub async fn next(
111        &mut self,
112        count: u32,
113        handler_args: HandlerArgs,
114        formats: &Vec<Format>,
115        timeout_seconds: Option<u64>,
116    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
117        match self {
118            Cursor::Subscription(cursor) => cursor
119                .next(count, handler_args, formats, timeout_seconds)
120                .await
121                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
122            Cursor::Query(cursor) => {
123                cursor
124                    .next(count, formats, handler_args, timeout_seconds)
125                    .await
126            }
127        }
128    }
129
130    pub fn get_fields(&mut self) -> Vec<Field> {
131        match self {
132            Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
133            Cursor::Query(cursor) => cursor.fields.clone(),
134        }
135    }
136}
137
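/// A cursor over a one-shot batch query. Rows are pulled lazily from the
/// underlying chunk stream and buffered in `remaining_rows`.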
138pub struct QueryCursor {
139    chunk_stream: CursorDataChunkStream,
140    fields: Vec<Field>,
141    remaining_rows: VecDeque<Row>,
142}
143
144impl QueryCursor {
145    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
146        Ok(Self {
147            chunk_stream,
148            fields,
149            remaining_rows: VecDeque::<Row>::new(),
150        })
151    }
152
153    pub async fn next_once(&mut self) -> Result<Option<Row>> {
154        while self.remaining_rows.is_empty() {
155            let rows = self.chunk_stream.next().await?;
156            let rows = match rows {
157                None => return Ok(None),
158                Some(row) => row?,
159            };
160            self.remaining_rows = rows.into_iter().collect();
161        }
162        let row = self.remaining_rows.pop_front().unwrap();
163        Ok(Some(row))
164    }
165
166    pub async fn next(
167        &mut self,
168        count: u32,
169        formats: &Vec<Format>,
170        handler_args: HandlerArgs,
171        timeout_seconds: Option<u64>,
172    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
173        // `FETCH NEXT` is equivalent to `FETCH 1`.
174        // Cap the initial allocation at 100 rows to avoid allocating too much memory at once.
175        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
176        let session = handler_args.session;
177        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
178        let mut cur = 0;
179        let desc = self.fields.iter().map(to_pg_field).collect();
180        self.chunk_stream
181            .init_row_stream(&self.fields, formats, session);
182        while cur < count
183            && let Some(row) = self.next_once().await?
184        {
185            cur += 1;
186            ans.push(row);
187            if let Some(timeout_instant) = timeout_instant
188                && Instant::now() > timeout_instant
189            {
190                break;
191            }
192        }
193        Ok((ans, desc))
194    }
195}
196
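/// State machine of a `SubscriptionCursor`:
/// `InitLogStoreQuery` issues a new batch query against the log store and, on
/// success, transitions to `Fetch`; once a `Fetch` query is exhausted, the
/// cursor transitions back to `InitLogStoreQuery` with the next timestamp to
/// seek. Errors move the cursor to `Invalid`.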
197enum State {
198    InitLogStoreQuery {
199        // The rw_timestamp used to initiate the query to read from subscription logstore.
200        seek_timestamp: u64,
201
202        // If specified, the expected_timestamp must be an exact match for the next rw_timestamp.
203        expected_timestamp: Option<u64>,
204    },
205    Fetch {
206        // Whether the query is reading from snapshot
207        // true: read from the upstream table snapshot
208        // false: read from subscription logstore
209        from_snapshot: bool,
210
211        // The rw_timestamp used to initiate the query to read from subscription logstore.
212        rw_timestamp: u64,
213
214        // The row stream from the batch query.
215        // It is returned from the batch execution.
216        chunk_stream: CursorDataChunkStream,
217
218        // A cache to store the remaining rows from the row stream.
219        remaining_rows: VecDeque<Row>,
220
221        expected_timestamp: Option<u64>,
222
223        init_query_timer: Instant,
224    },
225    Invalid,
226}
227
228impl Display for State {
229    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
230        match self {
231            State::InitLogStoreQuery {
232                seek_timestamp,
233                expected_timestamp,
234            } => write!(
235                f,
236                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
237                seek_timestamp, expected_timestamp
238            ),
239            State::Fetch {
240                from_snapshot,
241                rw_timestamp,
242                expected_timestamp,
243                remaining_rows,
244                init_query_timer,
245                ..
246            } => write!(
247                f,
248                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
249                from_snapshot,
250                rw_timestamp,
251                expected_timestamp,
252                remaining_rows.len(),
253                init_query_timer.elapsed().as_millis()
254            ),
255            State::Invalid => write!(f, "Invalid"),
256        }
257    }
258}
259
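/// Tracks the column layout of the dependent table and maps between the raw
/// rows produced by the batch query (pk + non-hidden columns + op + rw_timestamp)
/// and the rows and field descriptors exposed to the client.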
260struct FieldsManager {
261    columns_catalog: Vec<ColumnCatalog>,
262    // All row fields: hidden pk columns, op, rw_timestamp, and all non-hidden columns of the upstream table.
263    row_fields: Vec<Field>,
264    // Row output column indices based on `row_fields`.
265    row_output_col_indices: Vec<usize>,
266    // Row pk indices based on `row_fields`.
267    row_pk_indices: Vec<usize>,
268    // Stream chunk row indices based on `row_fields`.
269    stream_chunk_row_indices: Vec<usize>,
270    // The op index based on `row_fields`.
271    op_index: usize,
272}
273
274impl FieldsManager {
275    // pub const OP_FIELD: Field = Field::with_name(DataType::Varchar, "op".to_owned());
276    // pub const RW_TIMESTAMP_FIELD: Field = Field::with_name(DataType::Int64, "rw_timestamp".to_owned());
277
278    pub fn new(catalog: &TableCatalog) -> Self {
279        let mut row_fields = Vec::new();
280        let mut row_output_col_indices = Vec::new();
281        let mut row_pk_indices = Vec::new();
282        let mut stream_chunk_row_indices = Vec::new();
283        let mut output_idx = 0_usize;
284        let pk_set: HashSet<usize> = catalog
285            .pk
286            .iter()
287            .map(|col_order| col_order.column_index)
288            .collect();
289
290        for (index, v) in catalog.columns.iter().enumerate() {
291            if pk_set.contains(&index) {
292                row_pk_indices.push(output_idx);
293                stream_chunk_row_indices.push(output_idx);
294                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
295                if !v.is_hidden {
296                    row_output_col_indices.push(output_idx);
297                }
298                output_idx += 1;
299            } else if !v.is_hidden {
300                row_output_col_indices.push(output_idx);
301                stream_chunk_row_indices.push(output_idx);
302                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
303                output_idx += 1;
304            }
305        }
306
307        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
308        row_output_col_indices.push(output_idx);
309        let op_index = output_idx;
310        output_idx += 1;
311        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
312        row_output_col_indices.push(output_idx);
313        Self {
314            columns_catalog: catalog.columns.clone(),
315            row_fields,
316            row_output_col_indices,
317            row_pk_indices,
318            stream_chunk_row_indices,
319            op_index,
320        }
321    }
322
323    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
324        if self.columns_catalog.ne(&catalog.columns) {
325            *self = Self::new(catalog);
326            true
327        } else {
328            false
329        }
330    }
331
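    /// Projects the fetched rows onto the client-visible columns and also
    /// returns the pk projection of the last row, so the next query can seek
    /// past it.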
332    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
333        let last_row = rows.last_mut().map(|row| {
334            let mut row = row.clone();
335            row.project(&self.row_pk_indices)
336        });
337        let rows = rows
338            .iter_mut()
339            .map(|row| row.project(&self.row_output_col_indices))
340            .collect();
341        (rows, last_row)
342    }
343
344    pub fn get_output_fields(&self) -> Vec<Field> {
345        self.row_output_col_indices
346            .iter()
347            .map(|&idx| self.row_fields[idx].clone())
348            .collect()
349    }
350
351    // When the cursor is declared, `formats` is empty and only acts as a placeholder;
352    // on fetch, it is filled with the real formats provided by the pg client.
353    pub fn get_row_stream_fields_and_formats(
354        &self,
355        formats: &Vec<Format>,
356        from_snapshot: bool,
357    ) -> (Vec<Field>, Vec<Format>) {
358        let mut fields = Vec::new();
359        let need_format = !(formats.is_empty() || formats.len() == 1);
360        let mut new_formats = formats.clone();
361        let stream_chunk_row_indices_iter = if from_snapshot {
362            self.stream_chunk_row_indices.iter().chain(None)
363        } else {
364            self.stream_chunk_row_indices
365                .iter()
366                .chain(Some(&self.op_index))
367        };
368        for index in stream_chunk_row_indices_iter {
369            fields.push(self.row_fields[*index].clone());
370            if need_format && !self.row_output_col_indices.contains(index) {
371                new_formats.insert(*index, Format::Text);
372            }
373        }
374        (fields, new_formats)
375    }
376}
377
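/// A cursor declared over a subscription. Unless a start timestamp is given,
/// it first reads a snapshot of the upstream table and then incrementally
/// reads the subscription's log store. Illustrative SQL (see the
/// `DECLARE ... SUBSCRIPTION CURSOR` and `FETCH` handlers for the exact syntax):
///   DECLARE cur SUBSCRIPTION CURSOR FOR my_subscription;
///   FETCH NEXT FROM cur;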
378pub struct SubscriptionCursor {
379    cursor_name: String,
380    subscription: Arc<SubscriptionCatalog>,
381    dependent_table_id: TableId,
382    cursor_need_drop_time: Instant,
383    state: State,
384    // The fields are initialized from the table's catalog when the cursor is created,
385    // and refreshed each time a chunk_stream is created, so that catalog changes caused by ALTER are picked up.
386    fields_manager: FieldsManager,
387    cursor_metrics: Arc<CursorMetrics>,
388    last_fetch: Instant,
389    seek_pk_row: Option<Row>,
390}
391
392impl SubscriptionCursor {
393    pub async fn new(
394        cursor_name: String,
395        start_timestamp: Option<u64>,
396        subscription: Arc<SubscriptionCatalog>,
397        dependent_table_id: TableId,
398        handler_args: &HandlerArgs,
399        cursor_metrics: Arc<CursorMetrics>,
400    ) -> Result<Self> {
401        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
402            let table_catalog = handler_args.session.get_table_by_id(dependent_table_id)?;
403            (
404                State::InitLogStoreQuery {
405                    seek_timestamp: start_timestamp,
406                    expected_timestamp: None,
407                },
408                FieldsManager::new(&table_catalog),
409            )
410        } else {
411            // The query stream needs to be initiated on cursor creation to make sure
412            // future fetches on the cursor start from the snapshot taken when the cursor is declared.
413            //
414            // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
415            let (chunk_stream, init_query_timer, table_catalog) =
416                Self::initiate_query(None, dependent_table_id, handler_args.clone(), None).await?;
417            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
418                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
419            )? {
420                ReadSnapshot::FrontendPinned { snapshot, .. } => {
421                    snapshot
422                        .version()
423                        .state_table_info
424                        .info()
425                        .get(&dependent_table_id)
426                        .ok_or_else(|| {
427                            anyhow!("dependent_table_id {dependent_table_id} not exists")
428                        })?
429                        .committed_epoch
430                }
431                ReadSnapshot::Other(_) => {
432                    return Err(ErrorCode::InternalError("Fetch cursor can't start from a specified query epoch. Consider running `SET query_epoch = 0;`".to_owned()).into());
433                }
434                ReadSnapshot::ReadUncommitted => {
435                    return Err(ErrorCode::InternalError(
436                        "Fetch cursor doesn't support reading uncommitted data".to_owned(),
437                    )
438                    .into());
439                }
440            };
441            let start_timestamp = pinned_epoch;
442
443            (
444                State::Fetch {
445                    from_snapshot: true,
446                    rw_timestamp: start_timestamp,
447                    chunk_stream,
448                    remaining_rows: VecDeque::new(),
449                    expected_timestamp: None,
450                    init_query_timer,
451                },
452                FieldsManager::new(&table_catalog),
453            )
454        };
455
456        let cursor_need_drop_time =
457            Instant::now() + Duration::from_secs(subscription.retention_seconds);
458        Ok(Self {
459            cursor_name,
460            subscription,
461            dependent_table_id,
462            cursor_need_drop_time,
463            state,
464            fields_manager,
465            cursor_metrics,
466            last_fetch: Instant::now(),
467            seek_pk_row: None,
468        })
469    }
470
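    /// Fetches the next row, driving the state machine: it (re)issues log store
    /// queries as needed and returns `Ok(None)` when no more data is currently
    /// available or when the table schema changed and the fields were refilled.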
471    async fn next_row(
472        &mut self,
473        handler_args: &HandlerArgs,
474        formats: &Vec<Format>,
475    ) -> Result<Option<Row>> {
476        loop {
477            match &mut self.state {
478                State::InitLogStoreQuery {
479                    seek_timestamp,
480                    expected_timestamp,
481                } => {
482                    let from_snapshot = false;
483
484                    // Initiate a new batch query to continue fetching
485                    match Self::get_next_rw_timestamp(
486                        *seek_timestamp,
487                        self.dependent_table_id,
488                        *expected_timestamp,
489                        handler_args.clone(),
490                        &self.subscription,
491                    )
492                    .await
493                    {
494                        Ok((Some(rw_timestamp), expected_timestamp)) => {
495                            let (mut chunk_stream, init_query_timer, catalog) =
496                                Self::initiate_query(
497                                    Some(rw_timestamp),
498                                    self.dependent_table_id,
499                                    handler_args.clone(),
500                                    None,
501                                )
502                                .await?;
503                            let table_schema_changed =
504                                self.fields_manager.try_refill_fields(&catalog);
505                            let (fields, formats) = self
506                                .fields_manager
507                                .get_row_stream_fields_and_formats(formats, from_snapshot);
508                            chunk_stream.init_row_stream(
509                                &fields,
510                                &formats,
511                                handler_args.session.clone(),
512                            );
513
514                            self.cursor_need_drop_time = Instant::now()
515                                + Duration::from_secs(self.subscription.retention_seconds);
516                            let mut remaining_rows = VecDeque::new();
517                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
518                                .await?;
519                            // Transition to the Fetch state
520                            self.state = State::Fetch {
521                                from_snapshot,
522                                rw_timestamp,
523                                chunk_stream,
524                                remaining_rows,
525                                expected_timestamp,
526                                init_query_timer,
527                            };
528                            if table_schema_changed {
529                                return Ok(None);
530                            }
531                        }
532                        Ok((None, _)) => return Ok(None),
533                        Err(e) => {
534                            self.state = State::Invalid;
535                            return Err(e);
536                        }
537                    }
538                }
539                State::Fetch {
540                    from_snapshot,
541                    rw_timestamp,
542                    chunk_stream,
543                    remaining_rows,
544                    expected_timestamp,
545                    init_query_timer,
546                } => {
547                    let session_data = StaticSessionData {
548                        timezone: handler_args.session.config().timezone(),
549                    };
550                    let from_snapshot = *from_snapshot;
551                    let rw_timestamp = *rw_timestamp;
552
553                    // Try refill remaining rows
554                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;
555
556                    if let Some(row) = remaining_rows.pop_front() {
557                        // 1. Fetch the next row
558                        if from_snapshot {
559                            return Ok(Some(Self::build_row(
560                                row.take(),
561                                None,
562                                formats,
563                                &session_data,
564                            )?));
565                        } else {
566                            return Ok(Some(Self::build_row(
567                                row.take(),
568                                Some(rw_timestamp),
569                                formats,
570                                &session_data,
571                            )?));
572                        }
573                    } else {
574                        self.cursor_metrics
575                            .subscription_cursor_query_duration
576                            .with_label_values(&[&self.subscription.name])
577                            .observe(init_query_timer.elapsed().as_millis() as _);
578                        // 2. Reach EOF for the current query.
579                        if let Some(expected_timestamp) = expected_timestamp {
580                            self.state = State::InitLogStoreQuery {
581                                seek_timestamp: *expected_timestamp,
582                                expected_timestamp: Some(*expected_timestamp),
583                            };
584                        } else {
585                            self.state = State::InitLogStoreQuery {
586                                seek_timestamp: rw_timestamp + 1,
587                                expected_timestamp: None,
588                            };
589                        }
590                    }
591                }
592                State::Invalid => {
593                    // TODO: auto close invalid cursor?
594                    return Err(ErrorCode::InternalError(
595                        "Cursor is in invalid state. Please close and re-create the cursor."
596                            .to_owned(),
597                    )
598                    .into());
599                }
600            }
601        }
602    }
603
604    pub async fn next(
605        &mut self,
606        count: u32,
607        handler_args: HandlerArgs,
608        formats: &Vec<Format>,
609        timeout_seconds: Option<u64>,
610    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
611        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
612        if Instant::now() > self.cursor_need_drop_time {
613            return Err(ErrorCode::InternalError(
614                "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
615            )
616            .into());
617        }
618
619        let session = &handler_args.session;
620        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
621        let mut cur = 0;
622        if let State::Fetch {
623            from_snapshot,
624            chunk_stream,
625            ..
626        } = &mut self.state
627        {
628            let (fields, formats) = self
629                .fields_manager
630                .get_row_stream_fields_and_formats(formats, *from_snapshot);
631            chunk_stream.init_row_stream(&fields, &formats, session.clone());
632        }
633        while cur < count {
634            let fetch_cursor_timer = Instant::now();
635            let row = self.next_row(&handler_args, formats).await?;
636            self.cursor_metrics
637                .subscription_cursor_fetch_duration
638                .with_label_values(&[&self.subscription.name])
639                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
640            match row {
641                Some(row) => {
642                    cur += 1;
643                    ans.push(row);
644                }
645                None => {
646                    let timeout_seconds = timeout_seconds.unwrap_or(0);
647                    if cur > 0 || timeout_seconds == 0 {
648                        break;
649                    }
650                    let State::InitLogStoreQuery { seek_timestamp, .. } = &self.state else {
651                        // Triggered when previous next_row returns None while self.state is State::Fetch.
652                        continue;
653                    };
654                    // We only block here when there is no data yet.
655                    // This wait runs at most once: either it times out, or the next loop iteration picks up the new data via `next_row`.
656                    match tokio::time::timeout(
657                        Duration::from_secs(timeout_seconds),
658                        session
659                            .env
660                            .hummock_snapshot_manager()
661                            .wait_table_change_log_notification(
662                                self.dependent_table_id,
663                                *seek_timestamp,
664                            ),
665                    )
666                    .await
667                    {
668                        Ok(result) => result?,
669                        Err(_) => {
670                            tracing::debug!("Cursor wait next epoch timeout");
671                            break;
672                        }
673                    }
674                }
675            }
676            // Timed out: return whatever has been fetched so far.
677            if let Some(timeout_instant) = timeout_instant
678                && Instant::now() > timeout_instant
679            {
680                break;
681            }
682        }
683        self.last_fetch = Instant::now();
684        let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
685        if let Some(seek_pk_row) = seek_pk_row {
686            self.seek_pk_row = Some(seek_pk_row);
687        }
688        let desc = self
689            .fields_manager
690            .get_output_fields()
691            .iter()
692            .map(to_pg_field)
693            .collect();
694
695        Ok((rows, desc))
696    }
697
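    /// Looks up the next change-log epochs at or after `seek_timestamp`.
    /// Returns `(next_rw_timestamp, following_rw_timestamp)`; the second value,
    /// if any, becomes the expected timestamp of the query after this one.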
698    async fn get_next_rw_timestamp(
699        seek_timestamp: u64,
700        table_id: TableId,
701        expected_timestamp: Option<u64>,
702        handler_args: HandlerArgs,
703        dependent_subscription: &SubscriptionCatalog,
704    ) -> Result<(Option<u64>, Option<u64>)> {
705        let session = handler_args.session;
706        // Test subscription existence
707        session.get_subscription_by_schema_id_name(
708            dependent_subscription.schema_id,
709            &dependent_subscription.name,
710        )?;
711
712        // The epochs must be pulled fresh every time; otherwise there can be cache consistency issues.
713        let new_epochs = session
714            .list_change_log_epochs(table_id, seek_timestamp, 2)
715            .await?;
716        if let Some(expected_timestamp) = expected_timestamp
717            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
718        {
719            return Err(ErrorCode::CatalogError(
720                format!(
721                    "no data found for rw_timestamp {:?}; the data may have been recycled, please recreate the cursor",
722                    convert_logstore_u64_to_unix_millis(expected_timestamp)
723                )
724                .into(),
725            )
726            .into());
727        }
728        Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned()))
729    }
730
731    pub fn gen_batch_plan_result(
732        &self,
733        handler_args: HandlerArgs,
734    ) -> Result<RwBatchQueryPlanResult> {
735        match self.state {
736            // Only used to return a generated plan, so the rw_timestamp is meaningless.
737            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
738                Some(0),
739                self.dependent_table_id,
740                handler_args,
741                self.seek_pk_row.clone(),
742            ),
743            State::Fetch {
744                from_snapshot,
745                rw_timestamp,
746                ..
747            } => {
748                if from_snapshot {
749                    Self::init_batch_plan_for_subscription_cursor(
750                        None,
751                        self.dependent_table_id,
752                        handler_args,
753                        self.seek_pk_row.clone(),
754                    )
755                } else {
756                    Self::init_batch_plan_for_subscription_cursor(
757                        Some(rw_timestamp),
758                        self.dependent_table_id,
759                        handler_args,
760                        self.seek_pk_row.clone(),
761                    )
762                }
763            }
764            State::Invalid => Err(ErrorCode::InternalError(
765                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
766            )
767            .into()),
768        }
769    }
770
771    fn init_batch_plan_for_subscription_cursor(
772        rw_timestamp: Option<u64>,
773        dependent_table_id: TableId,
774        handler_args: HandlerArgs,
775        seek_pk_row: Option<Row>,
776    ) -> Result<RwBatchQueryPlanResult> {
777        let session = handler_args.clone().session;
778        let table_catalog = session.get_table_by_id(dependent_table_id)?;
779        let context = OptimizerContext::from_handler_args(handler_args);
780        let version_id = {
781            let version = session.env.hummock_snapshot_manager.acquire();
782            let version = version.version();
783            if !version
784                .state_table_info
785                .info()
786                .contains_key(&dependent_table_id)
787            {
788                return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
789            }
790            version.id
791        };
792        Self::create_batch_plan_for_cursor(
793            table_catalog,
794            &session,
795            context.into(),
796            rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
797            version_id,
798            seek_pk_row,
799        )
800    }
801
802    async fn initiate_query(
803        rw_timestamp: Option<u64>,
804        dependent_table_id: TableId,
805        handler_args: HandlerArgs,
806        seek_pk_row: Option<Row>,
807    ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
808        let init_query_timer = Instant::now();
809        let session = handler_args.clone().session;
810        let table_catalog = session.get_table_by_id(dependent_table_id)?;
811        let plan_result = Self::init_batch_plan_for_subscription_cursor(
812            rw_timestamp,
813            dependent_table_id,
814            handler_args.clone(),
815            seek_pk_row,
816        )?;
817        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
818        let (chunk_stream, _) =
819            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
820        Ok((chunk_stream, init_query_timer, table_catalog))
821    }
822
823    async fn try_refill_remaining_rows(
824        chunk_stream: &mut CursorDataChunkStream,
825        remaining_rows: &mut VecDeque<Row>,
826    ) -> Result<()> {
827        if remaining_rows.is_empty()
828            && let Some(row_set) = chunk_stream.next().await?
829        {
830            remaining_rows.extend(row_set?);
831        }
832        Ok(())
833    }
834
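    /// Appends the trailing cursor columns to a raw output row: for log store
    /// rows, the encoded `rw_timestamp`; for snapshot rows, a synthetic
    /// `op = "Insert"` followed by a NULL `rw_timestamp`.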
835    pub fn build_row(
836        mut row: Vec<Option<Bytes>>,
837        rw_timestamp: Option<u64>,
838        formats: &Vec<Format>,
839        session_data: &StaticSessionData,
840    ) -> Result<Row> {
841        let row_len = row.len();
842        let new_row = if let Some(rw_timestamp) = rw_timestamp {
843            let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
844            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
845            let rw_timestamp = pg_value_format(
846                &DataType::Int64,
847                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
848                *rw_timestamp_formats,
849                session_data,
850            )?;
851            vec![Some(rw_timestamp)]
852        } else {
853            let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
854            let op = pg_value_format(
855                &DataType::Varchar,
856                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
857                *op_formats,
858                session_data,
859            )?;
860            vec![Some(op), None]
861        };
862        row.extend(new_row);
863        Ok(Row::new(row))
864    }
865
866    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
867        if from_snapshot {
868            descs.push(Field::with_name(DataType::Varchar, "op"));
869        }
870        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
871        descs
872    }
873
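    /// Builds the batch plan backing a cursor fetch. With an `epoch_range` it
    /// plans a `BatchLogSeqScan` over the table's change log; otherwise it plans
    /// a plain `BatchSeqScan` snapshot read. If `seek_pk_rows` is given, a
    /// predicate of the form `ROW(pk1, pk2, ...) > ROW(v1, v2, ...)` is pushed
    /// down so that the scan resumes right after the last row returned by the
    /// previous fetch.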
874    pub fn create_batch_plan_for_cursor(
875        table_catalog: Arc<TableCatalog>,
876        session: &SessionImpl,
877        context: OptimizerContextRef,
878        epoch_range: Option<(u64, u64)>,
879        version_id: HummockVersionId,
880        seek_pk_rows: Option<Row>,
881    ) -> Result<RwBatchQueryPlanResult> {
882        // pk columns + all non-hidden columns
883        let output_col_idx = table_catalog
884            .columns
885            .iter()
886            .enumerate()
887            .filter_map(|(index, v)| {
888                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
889                    Some(index)
890                } else {
891                    None
892                }
893            })
894            .collect::<Vec<_>>();
895        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
896        let pks = table_catalog.pk();
897        let pks = pks
898            .iter()
899            .map(|f| {
900                let pk = table_catalog.columns.get(f.column_index).unwrap();
901                (pk.data_type(), f.column_index)
902            })
903            .collect_vec();
904        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
905            let mut pk_rows = vec![];
906            let mut values = vec![];
907            for (seek_pk, (data_type, column_index)) in
908                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
909            {
910                if let Some(seek_pk) = seek_pk {
911                    pk_rows.push(InputRef {
912                        index: column_index,
913                        data_type: data_type.clone(),
914                    });
915                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
916                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
917                    values.push((Some(value_data), data_type.clone()));
918                }
919            }
920            if pk_rows.is_empty() {
921                (None, None)
922            } else {
923                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
924                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
925                let right_type = DataType::Struct(StructType::row_expr_type(right_types));
926                let left = FunctionCall::new_unchecked(
927                    ExprType::Row,
928                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
929                    right_type.clone(),
930                );
931                let right = Literal::new(Some(right_data), right_type);
932                let (scan, predicate) = Condition {
933                    conjunctions: vec![
934                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
935                            .into(),
936                    ],
937                }
938                .split_to_scan_ranges(&table_catalog, max_split_range_gap)?;
939                if scan.len() > 1 {
940                    return Err(ErrorCode::InternalError(
941                        "Seek pk row should only generate one scan range".to_owned(),
942                    )
943                    .into());
944                }
945                (scan.first().cloned(), Some(predicate))
946            }
947        } else {
948            (None, None)
949        };
950
951        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
952            let core = generic::LogScan::new(
953                table_catalog.name.clone(),
954                output_col_idx,
955                table_catalog.clone(),
956                context,
957                epoch_range,
958                version_id,
959            );
960            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
961            let out_fields = batch_log_seq_scan.core().out_fields();
962            let out_names = batch_log_seq_scan.core().column_names();
963            (batch_log_seq_scan.into(), out_fields, out_names)
964        } else {
965            let core = generic::TableScan::new(
966                output_col_idx,
967                table_catalog.clone(),
968                vec![],
969                vec![],
970                context,
971                Condition {
972                    conjunctions: vec![],
973                },
974                None,
975            );
976            let scans = match scan {
977                Some(scan) => vec![scan],
978                None => vec![],
979            };
980            let table_scan = BatchSeqScan::new(core, scans, None);
981            let out_fields = table_scan.core().out_fields();
982            let out_names = table_scan.core().column_names();
983            (table_scan.into(), out_fields, out_names)
984        };
985
986        let plan = if let Some(predicate) = predicate
987            && !predicate.always_true()
988        {
989            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
990        } else {
991            seq_scan
992        };
993
994        // The scan is already ordered by pk, so no extra sort is needed.
995        let order = Order::new(table_catalog.pk().to_vec());
996
997        // We only need a plan_root to call the plan-generation methods; only out_fields and out_names are used.
998        let plan_root = PlanRoot::new_with_batch_plan(
999            plan,
1000            RequiredDist::single(),
1001            order,
1002            out_fields,
1003            out_names,
1004        );
1005        let schema = plan_root.schema();
1006        let (batch_log_seq_scan, query_mode) = match session.config().query_mode() {
1007            QueryMode::Auto | QueryMode::Local => {
1008                (plan_root.gen_batch_local_plan()?, QueryMode::Local)
1009            }
1010            QueryMode::Distributed => (
1011                plan_root.gen_batch_distributed_plan()?,
1012                QueryMode::Distributed,
1013            ),
1014        };
1015        Ok(RwBatchQueryPlanResult {
1016            plan: batch_log_seq_scan,
1017            query_mode,
1018            schema,
1019            stmt_type: StatementType::SELECT,
1020            dependent_relations: vec![],
1021        })
1022    }
1023
1024    pub fn idle_duration(&self) -> Duration {
1025        self.last_fetch.elapsed()
1026    }
1027
1028    pub fn subscription_name(&self) -> &str {
1029        self.subscription.name.as_str()
1030    }
1031
1032    pub fn state_info_string(&self) -> String {
1033        format!("{}", self.state)
1034    }
1035}
1036
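/// Per-session registry of cursors, keyed by cursor name. The map is guarded by
/// an async mutex so concurrent statements on the same session serialize their
/// cursor access.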
1037pub struct CursorManager {
1038    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
1039    cursor_metrics: Arc<CursorMetrics>,
1040}
1041
1042impl CursorManager {
1043    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
1044        Self {
1045            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
1046            cursor_metrics,
1047        }
1048    }
1049
1050    pub async fn add_subscription_cursor(
1051        &self,
1052        cursor_name: String,
1053        start_timestamp: Option<u64>,
1054        dependent_table_id: TableId,
1055        subscription: Arc<SubscriptionCatalog>,
1056        handler_args: &HandlerArgs,
1057    ) -> Result<()> {
1058        let create_cursor_timer = Instant::now();
1059        let subscription_name = subscription.name.clone();
1060        let cursor = SubscriptionCursor::new(
1061            cursor_name,
1062            start_timestamp,
1063            subscription,
1064            dependent_table_id,
1065            handler_args,
1066            self.cursor_metrics.clone(),
1067        )
1068        .await?;
1069        let mut cursor_map = self.cursor_map.lock().await;
1070        self.cursor_metrics
1071            .subscription_cursor_declare_duration
1072            .with_label_values(&[&subscription_name])
1073            .observe(create_cursor_timer.elapsed().as_millis() as _);
1074
1075        cursor_map.retain(|_, v| {
1076            if let Cursor::Subscription(cursor) = v
1077                && matches!(cursor.state, State::Invalid)
1078            {
1079                false
1080            } else {
1081                true
1082            }
1083        });
1084
1085        cursor_map
1086            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
1087            .map_err(|error| {
1088                ErrorCode::CatalogError(
1089                    format!("cursor `{}` already exists", error.entry.key()).into(),
1090                )
1091            })?;
1092        Ok(())
1093    }
1094
1095    pub async fn add_query_cursor(
1096        &self,
1097        cursor_name: String,
1098        chunk_stream: CursorDataChunkStream,
1099        fields: Vec<Field>,
1100    ) -> Result<()> {
1101        let cursor = QueryCursor::new(chunk_stream, fields)?;
1102        self.cursor_map
1103            .lock()
1104            .await
1105            .try_insert(cursor_name, Cursor::Query(cursor))
1106            .map_err(|error| {
1107                ErrorCode::CatalogError(
1108                    format!("cursor `{}` already exists", error.entry.key()).into(),
1109                )
1110            })?;
1111
1112        Ok(())
1113    }
1114
1115    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
1116        self.cursor_map
1117            .lock()
1118            .await
1119            .remove(cursor_name)
1120            .ok_or_else(|| {
1121                ErrorCode::CatalogError(format!("cursor `{}` doesn't exist", cursor_name).into())
1122            })?;
1123        Ok(())
1124    }
1125
1126    pub async fn remove_all_cursor(&self) {
1127        self.cursor_map.lock().await.clear();
1128    }
1129
1130    pub async fn remove_all_query_cursor(&self) {
1131        self.cursor_map
1132            .lock()
1133            .await
1134            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
1135    }
1136
1137    pub async fn get_rows_with_cursor(
1138        &self,
1139        cursor_name: &str,
1140        count: u32,
1141        handler_args: HandlerArgs,
1142        formats: &Vec<Format>,
1143        timeout_seconds: Option<u64>,
1144    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
1145        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1146            cursor
1147                .next(count, handler_args, formats, timeout_seconds)
1148                .await
1149        } else {
1150            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1151        }
1152    }
1153
1154    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
1155        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1156            Ok(cursor.get_fields())
1157        } else {
1158            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1159        }
1160    }
1161
1162    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
1163        let mut subscription_cursor_nums = 0;
1164        let mut invalid_subscription_cursor_nums = 0;
1165        let mut subscription_cursor_last_fetch_duration = HashMap::new();
1166        for (_, cursor) in self.cursor_map.lock().await.iter() {
1167            if let Cursor::Subscription(subscription_cursor) = cursor {
1168                subscription_cursor_nums += 1;
1169                if matches!(subscription_cursor.state, State::Invalid) {
1170                    invalid_subscription_cursor_nums += 1;
1171                } else {
1172                    let fetch_duration =
1173                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
1174                    subscription_cursor_last_fetch_duration.insert(
1175                        subscription_cursor.subscription.name.clone(),
1176                        fetch_duration,
1177                    );
1178                }
1179            }
1180        }
1181        PeriodicCursorMetrics {
1182            subscription_cursor_nums,
1183            invalid_subscription_cursor_nums,
1184            subscription_cursor_last_fetch_duration,
1185        }
1186    }
1187
1188    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
1189        self.cursor_map
1190            .lock()
1191            .await
1192            .iter()
1193            .for_each(|(cursor_name, cursor)| {
1194                if let Cursor::Query(cursor) = cursor {
1195                    f(cursor_name, cursor)
1196                }
1197            });
1198    }
1199
1200    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
1201        self.cursor_map
1202            .lock()
1203            .await
1204            .iter()
1205            .for_each(|(cursor_name, cursor)| {
1206                if let Cursor::Subscription(cursor) = cursor {
1207                    f(cursor_name, cursor)
1208                }
1209            });
1210    }
1211
1212    pub async fn gen_batch_plan_with_subscription_cursor(
1213        &self,
1214        cursor_name: &str,
1215        handler_args: HandlerArgs,
1216    ) -> Result<RwBatchQueryPlanResult> {
1217        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
1218            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
1219        })? {
1220            Cursor::Subscription(cursor) => {
1221                cursor.gen_batch_plan_result(handler_args.clone())
1222            },
1223            Cursor::Query(_) => Err(ErrorCode::InternalError("A query cursor's plan is the same as that of the query statement it was created from.".to_owned()).into()),
1224        }
1225    }
1226}