1use core::mem;
16use core::time::Duration;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::fmt::{Display, Formatter};
19use std::rc::Rc;
20use std::sync::Arc;
21use std::time::Instant;
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use futures::StreamExt;
26use pgwire::pg_field_descriptor::PgFieldDescriptor;
27use pgwire::pg_response::StatementType;
28use pgwire::types::{Format, Row};
29use risingwave_common::catalog::Field;
30use risingwave_common::error::BoxedError;
31use risingwave_common::session_config::QueryMode;
32use risingwave_common::types::DataType;
33use risingwave_common::util::sort_util::ColumnOrder;
34use risingwave_hummock_sdk::HummockVersionId;
35use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};
36
37use super::SessionImpl;
38use crate::catalog::TableId;
39use crate::catalog::subscription_catalog::SubscriptionCatalog;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::handler::HandlerArgs;
42use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
43use crate::handler::query::{
44 BatchQueryPlanResult, gen_batch_plan_by_statement, gen_batch_plan_fragmenter,
45};
46use crate::handler::util::{
47 DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
48 gen_query_from_table_name_order_by, pg_value_format, to_pg_field,
49};
50use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
51use crate::optimizer::PlanRoot;
52use crate::optimizer::plan_node::{BatchLogSeqScan, generic};
53use crate::optimizer::property::{Order, RequiredDist};
54use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
55use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog};
56
/// The data source backing a cursor.
///
/// A cursor starts out holding a raw query stream (local or distributed).
/// `init_row_stream` moves that stream into a `PgResponseStream` and switches
/// the value to `PgResponse`, which is the only variant `next` can poll.
pub enum CursorDataChunkStream {
    /// Stream of a local query. Wrapped in `Option` so `init_row_stream` can move it out.
    LocalDataChunk(Option<LocalQueryStream>),
    /// Stream of a distributed query. Wrapped in `Option` so `init_row_stream` can move it out.
    DistributedDataChunk(Option<DistributedQueryStream>),
    /// Row stream that is ready to be polled through `next`.
    PgResponse(PgResponseStream),
}
62
63impl CursorDataChunkStream {
64 pub fn init_row_stream(
65 &mut self,
66 fields: &Vec<Field>,
67 formats: &Vec<Format>,
68 session: Arc<SessionImpl>,
69 ) {
70 let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
71 match self {
72 CursorDataChunkStream::LocalDataChunk(data_chunk) => {
73 let data_chunk = mem::take(data_chunk).unwrap();
74 let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
75 data_chunk,
76 columns_type,
77 formats.clone(),
78 session,
79 ));
80 *self = CursorDataChunkStream::PgResponse(row_stream);
81 }
82 CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
83 let data_chunk = mem::take(data_chunk).unwrap();
84 let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
85 data_chunk,
86 columns_type,
87 formats.clone(),
88 session,
89 ));
90 *self = CursorDataChunkStream::PgResponse(row_stream);
91 }
92 _ => {}
93 }
94 }
95
96 pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
97 match self {
98 CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
99 _ => Err(ErrorCode::InternalError(
100 "Only 'CursorDataChunkStream' can call next and return rows".to_owned(),
101 )
102 .into()),
103 }
104 }
105}
/// A cursor held by the `CursorManager`: either one that reads from a
/// subscription or one that reads the result of a plain query.
pub enum Cursor {
    /// Cursor declared on a subscription.
    Subscription(SubscriptionCursor),
    /// Cursor declared on a query.
    Query(QueryCursor),
}
110impl Cursor {
111 pub async fn next(
112 &mut self,
113 count: u32,
114 handler_args: HandlerArgs,
115 formats: &Vec<Format>,
116 timeout_seconds: Option<u64>,
117 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
118 match self {
119 Cursor::Subscription(cursor) => cursor
120 .next(count, handler_args, formats, timeout_seconds)
121 .await
122 .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
123 Cursor::Query(cursor) => {
124 cursor
125 .next(count, formats, handler_args, timeout_seconds)
126 .await
127 }
128 }
129 }
130
131 pub fn get_fields(&mut self) -> Vec<Field> {
132 match self {
133 Cursor::Subscription(cursor) => cursor.fields.clone(),
134 Cursor::Query(cursor) => cursor.fields.clone(),
135 }
136 }
137}
138
/// Cursor over the result of a query.
pub struct QueryCursor {
    /// Source of the rows; converted to a row stream on the first fetch.
    chunk_stream: CursorDataChunkStream,
    /// Fields describing the columns of the returned rows.
    fields: Vec<Field>,
    /// Rows already pulled from `chunk_stream` but not yet handed to the client.
    remaining_rows: VecDeque<Row>,
}
144
145impl QueryCursor {
146 pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
147 Ok(Self {
148 chunk_stream,
149 fields,
150 remaining_rows: VecDeque::<Row>::new(),
151 })
152 }
153
154 pub async fn next_once(&mut self) -> Result<Option<Row>> {
155 while self.remaining_rows.is_empty() {
156 let rows = self.chunk_stream.next().await?;
157 let rows = match rows {
158 None => return Ok(None),
159 Some(row) => row?,
160 };
161 self.remaining_rows = rows.into_iter().collect();
162 }
163 let row = self.remaining_rows.pop_front().unwrap();
164 Ok(Some(row))
165 }
166
167 pub async fn next(
168 &mut self,
169 count: u32,
170 formats: &Vec<Format>,
171 handler_args: HandlerArgs,
172 timeout_seconds: Option<u64>,
173 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
174 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
177 let session = handler_args.session;
178 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
179 let mut cur = 0;
180 let desc = self.fields.iter().map(to_pg_field).collect();
181 self.chunk_stream
182 .init_row_stream(&self.fields, formats, session);
183 while cur < count
184 && let Some(row) = self.next_once().await?
185 {
186 cur += 1;
187 ans.push(row);
188 if let Some(timeout_instant) = timeout_instant
189 && Instant::now() > timeout_instant
190 {
191 break;
192 }
193 }
194 Ok((ans, desc))
195 }
196}
197
/// State machine of a `SubscriptionCursor`.
enum State {
    /// The next fetch has to start a new log-store query.
    InitLogStoreQuery {
        /// Timestamp passed to the change-log epoch lookup as the seek position.
        seek_timestamp: u64,

        /// If set, the first epoch found by the lookup must equal this value;
        /// otherwise the lookup fails.
        expected_timestamp: Option<u64>,
    },
    /// A query is in progress and rows are being read from it.
    Fetch {
        /// `true` when the rows come from the snapshot query issued at cursor
        /// creation, `false` when they come from a log-store query.
        from_snapshot: bool,

        /// Timestamp associated with the running query.
        rw_timestamp: u64,

        /// Stream the rows are read from.
        chunk_stream: CursorDataChunkStream,

        /// Rows already pulled from `chunk_stream` but not yet returned.
        remaining_rows: VecDeque<Row>,

        /// Timestamp to continue with once this query is exhausted, if known.
        expected_timestamp: Option<u64>,

        /// Moment the running query was initiated; used for the query-duration metric.
        init_query_timer: Instant,
    },
    /// The cursor hit an error while looking up the next epoch and can no longer be used.
    Invalid,
}
228
229impl Display for State {
230 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
231 match self {
232 State::InitLogStoreQuery {
233 seek_timestamp,
234 expected_timestamp,
235 } => write!(
236 f,
237 "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
238 seek_timestamp, expected_timestamp
239 ),
240 State::Fetch {
241 from_snapshot,
242 rw_timestamp,
243 expected_timestamp,
244 remaining_rows,
245 init_query_timer,
246 ..
247 } => write!(
248 f,
249 "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
250 from_snapshot,
251 rw_timestamp,
252 expected_timestamp,
253 remaining_rows.len(),
254 init_query_timer.elapsed().as_millis()
255 ),
256 State::Invalid => write!(f, "Invalid"),
257 }
258 }
259}
260
/// Cursor that reads from the table a subscription depends on.
pub struct SubscriptionCursor {
    /// Name of the cursor; used as its key in the `CursorManager` map.
    cursor_name: String,
    /// Catalog entry of the subscription this cursor was declared on.
    subscription: Arc<SubscriptionCatalog>,
    /// Id of the table whose data and change log are read.
    dependent_table_id: TableId,
    /// Deadline after which `next` refuses to fetch. Set to "now + retention
    /// seconds" at creation and again whenever a log-store query is initiated.
    cursor_need_drop_time: Instant,
    /// Current position in the cursor state machine.
    state: State,
    /// Fields describing the rows currently returned by the cursor.
    fields: Vec<Field>,
    /// Metrics shared by all cursors.
    cursor_metrics: Arc<CursorMetrics>,
    /// Moment the last call to `next` finished (or the creation time if none yet).
    last_fetch: Instant,
}
273
274impl SubscriptionCursor {
275 pub async fn new(
276 cursor_name: String,
277 start_timestamp: Option<u64>,
278 subscription: Arc<SubscriptionCatalog>,
279 dependent_table_id: TableId,
280 handler_args: &HandlerArgs,
281 cursor_metrics: Arc<CursorMetrics>,
282 ) -> Result<Self> {
283 let (state, fields) = if let Some(start_timestamp) = start_timestamp {
284 let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
285 let fields = table_catalog
286 .columns
287 .iter()
288 .filter(|c| !c.is_hidden)
289 .map(|c| Field::with_name(c.data_type().clone(), c.name()))
290 .collect();
291 let fields = Self::build_desc(fields, true);
292 (
293 State::InitLogStoreQuery {
294 seek_timestamp: start_timestamp,
295 expected_timestamp: None,
296 },
297 fields,
298 )
299 } else {
300 let (chunk_stream, fields, init_query_timer) =
305 Self::initiate_query(None, &dependent_table_id, handler_args.clone()).await?;
306 let pinned_epoch = handler_args
307 .session
308 .env
309 .hummock_snapshot_manager
310 .acquire()
311 .version()
312 .state_table_info
313 .info()
314 .get(&dependent_table_id)
315 .ok_or_else(|| anyhow!("dependent_table_id {dependent_table_id} not exists"))?
316 .committed_epoch;
317 let start_timestamp = pinned_epoch;
318
319 (
320 State::Fetch {
321 from_snapshot: true,
322 rw_timestamp: start_timestamp,
323 chunk_stream,
324 remaining_rows: VecDeque::new(),
325 expected_timestamp: None,
326 init_query_timer,
327 },
328 fields,
329 )
330 };
331
332 let cursor_need_drop_time =
333 Instant::now() + Duration::from_secs(subscription.retention_seconds);
334 Ok(Self {
335 cursor_name,
336 subscription,
337 dependent_table_id,
338 cursor_need_drop_time,
339 state,
340 fields,
341 cursor_metrics,
342 last_fetch: Instant::now(),
343 })
344 }
345
    /// Drives the cursor state machine until it can return a row.
    ///
    /// Returns `Ok(Some(row))` with the next row, or `Ok(None)` when either no
    /// change-log epoch is available at the seek position yet, or a newly
    /// initiated query produced different fields than the cursor had (the
    /// fields are updated before returning).
    ///
    /// # Errors
    ///
    /// Fails if the cursor is in `State::Invalid`, if looking up the next
    /// epoch fails (which also moves the cursor to `State::Invalid`), or if
    /// initiating the query, reading the stream, or building a row fails.
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    // Queries started from this state always read the log store.
                    let from_snapshot = false;

                    // Find the epoch to read next and, if known, the one after it.
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, fields, init_query_timer) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                )
                                .await?;
                            Self::init_row_stream(
                                &mut chunk_stream,
                                formats,
                                &from_snapshot,
                                &fields,
                                handler_args.session.clone(),
                            );

                            // Starting a new log-store query extends the cursor lifetime.
                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Switch to fetching from the new query.
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            // The fields changed: record them and return without a row.
                            if self.fields.ne(&fields) {
                                self.fields = fields;
                                return Ok(None);
                            }
                        }
                        // No epoch available at the seek position.
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Pull more rows from the stream if the buffer is empty.
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        let new_row = row.take();
                        // Snapshot rows get no timestamp passed to `build_row`;
                        // log-store rows get the timestamp of the running query.
                        if from_snapshot {
                            return Ok(Some(Row::new(Self::build_row(
                                new_row,
                                None,
                                formats,
                                &session_data,
                            )?)));
                        } else {
                            return Ok(Some(Row::new(Self::build_row(
                                new_row,
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?)));
                        }
                    } else {
                        // The buffer is still empty after the refill: treat the
                        // current query as finished and record how long it ran.
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        if let Some(expected_timestamp) = expected_timestamp {
                            // The following epoch is known: seek it and require it to be found.
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            // Otherwise seek just past the epoch that was read.
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }
474
475 pub async fn next(
476 &mut self,
477 count: u32,
478 handler_args: HandlerArgs,
479 formats: &Vec<Format>,
480 timeout_seconds: Option<u64>,
481 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
482 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
483 if Instant::now() > self.cursor_need_drop_time {
484 return Err(ErrorCode::InternalError(
485 "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
486 )
487 .into());
488 }
489
490 let session = &handler_args.session;
491 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
492 let mut cur = 0;
493 if let State::Fetch {
494 from_snapshot,
495 chunk_stream,
496 ..
497 } = &mut self.state
498 {
499 Self::init_row_stream(
500 chunk_stream,
501 formats,
502 from_snapshot,
503 &self.fields,
504 session.clone(),
505 );
506 }
507 while cur < count {
508 let fetch_cursor_timer = Instant::now();
509 let row = self.next_row(&handler_args, formats).await?;
510 self.cursor_metrics
511 .subscription_cursor_fetch_duration
512 .with_label_values(&[&self.subscription.name])
513 .observe(fetch_cursor_timer.elapsed().as_millis() as _);
514 match row {
515 Some(row) => {
516 cur += 1;
517 ans.push(row);
518 }
519 None => {
520 let timeout_seconds = timeout_seconds.unwrap_or(0);
521 if cur > 0 || timeout_seconds == 0 {
522 break;
523 }
524 match tokio::time::timeout(
527 Duration::from_secs(timeout_seconds),
528 session
529 .env
530 .hummock_snapshot_manager()
531 .wait_table_change_log_notification(self.dependent_table_id.table_id()),
532 )
533 .await
534 {
535 Ok(result) => result?,
536 Err(_) => {
537 tracing::debug!("Cursor wait next epoch timeout");
538 break;
539 }
540 }
541 }
542 }
543 if let Some(timeout_instant) = timeout_instant
545 && Instant::now() > timeout_instant
546 {
547 break;
548 }
549 }
550 self.last_fetch = Instant::now();
551 let desc = self.fields.iter().map(to_pg_field).collect();
552
553 Ok((ans, desc))
554 }
555
556 fn get_next_rw_timestamp(
557 seek_timestamp: u64,
558 table_id: &TableId,
559 expected_timestamp: Option<u64>,
560 handler_args: HandlerArgs,
561 dependent_subscription: &SubscriptionCatalog,
562 ) -> Result<(Option<u64>, Option<u64>)> {
563 let session = handler_args.session;
564 session.get_subscription_by_schema_id_name(
566 dependent_subscription.schema_id,
567 &dependent_subscription.name,
568 )?;
569
570 let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
572 if let Some(expected_timestamp) = expected_timestamp
573 && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
574 {
575 return Err(ErrorCode::CatalogError(
576 format!(
577 " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor",
578 convert_logstore_u64_to_unix_millis(expected_timestamp)
579 )
580 .into(),
581 )
582 .into());
583 }
584 Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned()))
585 }
586
587 pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
588 match self.state {
589 State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
591 Some(0),
592 &self.dependent_table_id,
593 handler_args,
594 ),
595 State::Fetch {
596 from_snapshot,
597 rw_timestamp,
598 ..
599 } => {
600 if from_snapshot {
601 Self::init_batch_plan_for_subscription_cursor(
602 None,
603 &self.dependent_table_id,
604 handler_args,
605 )
606 } else {
607 Self::init_batch_plan_for_subscription_cursor(
608 Some(rw_timestamp),
609 &self.dependent_table_id,
610 handler_args,
611 )
612 }
613 }
614 State::Invalid => Err(ErrorCode::InternalError(
615 "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
616 )
617 .into()),
618 }
619 }
620
621 fn init_batch_plan_for_subscription_cursor(
622 rw_timestamp: Option<u64>,
623 dependent_table_id: &TableId,
624 handler_args: HandlerArgs,
625 ) -> Result<BatchQueryPlanResult> {
626 let session = handler_args.clone().session;
627 let table_catalog = session.get_table_by_id(dependent_table_id)?;
628 let pks = table_catalog.pk();
629 let context = OptimizerContext::from_handler_args(handler_args.clone());
630 if let Some(rw_timestamp) = rw_timestamp {
631 let version_id = {
632 let version = session.env.hummock_snapshot_manager.acquire();
633 let version = version.version();
634 if !version
635 .state_table_info
636 .info()
637 .contains_key(dependent_table_id)
638 {
639 return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
640 }
641 version.id
642 };
643 Self::create_batch_plan_for_cursor(
644 &table_catalog,
645 &session,
646 context.into(),
647 rw_timestamp,
648 rw_timestamp,
649 version_id,
650 pks,
651 )
652 } else {
653 let subscription_from_table_name =
654 ObjectName(vec![Ident::from(table_catalog.name.as_ref())]);
655 let pk_names = pks
656 .iter()
657 .map(|f| {
658 Ok::<String, RwError>(
659 table_catalog
660 .columns
661 .get(f.column_index)
662 .ok_or_else(|| {
663 anyhow!(
664 "columns not find in table schema, index is {:?}",
665 f.column_index
666 )
667 })?
668 .name()
669 .to_owned(),
670 )
671 })
672 .collect::<Result<Vec<_>>>()?;
673 let query_stmt = Statement::Query(Box::new(gen_query_from_table_name_order_by(
674 subscription_from_table_name,
675 pk_names,
676 )));
677 gen_batch_plan_by_statement(&session, context.into(), query_stmt)
678 }
679 }
680
681 async fn initiate_query(
682 rw_timestamp: Option<u64>,
683 dependent_table_id: &TableId,
684 handler_args: HandlerArgs,
685 ) -> Result<(CursorDataChunkStream, Vec<Field>, Instant)> {
686 let init_query_timer = Instant::now();
687 let plan_result = Self::init_batch_plan_for_subscription_cursor(
688 rw_timestamp,
689 dependent_table_id,
690 handler_args.clone(),
691 )?;
692 let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
693 let (chunk_stream, fields) =
694 create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
695 Ok((
696 chunk_stream,
697 Self::build_desc(fields, rw_timestamp.is_none()),
698 init_query_timer,
699 ))
700 }
701
702 async fn try_refill_remaining_rows(
703 chunk_stream: &mut CursorDataChunkStream,
704 remaining_rows: &mut VecDeque<Row>,
705 ) -> Result<()> {
706 if remaining_rows.is_empty()
707 && let Some(row_set) = chunk_stream.next().await?
708 {
709 remaining_rows.extend(row_set?);
710 }
711 Ok(())
712 }
713
714 pub fn build_row(
715 mut row: Vec<Option<Bytes>>,
716 rw_timestamp: Option<u64>,
717 formats: &Vec<Format>,
718 session_data: &StaticSessionData,
719 ) -> Result<Vec<Option<Bytes>>> {
720 let row_len = row.len();
721 let new_row = if let Some(rw_timestamp) = rw_timestamp {
722 let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
723 let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
724 let rw_timestamp = pg_value_format(
725 &DataType::Int64,
726 risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
727 *rw_timestamp_formats,
728 session_data,
729 )?;
730 vec![Some(rw_timestamp)]
731 } else {
732 let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
733 let op = pg_value_format(
734 &DataType::Varchar,
735 risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
736 *op_formats,
737 session_data,
738 )?;
739 vec![Some(op), None]
740 };
741 row.extend(new_row);
742 Ok(row)
743 }
744
745 pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
746 if from_snapshot {
747 descs.push(Field::with_name(DataType::Varchar, "op"));
748 }
749 descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
750 descs
751 }
752
753 pub fn create_batch_plan_for_cursor(
754 table_catalog: &TableCatalog,
755 session: &SessionImpl,
756 context: OptimizerContextRef,
757 old_epoch: u64,
758 new_epoch: u64,
759 version_id: HummockVersionId,
760 pks: &[ColumnOrder],
761 ) -> Result<BatchQueryPlanResult> {
762 let output_col_idx = table_catalog
764 .columns
765 .iter()
766 .enumerate()
767 .filter_map(|(index, v)| {
768 if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
769 Some(index)
770 } else {
771 None
772 }
773 })
774 .collect::<Vec<_>>();
775 let output_col_idx_with_out_hidden = output_col_idx
776 .iter()
777 .filter(|index| !table_catalog.columns[**index].is_hidden)
778 .cloned()
779 .collect::<Vec<_>>();
780 let core = generic::LogScan::new(
781 table_catalog.name.clone(),
782 output_col_idx_with_out_hidden,
783 output_col_idx,
784 Rc::new(table_catalog.table_desc()),
785 context,
786 old_epoch,
787 new_epoch,
788 version_id,
789 );
790
791 let batch_log_seq_scan = BatchLogSeqScan::new(core);
792
793 let out_fields = batch_log_seq_scan.core().out_fields();
794 let out_names = batch_log_seq_scan.core().column_names_without_hidden();
795
796 let order = Order::new(pks.to_vec());
798
799 let plan_root = PlanRoot::new_with_batch_plan(
801 PlanRef::from(batch_log_seq_scan.clone()),
802 RequiredDist::single(),
803 order,
804 out_fields,
805 out_names,
806 );
807 let schema = plan_root.schema().clone();
808 let (batch_log_seq_scan, query_mode) = match session.config().query_mode() {
809 QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
810 QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
811 QueryMode::Distributed => (
812 plan_root.gen_batch_distributed_plan()?,
813 QueryMode::Distributed,
814 ),
815 };
816 Ok(BatchQueryPlanResult {
817 plan: batch_log_seq_scan,
818 query_mode,
819 schema,
820 stmt_type: StatementType::SELECT,
821 dependent_relations: table_catalog.dependent_relations.clone(),
822 read_storage_tables: HashSet::from_iter([table_catalog.id]),
823 })
824 }
825
826 pub fn init_row_stream(
829 chunk_stream: &mut CursorDataChunkStream,
830 formats: &Vec<Format>,
831 from_snapshot: &bool,
832 fields: &Vec<Field>,
833 session: Arc<SessionImpl>,
834 ) {
835 let mut formats = formats.clone();
836 let mut fields = fields.clone();
837 formats.pop();
838 fields.pop();
839 if *from_snapshot {
840 formats.pop();
841 fields.pop();
842 }
843 chunk_stream.init_row_stream(&fields, &formats, session);
844 }
845
846 pub fn idle_duration(&self) -> Duration {
847 self.last_fetch.elapsed()
848 }
849
850 pub fn subscription_name(&self) -> &str {
851 self.subscription.name.as_str()
852 }
853
854 pub fn state_info_string(&self) -> String {
855 format!("{}", self.state)
856 }
857}
858
/// Registry of cursors, keyed by cursor name.
pub struct CursorManager {
    /// All registered cursors. An async mutex is used because the lock is held
    /// across `.await` points while a cursor is being fetched from.
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    /// Metrics handle passed to every subscription cursor created here.
    cursor_metrics: Arc<CursorMetrics>,
}
863
864impl CursorManager {
865 pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
866 Self {
867 cursor_map: tokio::sync::Mutex::new(HashMap::new()),
868 cursor_metrics,
869 }
870 }
871
872 pub async fn add_subscription_cursor(
873 &self,
874 cursor_name: String,
875 start_timestamp: Option<u64>,
876 dependent_table_id: TableId,
877 subscription: Arc<SubscriptionCatalog>,
878 handler_args: &HandlerArgs,
879 ) -> Result<()> {
880 let create_cursor_timer = Instant::now();
881 let subscription_name = subscription.name.clone();
882 let cursor = SubscriptionCursor::new(
883 cursor_name,
884 start_timestamp,
885 subscription,
886 dependent_table_id,
887 handler_args,
888 self.cursor_metrics.clone(),
889 )
890 .await?;
891 let mut cursor_map = self.cursor_map.lock().await;
892 self.cursor_metrics
893 .subscription_cursor_declare_duration
894 .with_label_values(&[&subscription_name])
895 .observe(create_cursor_timer.elapsed().as_millis() as _);
896
897 cursor_map.retain(|_, v| {
898 if let Cursor::Subscription(cursor) = v
899 && matches!(cursor.state, State::Invalid)
900 {
901 false
902 } else {
903 true
904 }
905 });
906
907 cursor_map
908 .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
909 .map_err(|error| {
910 ErrorCode::CatalogError(
911 format!("cursor `{}` already exists", error.entry.key()).into(),
912 )
913 })?;
914 Ok(())
915 }
916
917 pub async fn add_query_cursor(
918 &self,
919 cursor_name: String,
920 chunk_stream: CursorDataChunkStream,
921 fields: Vec<Field>,
922 ) -> Result<()> {
923 let cursor = QueryCursor::new(chunk_stream, fields)?;
924 self.cursor_map
925 .lock()
926 .await
927 .try_insert(cursor_name, Cursor::Query(cursor))
928 .map_err(|error| {
929 ErrorCode::CatalogError(
930 format!("cursor `{}` already exists", error.entry.key()).into(),
931 )
932 })?;
933
934 Ok(())
935 }
936
937 pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
938 self.cursor_map
939 .lock()
940 .await
941 .remove(cursor_name)
942 .ok_or_else(|| {
943 ErrorCode::CatalogError(format!("cursor `{}` don't exists", cursor_name).into())
944 })?;
945 Ok(())
946 }
947
948 pub async fn remove_all_cursor(&self) {
949 self.cursor_map.lock().await.clear();
950 }
951
952 pub async fn remove_all_query_cursor(&self) {
953 self.cursor_map
954 .lock()
955 .await
956 .retain(|_, v| matches!(v, Cursor::Subscription(_)));
957 }
958
959 pub async fn get_rows_with_cursor(
960 &self,
961 cursor_name: &str,
962 count: u32,
963 handler_args: HandlerArgs,
964 formats: &Vec<Format>,
965 timeout_seconds: Option<u64>,
966 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
967 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
968 cursor
969 .next(count, handler_args, formats, timeout_seconds)
970 .await
971 } else {
972 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
973 }
974 }
975
976 pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
977 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
978 Ok(cursor.get_fields())
979 } else {
980 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
981 }
982 }
983
984 pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
985 let mut subsription_cursor_nums = 0;
986 let mut invalid_subsription_cursor_nums = 0;
987 let mut subscription_cursor_last_fetch_duration = HashMap::new();
988 for (_, cursor) in self.cursor_map.lock().await.iter() {
989 if let Cursor::Subscription(subscription_cursor) = cursor {
990 subsription_cursor_nums += 1;
991 if matches!(subscription_cursor.state, State::Invalid) {
992 invalid_subsription_cursor_nums += 1;
993 } else {
994 let fetch_duration =
995 subscription_cursor.last_fetch.elapsed().as_millis() as f64;
996 subscription_cursor_last_fetch_duration.insert(
997 subscription_cursor.subscription.name.clone(),
998 fetch_duration,
999 );
1000 }
1001 }
1002 }
1003 PeriodicCursorMetrics {
1004 subsription_cursor_nums,
1005 invalid_subsription_cursor_nums,
1006 subscription_cursor_last_fetch_duration,
1007 }
1008 }
1009
1010 pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
1011 self.cursor_map
1012 .lock()
1013 .await
1014 .iter()
1015 .for_each(|(cursor_name, cursor)| {
1016 if let Cursor::Query(cursor) = cursor {
1017 f(cursor_name, cursor)
1018 }
1019 });
1020 }
1021
1022 pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
1023 self.cursor_map
1024 .lock()
1025 .await
1026 .iter()
1027 .for_each(|(cursor_name, cursor)| {
1028 if let Cursor::Subscription(cursor) = cursor {
1029 f(cursor_name, cursor)
1030 }
1031 });
1032 }
1033
1034 pub async fn gen_batch_plan_with_subscription_cursor(
1035 &self,
1036 cursor_name: &str,
1037 handler_args: HandlerArgs,
1038 ) -> Result<BatchQueryPlanResult> {
1039 match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
1040 ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
1041 })? {
1042 Cursor::Subscription(cursor) => {
1043 cursor.gen_batch_plan_result(handler_args.clone())
1044 },
1045 Cursor::Query(_) => Err(ErrorCode::InternalError("The plan of the cursor is the same as the query statement of the as when it was created.".to_owned()).into()),
1046 }
1047 }
1048}