risingwave_frontend/session/
cursor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::mem;
16use core::time::Duration;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::fmt::{Display, Formatter};
19use std::rc::Rc;
20use std::sync::Arc;
21use std::time::Instant;
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use futures::StreamExt;
26use pgwire::pg_field_descriptor::PgFieldDescriptor;
27use pgwire::pg_response::StatementType;
28use pgwire::types::{Format, Row};
29use risingwave_common::catalog::Field;
30use risingwave_common::error::BoxedError;
31use risingwave_common::session_config::QueryMode;
32use risingwave_common::types::DataType;
33use risingwave_common::util::sort_util::ColumnOrder;
34use risingwave_hummock_sdk::HummockVersionId;
35use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
36
37use super::SessionImpl;
38use crate::catalog::TableId;
39use crate::catalog::subscription_catalog::SubscriptionCatalog;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::handler::HandlerArgs;
42use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
43use crate::handler::query::{
44    BatchQueryPlanResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
45};
46use crate::handler::util::{
47    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
48    gen_query_from_table_name_order_by, pg_value_format, to_pg_field,
49};
50use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
51use crate::optimizer::PlanRoot;
52use crate::optimizer::plan_node::{BatchLogSeqScan, generic};
53use crate::optimizer::property::{Order, RequiredDist};
54use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
55use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog};
56
/// The stream a cursor reads from, in one of three stages.
///
/// The two `*DataChunk` variants hold a batch query stream that has not been adapted to rows
/// yet. `init_row_stream` moves that stream out and replaces the value with `PgResponse`,
/// which is the only variant `next` accepts.
pub enum CursorDataChunkStream {
    /// Stream of a local batch query; wrapped in `Option` so `init_row_stream` can move it out.
    LocalDataChunk(Option<LocalQueryStream>),
    /// Stream of a distributed batch query; wrapped in `Option` so `init_row_stream` can move
    /// it out.
    DistributedDataChunk(Option<DistributedQueryStream>),
    /// Row stream that `next` reads from.
    PgResponse(PgResponseStream),
}
62
63impl CursorDataChunkStream {
64    pub fn init_row_stream(
65        &mut self,
66        fields: &Vec<Field>,
67        formats: &Vec<Format>,
68        session: Arc<SessionImpl>,
69    ) {
70        let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
71        match self {
72            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
73                let data_chunk = mem::take(data_chunk).unwrap();
74                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
75                    data_chunk,
76                    columns_type,
77                    formats.clone(),
78                    session,
79                ));
80                *self = CursorDataChunkStream::PgResponse(row_stream);
81            }
82            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
83                let data_chunk = mem::take(data_chunk).unwrap();
84                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
85                    data_chunk,
86                    columns_type,
87                    formats.clone(),
88                    session,
89                ));
90                *self = CursorDataChunkStream::PgResponse(row_stream);
91            }
92            _ => {}
93        }
94    }
95
96    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
97        match self {
98            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
99            _ => Err(ErrorCode::InternalError(
100                "Only 'CursorDataChunkStream' can call next and return rows".to_owned(),
101            )
102            .into()),
103        }
104    }
105}
/// A cursor of either supported kind; `Cursor::next` forwards to the wrapped cursor.
pub enum Cursor {
    /// Cursor declared on a subscription.
    Subscription(SubscriptionCursor),
    /// Cursor declared on a query.
    Query(QueryCursor),
}
110impl Cursor {
111    pub async fn next(
112        &mut self,
113        count: u32,
114        handler_args: HandlerArgs,
115        formats: &Vec<Format>,
116        timeout_seconds: Option<u64>,
117    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
118        match self {
119            Cursor::Subscription(cursor) => cursor
120                .next(count, handler_args, formats, timeout_seconds)
121                .await
122                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
123            Cursor::Query(cursor) => {
124                cursor
125                    .next(count, formats, handler_args, timeout_seconds)
126                    .await
127            }
128        }
129    }
130
131    pub fn get_fields(&mut self) -> Vec<Field> {
132        match self {
133            Cursor::Subscription(cursor) => cursor.fields.clone(),
134            Cursor::Query(cursor) => cursor.fields.clone(),
135        }
136    }
137}
138
/// Cursor over the result of a single batch query.
pub struct QueryCursor {
    /// Source of row batches; turned into a row stream by `next` before reading.
    chunk_stream: CursorDataChunkStream,
    /// Output fields of the query, used to build the field descriptors returned by `next`.
    fields: Vec<Field>,
    /// Rows of the most recently pulled batch that have not been handed out yet.
    remaining_rows: VecDeque<Row>,
}
144
145impl QueryCursor {
146    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
147        Ok(Self {
148            chunk_stream,
149            fields,
150            remaining_rows: VecDeque::<Row>::new(),
151        })
152    }
153
154    pub async fn next_once(&mut self) -> Result<Option<Row>> {
155        while self.remaining_rows.is_empty() {
156            let rows = self.chunk_stream.next().await?;
157            let rows = match rows {
158                None => return Ok(None),
159                Some(row) => row?,
160            };
161            self.remaining_rows = rows.into_iter().collect();
162        }
163        let row = self.remaining_rows.pop_front().unwrap();
164        Ok(Some(row))
165    }
166
167    pub async fn next(
168        &mut self,
169        count: u32,
170        formats: &Vec<Format>,
171        handler_args: HandlerArgs,
172        timeout_seconds: Option<u64>,
173    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
174        // `FETCH NEXT` is equivalent to `FETCH 1`.
175        // min with 100 to avoid allocating too many memory at once.
176        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
177        let session = handler_args.session;
178        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
179        let mut cur = 0;
180        let desc = self.fields.iter().map(to_pg_field).collect();
181        self.chunk_stream
182            .init_row_stream(&self.fields, formats, session);
183        while cur < count
184            && let Some(row) = self.next_once().await?
185        {
186            cur += 1;
187            ans.push(row);
188            if let Some(timeout_instant) = timeout_instant
189                && Instant::now() > timeout_instant
190            {
191                break;
192            }
193        }
194        Ok((ans, desc))
195    }
196}
197
/// Progress of a `SubscriptionCursor`, advanced by `SubscriptionCursor::next_row`.
enum State {
    /// No query is running; the next fetch has to look up the next change-log epoch and
    /// initiate a log-store query for it.
    InitLogStoreQuery {
        // The rw_timestamp used to initiate the query to read from subscription logstore.
        seek_timestamp: u64,

        // If specified, the expected_timestamp must be an exact match for the next rw_timestamp.
        expected_timestamp: Option<u64>,
    },
    /// A query has been initiated and its rows are being handed out.
    Fetch {
        // Whether the query is reading from snapshot
        // true: read from the upstream table snapshot
        // false: read from subscription logstore
        from_snapshot: bool,

        // The rw_timestamp used to initiate the query to read from subscription logstore.
        rw_timestamp: u64,

        // The row stream returned by the batch query execution.
        chunk_stream: CursorDataChunkStream,

        // A cache to store the remaining rows from the row stream.
        remaining_rows: VecDeque<Row>,

        // When set, the rw_timestamp the next log-store query must start from exactly;
        // carried over into `InitLogStoreQuery` once the current query is exhausted.
        expected_timestamp: Option<u64>,

        // Taken when the query was initiated; used to report the query duration.
        init_query_timer: Instant,
    },
    /// Fetching the next rw_timestamp failed; the cursor must be closed and re-created.
    Invalid,
}
228
229impl Display for State {
230    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231        match self {
232            State::InitLogStoreQuery {
233                seek_timestamp,
234                expected_timestamp,
235            } => write!(
236                f,
237                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
238                seek_timestamp, expected_timestamp
239            ),
240            State::Fetch {
241                from_snapshot,
242                rw_timestamp,
243                expected_timestamp,
244                remaining_rows,
245                init_query_timer,
246                ..
247            } => write!(
248                f,
249                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
250                from_snapshot,
251                rw_timestamp,
252                expected_timestamp,
253                remaining_rows.len(),
254                init_query_timer.elapsed().as_millis()
255            ),
256            State::Invalid => write!(f, "Invalid"),
257        }
258    }
259}
260
/// Cursor that reads a subscription's dependent table, first from a snapshot (when no start
/// timestamp is given) and then epoch by epoch from the subscription log store.
pub struct SubscriptionCursor {
    cursor_name: String,
    // Catalog of the subscription this cursor was declared on.
    subscription: Arc<SubscriptionCatalog>,
    // Table whose snapshot / change log the cursor reads.
    dependent_table_id: TableId,
    // Fetches after this instant are rejected. Set to now + the subscription's
    // `retention_seconds` on creation and again whenever a new log-store query is initiated.
    cursor_need_drop_time: Instant,
    state: State,
    // fields will be set in the table's catalog when the cursor is created,
    // and will be reset each time it is created chunk_stream, this is to avoid changes in the catalog due to alter.
    fields: Vec<Field>,
    cursor_metrics: Arc<CursorMetrics>,
    // Set on creation and at the end of every successful `next` call.
    last_fetch: Instant,
}
273
274impl SubscriptionCursor {
275    pub async fn new(
276        cursor_name: String,
277        start_timestamp: Option<u64>,
278        subscription: Arc<SubscriptionCatalog>,
279        dependent_table_id: TableId,
280        handler_args: &HandlerArgs,
281        cursor_metrics: Arc<CursorMetrics>,
282    ) -> Result<Self> {
283        let (state, fields) = if let Some(start_timestamp) = start_timestamp {
284            let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
285            let fields = table_catalog
286                .columns
287                .iter()
288                .filter(|c| !c.is_hidden)
289                .map(|c| Field::with_name(c.data_type().clone(), c.name()))
290                .collect();
291            let fields = Self::build_desc(fields, true);
292            (
293                State::InitLogStoreQuery {
294                    seek_timestamp: start_timestamp,
295                    expected_timestamp: None,
296                },
297                fields,
298            )
299        } else {
300            // The query stream needs to initiated on cursor creation to make sure
301            // future fetch on the cursor starts from the snapshot when the cursor is declared.
302            //
303            // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch?
304            let (chunk_stream, fields, init_query_timer) =
305                Self::initiate_query(None, &dependent_table_id, handler_args.clone()).await?;
306            let pinned_epoch = handler_args
307                .session
308                .env
309                .hummock_snapshot_manager
310                .acquire()
311                .version()
312                .state_table_info
313                .info()
314                .get(&dependent_table_id)
315                .ok_or_else(|| anyhow!("dependent_table_id {dependent_table_id} not exists"))?
316                .committed_epoch;
317            let start_timestamp = pinned_epoch;
318
319            (
320                State::Fetch {
321                    from_snapshot: true,
322                    rw_timestamp: start_timestamp,
323                    chunk_stream,
324                    remaining_rows: VecDeque::new(),
325                    expected_timestamp: None,
326                    init_query_timer,
327                },
328                fields,
329            )
330        };
331
332        let cursor_need_drop_time =
333            Instant::now() + Duration::from_secs(subscription.retention_seconds);
334        Ok(Self {
335            cursor_name,
336            subscription,
337            dependent_table_id,
338            cursor_need_drop_time,
339            state,
340            fields,
341            cursor_metrics,
342            last_fetch: Instant::now(),
343        })
344    }
345
    /// Produces the next row of the cursor by driving the state machine in `self.state`.
    ///
    /// Loops until one of the following happens:
    /// * a row is available: it is extended by `build_row` and returned as `Ok(Some(row))`;
    /// * no further change-log epoch is available: `Ok(None)` is returned;
    /// * a newly initiated log-store query has different output fields than the cursor:
    ///   `self.fields` is updated and `Ok(None)` is returned for this call;
    /// * an error occurs.
    ///
    /// # Errors
    ///
    /// Propagates errors from looking up the next epoch, initiating a query, reading the
    /// stream and building the row. A failure to look up the next epoch additionally moves
    /// the cursor to `State::Invalid`, after which every call fails with `InternalError`.
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    // Queries initiated from this state always read the log store.
                    let from_snapshot = false;

                    // Initiate a new batch query to continue fetching
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        // `rw_timestamp` is the epoch to read now; the second element is the
                        // epoch following it, when one is already known.
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, fields, init_query_timer) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                )
                                .await?;
                            Self::init_row_stream(
                                &mut chunk_stream,
                                formats,
                                &from_snapshot,
                                &fields,
                                handler_args.session.clone(),
                            );

                            // A new query was started: push the cursor's expiry out again.
                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Transition to the Fetch state
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            // The output fields differ from the ones recorded on the cursor:
                            // record the new ones and return no row for this call.
                            if self.fields.ne(&fields) {
                                self.fields = fields;
                                return Ok(None);
                            }
                        }
                        // No epoch at or after `seek_timestamp` yet; stay in this state.
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    // Copy the scalars out of the borrowed state.
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Try refill remaining rows
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        // 1. Fetch the next row
                        let new_row = row.take();
                        // Snapshot rows are built without an rw_timestamp, log-store rows
                        // with the rw_timestamp of the current query.
                        if from_snapshot {
                            return Ok(Some(Row::new(Self::build_row(
                                new_row,
                                None,
                                formats,
                                &session_data,
                            )?)));
                        } else {
                            return Ok(Some(Row::new(Self::build_row(
                                new_row,
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?)));
                        }
                    } else {
                        // The query is exhausted: report how long it ran since initiation.
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        // 2. Reach EOF for the current query.
                        // Continue from the known next epoch when there is one (and require an
                        // exact match for it), otherwise seek from just after the current one.
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    // TODO: auto close invalid cursor?
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }
474
475    pub async fn next(
476        &mut self,
477        count: u32,
478        handler_args: HandlerArgs,
479        formats: &Vec<Format>,
480        timeout_seconds: Option<u64>,
481    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
482        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
483        if Instant::now() > self.cursor_need_drop_time {
484            return Err(ErrorCode::InternalError(
485                "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
486            )
487            .into());
488        }
489
490        let session = &handler_args.session;
491        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
492        let mut cur = 0;
493        if let State::Fetch {
494            from_snapshot,
495            chunk_stream,
496            ..
497        } = &mut self.state
498        {
499            Self::init_row_stream(
500                chunk_stream,
501                formats,
502                from_snapshot,
503                &self.fields,
504                session.clone(),
505            );
506        }
507        while cur < count {
508            let fetch_cursor_timer = Instant::now();
509            let row = self.next_row(&handler_args, formats).await?;
510            self.cursor_metrics
511                .subscription_cursor_fetch_duration
512                .with_label_values(&[&self.subscription.name])
513                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
514            match row {
515                Some(row) => {
516                    cur += 1;
517                    ans.push(row);
518                }
519                None => {
520                    let timeout_seconds = timeout_seconds.unwrap_or(0);
521                    if cur > 0 || timeout_seconds == 0 {
522                        break;
523                    }
524                    // It's only blocked when there's no data
525                    // This method will only be called once, either to trigger a timeout or to get the return value in the next loop via `next_row`.
526                    match tokio::time::timeout(
527                        Duration::from_secs(timeout_seconds),
528                        session
529                            .env
530                            .hummock_snapshot_manager()
531                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
532                    )
533                    .await
534                    {
535                        Ok(result) => result?,
536                        Err(_) => {
537                            tracing::debug!("Cursor wait next epoch timeout");
538                            break;
539                        }
540                    }
541                }
542            }
543            // Timeout, return with current value
544            if let Some(timeout_instant) = timeout_instant
545                && Instant::now() > timeout_instant
546            {
547                break;
548            }
549        }
550        self.last_fetch = Instant::now();
551        let desc = self.fields.iter().map(to_pg_field).collect();
552
553        Ok((ans, desc))
554    }
555
556    fn get_next_rw_timestamp(
557        seek_timestamp: u64,
558        table_id: &TableId,
559        expected_timestamp: Option<u64>,
560        handler_args: HandlerArgs,
561        dependent_subscription: &SubscriptionCatalog,
562    ) -> Result<(Option<u64>, Option<u64>)> {
563        let session = handler_args.session;
564        // Test subscription existence
565        session.get_subscription_by_schema_id_name(
566            dependent_subscription.schema_id,
567            &dependent_subscription.name,
568        )?;
569
570        // The epoch here must be pulled every time, otherwise there will be cache consistency issues
571        let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
572        if let Some(expected_timestamp) = expected_timestamp
573            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
574        {
575            return Err(ErrorCode::CatalogError(
576                format!(
577                    " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor",
578                    convert_logstore_u64_to_unix_millis(expected_timestamp)
579                )
580                .into(),
581            )
582            .into());
583        }
584        Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned()))
585    }
586
587    pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
588        match self.state {
589            // Only used to return generated plans, so rw_timestamp are meaningless
590            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
591                Some(0),
592                &self.dependent_table_id,
593                handler_args,
594            ),
595            State::Fetch {
596                from_snapshot,
597                rw_timestamp,
598                ..
599            } => {
600                if from_snapshot {
601                    Self::init_batch_plan_for_subscription_cursor(
602                        None,
603                        &self.dependent_table_id,
604                        handler_args,
605                    )
606                } else {
607                    Self::init_batch_plan_for_subscription_cursor(
608                        Some(rw_timestamp),
609                        &self.dependent_table_id,
610                        handler_args,
611                    )
612                }
613            }
614            State::Invalid => Err(ErrorCode::InternalError(
615                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
616            )
617            .into()),
618        }
619    }
620
621    fn init_batch_plan_for_subscription_cursor(
622        rw_timestamp: Option<u64>,
623        dependent_table_id: &TableId,
624        handler_args: HandlerArgs,
625    ) -> Result<BatchQueryPlanResult> {
626        let session = handler_args.clone().session;
627        let table_catalog = session.get_table_by_id(dependent_table_id)?;
628        let pks = table_catalog.pk();
629        let context = OptimizerContext::from_handler_args(handler_args.clone());
630        if let Some(rw_timestamp) = rw_timestamp {
631            let version_id = {
632                let version = session.env.hummock_snapshot_manager.acquire();
633                let version = version.version();
634                if !version
635                    .state_table_info
636                    .info()
637                    .contains_key(dependent_table_id)
638                {
639                    return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
640                }
641                version.id
642            };
643            Self::create_batch_plan_for_cursor(
644                &table_catalog,
645                &session,
646                context.into(),
647                rw_timestamp,
648                rw_timestamp,
649                version_id,
650                pks,
651            )
652        } else {
653            let subscription_from_table_name =
654                ObjectName(vec![Ident::from(table_catalog.name.as_ref())]);
655            let pk_names = pks
656                .iter()
657                .map(|f| {
658                    Ok::<String, RwError>(
659                        table_catalog
660                            .columns
661                            .get(f.column_index)
662                            .ok_or_else(|| {
663                                anyhow!(
664                                    "columns not find in table schema, index is {:?}",
665                                    f.column_index
666                                )
667                            })?
668                            .name()
669                            .to_owned(),
670                    )
671                })
672                .collect::<Result<Vec<_>>>()?;
673            let query_stmt = Statement::Query(Box::new(gen_query_from_table_name_order_by(
674                subscription_from_table_name,
675                pk_names,
676            )));
677            gen_batch_plan_by_statement(&session, context.into(), query_stmt)
678        }
679    }
680
681    async fn initiate_query(
682        rw_timestamp: Option<u64>,
683        dependent_table_id: &TableId,
684        handler_args: HandlerArgs,
685    ) -> Result<(CursorDataChunkStream, Vec<Field>, Instant)> {
686        let init_query_timer = Instant::now();
687        let plan_result = Self::init_batch_plan_for_subscription_cursor(
688            rw_timestamp,
689            dependent_table_id,
690            handler_args.clone(),
691        )?;
692        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
693        let (chunk_stream, fields) =
694            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
695        Ok((
696            chunk_stream,
697            Self::build_desc(fields, rw_timestamp.is_none()),
698            init_query_timer,
699        ))
700    }
701
702    async fn try_refill_remaining_rows(
703        chunk_stream: &mut CursorDataChunkStream,
704        remaining_rows: &mut VecDeque<Row>,
705    ) -> Result<()> {
706        if remaining_rows.is_empty()
707            && let Some(row_set) = chunk_stream.next().await?
708        {
709            remaining_rows.extend(row_set?);
710        }
711        Ok(())
712    }
713
714    pub fn build_row(
715        mut row: Vec<Option<Bytes>>,
716        rw_timestamp: Option<u64>,
717        formats: &Vec<Format>,
718        session_data: &StaticSessionData,
719    ) -> Result<Vec<Option<Bytes>>> {
720        let row_len = row.len();
721        let new_row = if let Some(rw_timestamp) = rw_timestamp {
722            let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
723            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
724            let rw_timestamp = pg_value_format(
725                &DataType::Int64,
726                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
727                *rw_timestamp_formats,
728                session_data,
729            )?;
730            vec![Some(rw_timestamp)]
731        } else {
732            let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
733            let op = pg_value_format(
734                &DataType::Varchar,
735                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
736                *op_formats,
737                session_data,
738            )?;
739            vec![Some(op), None]
740        };
741        row.extend(new_row);
742        Ok(row)
743    }
744
745    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
746        if from_snapshot {
747            descs.push(Field::with_name(DataType::Varchar, "op"));
748        }
749        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
750        descs
751    }
752
753    pub fn create_batch_plan_for_cursor(
754        table_catalog: &TableCatalog,
755        session: &SessionImpl,
756        context: OptimizerContextRef,
757        old_epoch: u64,
758        new_epoch: u64,
759        version_id: HummockVersionId,
760        pks: &[ColumnOrder],
761    ) -> Result<BatchQueryPlanResult> {
762        // pk + all column without hidden
763        let output_col_idx = table_catalog
764            .columns
765            .iter()
766            .enumerate()
767            .filter_map(|(index, v)| {
768                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
769                    Some(index)
770                } else {
771                    None
772                }
773            })
774            .collect::<Vec<_>>();
775        let output_col_idx_with_out_hidden = output_col_idx
776            .iter()
777            .filter(|index| !table_catalog.columns[**index].is_hidden)
778            .cloned()
779            .collect::<Vec<_>>();
780        let core = generic::LogScan::new(
781            table_catalog.name.clone(),
782            output_col_idx_with_out_hidden,
783            output_col_idx,
784            Rc::new(table_catalog.table_desc()),
785            context,
786            old_epoch,
787            new_epoch,
788            version_id,
789        );
790
791        let batch_log_seq_scan = BatchLogSeqScan::new(core);
792
793        let out_fields = batch_log_seq_scan.core().out_fields();
794        let out_names = batch_log_seq_scan.core().column_names_without_hidden();
795
796        // order by pk, so don't need to sort
797        let order = Order::new(pks.to_vec());
798
799        // Here we just need a plan_root to call the method, only out_fields and out_names will be used
800        let plan_root = PlanRoot::new_with_batch_plan(
801            PlanRef::from(batch_log_seq_scan.clone()),
802            RequiredDist::single(),
803            order,
804            out_fields,
805            out_names,
806        );
807        let schema = plan_root.schema().clone();
808        let (batch_log_seq_scan, query_mode) = match session.config().query_mode() {
809            QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
810            QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
811            QueryMode::Distributed => (
812                plan_root.gen_batch_distributed_plan()?,
813                QueryMode::Distributed,
814            ),
815        };
816        Ok(BatchQueryPlanResult {
817            plan: batch_log_seq_scan,
818            query_mode,
819            schema,
820            stmt_type: StatementType::SELECT,
821            dependent_relations: table_catalog.dependent_relations.clone(),
822            read_storage_tables: HashSet::from_iter([table_catalog.id]),
823        })
824    }
825
826    // In the beginning (declare cur), we will give it an empty formats,
827    // this formats is not a real, when we fetch, We fill it with the formats returned from the pg client.
828    pub fn init_row_stream(
829        chunk_stream: &mut CursorDataChunkStream,
830        formats: &Vec<Format>,
831        from_snapshot: &bool,
832        fields: &Vec<Field>,
833        session: Arc<SessionImpl>,
834    ) {
835        let mut formats = formats.clone();
836        let mut fields = fields.clone();
837        formats.pop();
838        fields.pop();
839        if *from_snapshot {
840            formats.pop();
841            fields.pop();
842        }
843        chunk_stream.init_row_stream(&fields, &formats, session);
844    }
845
846    pub fn idle_duration(&self) -> Duration {
847        self.last_fetch.elapsed()
848    }
849
850    pub fn subscription_name(&self) -> &str {
851        self.subscription.name.as_str()
852    }
853
854    pub fn state_info_string(&self) -> String {
855        format!("{}", self.state)
856    }
857}
858
859pub struct CursorManager {
860    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
861    cursor_metrics: Arc<CursorMetrics>,
862}
863
864impl CursorManager {
865    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
866        Self {
867            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
868            cursor_metrics,
869        }
870    }
871
872    pub async fn add_subscription_cursor(
873        &self,
874        cursor_name: String,
875        start_timestamp: Option<u64>,
876        dependent_table_id: TableId,
877        subscription: Arc<SubscriptionCatalog>,
878        handler_args: &HandlerArgs,
879    ) -> Result<()> {
880        let create_cursor_timer = Instant::now();
881        let subscription_name = subscription.name.clone();
882        let cursor = SubscriptionCursor::new(
883            cursor_name,
884            start_timestamp,
885            subscription,
886            dependent_table_id,
887            handler_args,
888            self.cursor_metrics.clone(),
889        )
890        .await?;
891        let mut cursor_map = self.cursor_map.lock().await;
892        self.cursor_metrics
893            .subscription_cursor_declare_duration
894            .with_label_values(&[&subscription_name])
895            .observe(create_cursor_timer.elapsed().as_millis() as _);
896
897        cursor_map.retain(|_, v| {
898            if let Cursor::Subscription(cursor) = v
899                && matches!(cursor.state, State::Invalid)
900            {
901                false
902            } else {
903                true
904            }
905        });
906
907        cursor_map
908            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
909            .map_err(|error| {
910                ErrorCode::CatalogError(
911                    format!("cursor `{}` already exists", error.entry.key()).into(),
912                )
913            })?;
914        Ok(())
915    }
916
917    pub async fn add_query_cursor(
918        &self,
919        cursor_name: String,
920        chunk_stream: CursorDataChunkStream,
921        fields: Vec<Field>,
922    ) -> Result<()> {
923        let cursor = QueryCursor::new(chunk_stream, fields)?;
924        self.cursor_map
925            .lock()
926            .await
927            .try_insert(cursor_name, Cursor::Query(cursor))
928            .map_err(|error| {
929                ErrorCode::CatalogError(
930                    format!("cursor `{}` already exists", error.entry.key()).into(),
931                )
932            })?;
933
934        Ok(())
935    }
936
937    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
938        self.cursor_map
939            .lock()
940            .await
941            .remove(cursor_name)
942            .ok_or_else(|| {
943                ErrorCode::CatalogError(format!("cursor `{}` don't exists", cursor_name).into())
944            })?;
945        Ok(())
946    }
947
948    pub async fn remove_all_cursor(&self) {
949        self.cursor_map.lock().await.clear();
950    }
951
952    pub async fn remove_all_query_cursor(&self) {
953        self.cursor_map
954            .lock()
955            .await
956            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
957    }
958
959    pub async fn get_rows_with_cursor(
960        &self,
961        cursor_name: &str,
962        count: u32,
963        handler_args: HandlerArgs,
964        formats: &Vec<Format>,
965        timeout_seconds: Option<u64>,
966    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
967        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
968            cursor
969                .next(count, handler_args, formats, timeout_seconds)
970                .await
971        } else {
972            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
973        }
974    }
975
976    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
977        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
978            Ok(cursor.get_fields())
979        } else {
980            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
981        }
982    }
983
984    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
985        let mut subsription_cursor_nums = 0;
986        let mut invalid_subsription_cursor_nums = 0;
987        let mut subscription_cursor_last_fetch_duration = HashMap::new();
988        for (_, cursor) in self.cursor_map.lock().await.iter() {
989            if let Cursor::Subscription(subscription_cursor) = cursor {
990                subsription_cursor_nums += 1;
991                if matches!(subscription_cursor.state, State::Invalid) {
992                    invalid_subsription_cursor_nums += 1;
993                } else {
994                    let fetch_duration =
995                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
996                    subscription_cursor_last_fetch_duration.insert(
997                        subscription_cursor.subscription.name.clone(),
998                        fetch_duration,
999                    );
1000                }
1001            }
1002        }
1003        PeriodicCursorMetrics {
1004            subsription_cursor_nums,
1005            invalid_subsription_cursor_nums,
1006            subscription_cursor_last_fetch_duration,
1007        }
1008    }
1009
1010    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
1011        self.cursor_map
1012            .lock()
1013            .await
1014            .iter()
1015            .for_each(|(cursor_name, cursor)| {
1016                if let Cursor::Query(cursor) = cursor {
1017                    f(cursor_name, cursor)
1018                }
1019            });
1020    }
1021
1022    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
1023        self.cursor_map
1024            .lock()
1025            .await
1026            .iter()
1027            .for_each(|(cursor_name, cursor)| {
1028                if let Cursor::Subscription(cursor) = cursor {
1029                    f(cursor_name, cursor)
1030                }
1031            });
1032    }
1033
1034    pub async fn gen_batch_plan_with_subscription_cursor(
1035        &self,
1036        cursor_name: &str,
1037        handler_args: HandlerArgs,
1038    ) -> Result<BatchQueryPlanResult> {
1039        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
1040            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
1041        })? {
1042            Cursor::Subscription(cursor) => {
1043                cursor.gen_batch_plan_result(handler_args.clone())
1044            },
1045            Cursor::Query(_) => Err(ErrorCode::InternalError("The plan of the cursor is the same as the query statement of the as when it was created.".to_owned()).into()),
1046        }
1047    }
1048}