1use core::mem;
16use core::time::Duration;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::fmt::{Display, Formatter};
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use futures::StreamExt;
25use itertools::Itertools;
26use pgwire::pg_field_descriptor::PgFieldDescriptor;
27use pgwire::pg_response::StatementType;
28use pgwire::types::{Format, Row};
29use risingwave_common::catalog::{ColumnCatalog, Field};
30use risingwave_common::error::BoxedError;
31use risingwave_common::session_config::QueryMode;
32use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
33use risingwave_common::util::iter_util::ZipEqFast;
34use risingwave_hummock_sdk::HummockVersionId;
35
36use super::SessionImpl;
37use crate::catalog::TableId;
38use crate::catalog::subscription_catalog::SubscriptionCatalog;
39use crate::error::{ErrorCode, Result};
40use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
41use crate::handler::HandlerArgs;
42use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
43use crate::handler::query::{RwBatchQueryPlanResult, gen_batch_plan_fragmenter};
44use crate::handler::util::{
45 DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
46 pg_value_format, to_pg_field,
47};
48use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
49use crate::optimizer::PlanRoot;
50use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
51use crate::optimizer::property::{Order, RequiredDist};
52use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
53use crate::utils::Condition;
54use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};
55
/// A stream of data chunks backing a cursor, in one of three stages.
///
/// It starts as a raw local or distributed query stream and is converted in
/// place into a pgwire row stream by [`CursorDataChunkStream::init_row_stream`].
pub enum CursorDataChunkStream {
    /// Chunk stream from a local (single-node) batch query; wrapped in
    /// `Option` so it can be `mem::take`n during conversion.
    LocalDataChunk(Option<LocalQueryStream>),
    /// Chunk stream from a distributed batch query; wrapped in `Option` so it
    /// can be `mem::take`n during conversion.
    DistributedDataChunk(Option<DistributedQueryStream>),
    /// Final stage: rows encoded for the pgwire protocol.
    PgResponse(PgResponseStream),
}
61
62impl CursorDataChunkStream {
63 pub fn init_row_stream(
64 &mut self,
65 fields: &Vec<Field>,
66 formats: &Vec<Format>,
67 session: Arc<SessionImpl>,
68 ) {
69 let columns_type = fields.iter().map(|f| f.data_type()).collect();
70 match self {
71 CursorDataChunkStream::LocalDataChunk(data_chunk) => {
72 let data_chunk = mem::take(data_chunk).unwrap();
73 let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
74 data_chunk,
75 columns_type,
76 formats.clone(),
77 session,
78 ));
79 *self = CursorDataChunkStream::PgResponse(row_stream);
80 }
81 CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
82 let data_chunk = mem::take(data_chunk).unwrap();
83 let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
84 data_chunk,
85 columns_type,
86 formats.clone(),
87 session,
88 ));
89 *self = CursorDataChunkStream::PgResponse(row_stream);
90 }
91 _ => {}
92 }
93 }
94
95 pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
96 match self {
97 CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
98 _ => Err(ErrorCode::InternalError(
99 "Only 'CursorDataChunkStream' can call next and return rows".to_owned(),
100 )
101 .into()),
102 }
103 }
104}
/// A named cursor: either a subscription cursor (incremental change-log
/// fetches) or a plain one-shot query cursor.
pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}
109impl Cursor {
110 pub async fn next(
111 &mut self,
112 count: u32,
113 handler_args: HandlerArgs,
114 formats: &Vec<Format>,
115 timeout_seconds: Option<u64>,
116 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
117 match self {
118 Cursor::Subscription(cursor) => cursor
119 .next(count, handler_args, formats, timeout_seconds)
120 .await
121 .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
122 Cursor::Query(cursor) => {
123 cursor
124 .next(count, formats, handler_args, timeout_seconds)
125 .await
126 }
127 }
128 }
129
130 pub fn get_fields(&mut self) -> Vec<Field> {
131 match self {
132 Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
133 Cursor::Query(cursor) => cursor.fields.clone(),
134 }
135 }
136}
137
/// Cursor over the result of an ordinary (non-subscription) query.
pub struct QueryCursor {
    // Underlying stream of result chunks.
    chunk_stream: CursorDataChunkStream,
    // Output schema of the query.
    fields: Vec<Field>,
    // Rows already pulled from the stream but not yet returned to the client.
    remaining_rows: VecDeque<Row>,
}
143
144impl QueryCursor {
145 pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
146 Ok(Self {
147 chunk_stream,
148 fields,
149 remaining_rows: VecDeque::<Row>::new(),
150 })
151 }
152
153 pub async fn next_once(&mut self) -> Result<Option<Row>> {
154 while self.remaining_rows.is_empty() {
155 let rows = self.chunk_stream.next().await?;
156 let rows = match rows {
157 None => return Ok(None),
158 Some(row) => row?,
159 };
160 self.remaining_rows = rows.into_iter().collect();
161 }
162 let row = self.remaining_rows.pop_front().unwrap();
163 Ok(Some(row))
164 }
165
166 pub async fn next(
167 &mut self,
168 count: u32,
169 formats: &Vec<Format>,
170 handler_args: HandlerArgs,
171 timeout_seconds: Option<u64>,
172 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
173 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
176 let session = handler_args.session;
177 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
178 let mut cur = 0;
179 let desc = self.fields.iter().map(to_pg_field).collect();
180 self.chunk_stream
181 .init_row_stream(&self.fields, formats, session);
182 while cur < count
183 && let Some(row) = self.next_once().await?
184 {
185 cur += 1;
186 ans.push(row);
187 if let Some(timeout_instant) = timeout_instant
188 && Instant::now() > timeout_instant
189 {
190 break;
191 }
192 }
193 Ok((ans, desc))
194 }
195}
196
/// Internal state machine of a `SubscriptionCursor`.
enum State {
    /// Waiting to look up the next readable epoch in the table change log.
    InitLogStoreQuery {
        // The rw_timestamp to seek to in the log store.
        seek_timestamp: u64,

        // The epoch we expect to find at `seek_timestamp`, if already known.
        expected_timestamp: Option<u64>,
    },
    /// Actively draining rows for one epoch (or the initial snapshot).
    Fetch {
        // Whether rows come from the initial table snapshot rather than the
        // change log.
        from_snapshot: bool,

        // The rw_timestamp of the epoch currently being fetched.
        rw_timestamp: u64,

        // Stream of rows for this epoch/snapshot.
        chunk_stream: CursorDataChunkStream,

        // Rows pulled from `chunk_stream` but not yet returned.
        remaining_rows: VecDeque<Row>,

        // The epoch expected to follow this one, if known.
        expected_timestamp: Option<u64>,

        // Started when the query for this state was initiated; used for metrics.
        init_query_timer: Instant,
    },
    /// The cursor hit an error and must be closed and re-created.
    Invalid,
}
227
228impl Display for State {
229 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
230 match self {
231 State::InitLogStoreQuery {
232 seek_timestamp,
233 expected_timestamp,
234 } => write!(
235 f,
236 "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
237 seek_timestamp, expected_timestamp
238 ),
239 State::Fetch {
240 from_snapshot,
241 rw_timestamp,
242 expected_timestamp,
243 remaining_rows,
244 init_query_timer,
245 ..
246 } => write!(
247 f,
248 "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
249 from_snapshot,
250 rw_timestamp,
251 expected_timestamp,
252 remaining_rows.len(),
253 init_query_timer.elapsed().as_millis()
254 ),
255 State::Invalid => write!(f, "Invalid"),
256 }
257 }
258}
259
/// Bookkeeping for the output schema of a subscription cursor, mapping the
/// table's columns (plus the synthetic `op` / `rw_timestamp` columns) to the
/// fields actually streamed and the fields shown to the client.
struct FieldsManager {
    // Snapshot of the table's column catalog the mappings were built from;
    // compared against the live catalog to detect schema changes.
    columns_catalog: Vec<ColumnCatalog>,
    // All fields a fetched row may carry: pk and visible columns, then the
    // synthetic `op` and `rw_timestamp`.
    row_fields: Vec<Field>,
    // Indices (into `row_fields`) of the fields returned to the client.
    row_output_col_indices: Vec<usize>,
    // Indices (into `row_fields`) of the primary-key fields, used to remember
    // the seek position between fetches.
    row_pk_indices: Vec<usize>,
    // Indices (into `row_fields`) of the fields present in the stream chunk.
    stream_chunk_row_indices: Vec<usize>,
    // Index (into `row_fields`) of the synthetic `op` column.
    op_index: usize,
}
273
impl FieldsManager {
    /// Builds the field mappings from the table catalog.
    ///
    /// A column is kept in `row_fields` if it is a primary-key column or is
    /// not hidden; hidden pk columns are streamed (for seek bookkeeping) but
    /// not exposed to the client. Two synthetic trailing fields, `op` and
    /// `rw_timestamp`, are always appended and always exposed.
    pub fn new(catalog: &TableCatalog) -> Self {
        let mut row_fields = Vec::new();
        let mut row_output_col_indices = Vec::new();
        let mut row_pk_indices = Vec::new();
        let mut stream_chunk_row_indices = Vec::new();
        let mut output_idx = 0_usize;
        let pk_set: HashSet<usize> = catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index)
            .collect();

        for (index, v) in catalog.columns.iter().enumerate() {
            if pk_set.contains(&index) {
                // Pk columns are always streamed; they are only shown to the
                // client when not hidden.
                row_pk_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                if !v.is_hidden {
                    row_output_col_indices.push(output_idx);
                }
                output_idx += 1;
            } else if !v.is_hidden {
                // Non-pk visible columns are both streamed and shown.
                row_output_col_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                output_idx += 1;
            }
            // Hidden non-pk columns are skipped entirely.
        }

        // Synthetic `op` column, always exposed.
        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
        row_output_col_indices.push(output_idx);
        let op_index = output_idx;
        output_idx += 1;
        // Synthetic `rw_timestamp` column, always exposed.
        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
        row_output_col_indices.push(output_idx);
        Self {
            columns_catalog: catalog.columns.clone(),
            row_fields,
            row_output_col_indices,
            row_pk_indices,
            stream_chunk_row_indices,
            op_index,
        }
    }

    /// Rebuilds the mappings if the table's columns changed since they were
    /// last computed. Returns `true` iff a rebuild happened.
    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
        if self.columns_catalog.ne(&catalog.columns) {
            *self = Self::new(catalog);
            true
        } else {
            false
        }
    }

    /// Projects `rows` down to the client-visible columns, and additionally
    /// returns the pk projection of the last row (the next seek position) if
    /// any rows were produced.
    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
        let last_row = rows.last_mut().map(|row| {
            // Clone before projecting: the pk projection must not disturb the
            // row that is also projected for output below.
            let mut row = row.clone();
            row.project(&self.row_pk_indices)
        });
        let rows = rows
            .iter_mut()
            .map(|row| row.project(&self.row_output_col_indices))
            .collect();
        (rows, last_row)
    }

    /// Returns the client-visible output fields.
    pub fn get_output_fields(&self) -> Vec<Field> {
        self.row_output_col_indices
            .iter()
            .map(|&idx| self.row_fields[idx].clone())
            .collect()
    }

    /// Returns the fields carried by the underlying row stream (the streamed
    /// columns, plus `op` when reading the change log rather than a snapshot)
    /// and the per-column formats to decode them with. Streamed-but-hidden
    /// columns get `Format::Text` spliced in so the caller-provided formats
    /// still line up with the visible columns.
    pub fn get_row_stream_fields_and_formats(
        &self,
        formats: &Vec<Format>,
        from_snapshot: bool,
    ) -> (Vec<Field>, Vec<Format>) {
        let mut fields = Vec::new();
        // Zero or one format applies uniformly to all columns, so no
        // per-column splicing is needed in that case.
        let need_format = !(formats.is_empty() || formats.len() == 1);
        let mut new_formats = formats.clone();
        let stream_chunk_row_indices_iter = if from_snapshot {
            self.stream_chunk_row_indices.iter().chain(None)
        } else {
            self.stream_chunk_row_indices
                .iter()
                .chain(Some(&self.op_index))
        };
        for index in stream_chunk_row_indices_iter {
            fields.push(self.row_fields[*index].clone());
            if need_format && !self.row_output_col_indices.contains(index) {
                new_formats.insert(*index, Format::Text);
            }
        }
        (fields, new_formats)
    }
}
377
/// A cursor over a subscription's change log, fetching rows epoch by epoch
/// starting from the table snapshot or a given timestamp.
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    // The table this subscription depends on.
    dependent_table_id: TableId,
    // After this instant the cursor is expired (subscription retention) and
    // fetches fail; refreshed whenever a new query is initiated.
    cursor_need_drop_time: Instant,
    state: State,
    // Output-schema bookkeeping; rebuilt when the table schema changes.
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    // When this cursor was last fetched from; reported for idle monitoring.
    last_fetch: Instant,
    // Pk of the last returned row, used to seek past already-delivered rows
    // when re-planning.
    seek_pk_row: Option<Row>,
}
391
392impl SubscriptionCursor {
    /// Creates a subscription cursor.
    ///
    /// With `start_timestamp`, the cursor starts in `InitLogStoreQuery` and
    /// will read the change log from that timestamp. Without it, the cursor
    /// immediately issues a snapshot query and starts in `Fetch`, using the
    /// table's pinned committed epoch as the starting rw_timestamp.
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            // No explicit start: query the current table snapshot, and derive
            // the starting rw_timestamp from the pinned committed epoch.
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, dependent_table_id, handler_args.clone(), None).await?;
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} not exists")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch Cursor can't start from specified query epoch. May run `set query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch Cursor don't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        // The cursor lives at most as long as the subscription's retention.
        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }
470
    /// Drives the state machine until a single row is produced.
    ///
    /// Returns `Ok(None)` when there is currently nothing more to read: no
    /// further epoch exists in the change log yet, or the table schema changed
    /// and the caller must restart the fetch with the new output schema.
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    // Log-store reads are never from the snapshot.
                    let from_snapshot = false;

                    // Look up the next readable epoch in the change log.
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    )
                    .await
                    {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            // Each successful query extends the cursor's lifetime.
                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Transition to fetching this epoch.
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                // Caller must re-read the (new) output schema
                                // before consuming more rows.
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Refill the buffer to find out whether this epoch still
                    // has data.
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        if from_snapshot {
                            // Snapshot rows carry an implicit `Insert` op and
                            // a NULL rw_timestamp.
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        // Epoch drained: record metrics and advance to the
                        // next epoch via InitLogStoreQuery.
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        if let Some(expected_timestamp) = expected_timestamp {
                            // The next epoch is already known; demand it exactly.
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            // Otherwise seek to anything after this epoch.
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }
603
604 pub async fn next(
605 &mut self,
606 count: u32,
607 handler_args: HandlerArgs,
608 formats: &Vec<Format>,
609 timeout_seconds: Option<u64>,
610 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
611 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
612 if Instant::now() > self.cursor_need_drop_time {
613 return Err(ErrorCode::InternalError(
614 "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
615 )
616 .into());
617 }
618
619 let session = &handler_args.session;
620 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
621 let mut cur = 0;
622 if let State::Fetch {
623 from_snapshot,
624 chunk_stream,
625 ..
626 } = &mut self.state
627 {
628 let (fields, fotmats) = self
629 .fields_manager
630 .get_row_stream_fields_and_formats(formats, *from_snapshot);
631 chunk_stream.init_row_stream(&fields, &fotmats, session.clone());
632 }
633 while cur < count {
634 let fetch_cursor_timer = Instant::now();
635 let row = self.next_row(&handler_args, formats).await?;
636 self.cursor_metrics
637 .subscription_cursor_fetch_duration
638 .with_label_values(&[&self.subscription.name])
639 .observe(fetch_cursor_timer.elapsed().as_millis() as _);
640 match row {
641 Some(row) => {
642 cur += 1;
643 ans.push(row);
644 }
645 None => {
646 let timeout_seconds = timeout_seconds.unwrap_or(0);
647 if cur > 0 || timeout_seconds == 0 {
648 break;
649 }
650 let State::InitLogStoreQuery { seek_timestamp, .. } = &self.state else {
651 continue;
653 };
654 match tokio::time::timeout(
657 Duration::from_secs(timeout_seconds),
658 session
659 .env
660 .hummock_snapshot_manager()
661 .wait_table_change_log_notification(
662 self.dependent_table_id,
663 *seek_timestamp,
664 ),
665 )
666 .await
667 {
668 Ok(result) => result?,
669 Err(_) => {
670 tracing::debug!("Cursor wait next epoch timeout");
671 break;
672 }
673 }
674 }
675 }
676 if let Some(timeout_instant) = timeout_instant
678 && Instant::now() > timeout_instant
679 {
680 break;
681 }
682 }
683 self.last_fetch = Instant::now();
684 let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
685 if let Some(seek_pk_row) = seek_pk_row {
686 self.seek_pk_row = Some(seek_pk_row);
687 }
688 let desc = self
689 .fields_manager
690 .get_output_fields()
691 .iter()
692 .map(to_pg_field)
693 .collect();
694
695 Ok((rows, desc))
696 }
697
698 async fn get_next_rw_timestamp(
699 seek_timestamp: u64,
700 table_id: TableId,
701 expected_timestamp: Option<u64>,
702 handler_args: HandlerArgs,
703 dependent_subscription: &SubscriptionCatalog,
704 ) -> Result<(Option<u64>, Option<u64>)> {
705 let session = handler_args.session;
706 session.get_subscription_by_schema_id_name(
708 dependent_subscription.schema_id,
709 &dependent_subscription.name,
710 )?;
711
712 let new_epochs = session
714 .list_change_log_epochs(table_id, seek_timestamp, 2)
715 .await?;
716 if let Some(expected_timestamp) = expected_timestamp
717 && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
718 {
719 return Err(ErrorCode::CatalogError(
720 format!(
721 " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor",
722 convert_logstore_u64_to_unix_millis(expected_timestamp)
723 )
724 .into(),
725 )
726 .into());
727 }
728 Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned()))
729 }
730
731 pub fn gen_batch_plan_result(
732 &self,
733 handler_args: HandlerArgs,
734 ) -> Result<RwBatchQueryPlanResult> {
735 match self.state {
736 State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
738 Some(0),
739 self.dependent_table_id,
740 handler_args,
741 self.seek_pk_row.clone(),
742 ),
743 State::Fetch {
744 from_snapshot,
745 rw_timestamp,
746 ..
747 } => {
748 if from_snapshot {
749 Self::init_batch_plan_for_subscription_cursor(
750 None,
751 self.dependent_table_id,
752 handler_args,
753 self.seek_pk_row.clone(),
754 )
755 } else {
756 Self::init_batch_plan_for_subscription_cursor(
757 Some(rw_timestamp),
758 self.dependent_table_id,
759 handler_args,
760 self.seek_pk_row.clone(),
761 )
762 }
763 }
764 State::Invalid => Err(ErrorCode::InternalError(
765 "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
766 )
767 .into()),
768 }
769 }
770
771 fn init_batch_plan_for_subscription_cursor(
772 rw_timestamp: Option<u64>,
773 dependent_table_id: TableId,
774 handler_args: HandlerArgs,
775 seek_pk_row: Option<Row>,
776 ) -> Result<RwBatchQueryPlanResult> {
777 let session = handler_args.clone().session;
778 let table_catalog = session.get_table_by_id(dependent_table_id)?;
779 let context = OptimizerContext::from_handler_args(handler_args);
780 let version_id = {
781 let version = session.env.hummock_snapshot_manager.acquire();
782 let version = version.version();
783 if !version
784 .state_table_info
785 .info()
786 .contains_key(&dependent_table_id)
787 {
788 return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
789 }
790 version.id
791 };
792 Self::create_batch_plan_for_cursor(
793 table_catalog,
794 &session,
795 context.into(),
796 rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
797 version_id,
798 seek_pk_row,
799 )
800 }
801
802 async fn initiate_query(
803 rw_timestamp: Option<u64>,
804 dependent_table_id: TableId,
805 handler_args: HandlerArgs,
806 seek_pk_row: Option<Row>,
807 ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
808 let init_query_timer = Instant::now();
809 let session = handler_args.clone().session;
810 let table_catalog = session.get_table_by_id(dependent_table_id)?;
811 let plan_result = Self::init_batch_plan_for_subscription_cursor(
812 rw_timestamp,
813 dependent_table_id,
814 handler_args.clone(),
815 seek_pk_row,
816 )?;
817 let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
818 let (chunk_stream, _) =
819 create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
820 Ok((chunk_stream, init_query_timer, table_catalog))
821 }
822
823 async fn try_refill_remaining_rows(
824 chunk_stream: &mut CursorDataChunkStream,
825 remaining_rows: &mut VecDeque<Row>,
826 ) -> Result<()> {
827 if remaining_rows.is_empty()
828 && let Some(row_set) = chunk_stream.next().await?
829 {
830 remaining_rows.extend(row_set?);
831 }
832 Ok(())
833 }
834
835 pub fn build_row(
836 mut row: Vec<Option<Bytes>>,
837 rw_timestamp: Option<u64>,
838 formats: &Vec<Format>,
839 session_data: &StaticSessionData,
840 ) -> Result<Row> {
841 let row_len = row.len();
842 let new_row = if let Some(rw_timestamp) = rw_timestamp {
843 let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
844 let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
845 let rw_timestamp = pg_value_format(
846 &DataType::Int64,
847 risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
848 *rw_timestamp_formats,
849 session_data,
850 )?;
851 vec![Some(rw_timestamp)]
852 } else {
853 let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
854 let op = pg_value_format(
855 &DataType::Varchar,
856 risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
857 *op_formats,
858 session_data,
859 )?;
860 vec![Some(op), None]
861 };
862 row.extend(new_row);
863 Ok(Row::new(row))
864 }
865
866 pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
867 if from_snapshot {
868 descs.push(Field::with_name(DataType::Varchar, "op"));
869 }
870 descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
871 descs
872 }
873
    /// Builds a batch plan that scans the dependent table for this cursor.
    ///
    /// With `epoch_range`, the plan is a change-log scan (`BatchLogSeqScan`)
    /// over that range at `version_id`; otherwise it is a plain snapshot scan
    /// (`BatchSeqScan`). When `seek_pk_rows` is given, the scan is restricted
    /// to rows with pk strictly greater than it, via a scan range and/or a
    /// residual filter.
    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<RwBatchQueryPlanResult> {
        // Output all visible columns plus hidden pk columns (needed for seek).
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
        // Translate the seek pk row into `(pk_1, ..) > (v_1, ..)` and split it
        // into scan ranges plus a residual predicate.
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    // Seek values arrive as text-encoded bytes; decode them
                    // back into scalars of the pk column types.
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::row_expr_type(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(&table_catalog, max_split_range_gap)?;
                // A single pk comparison should never split into several ranges.
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            // Change-log scan over the given epoch range.
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            // Plain snapshot scan of the table.
            let core = generic::TableScan::new(
                output_col_idx,
                table_catalog.clone(),
                vec![],
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        // Apply any residual seek predicate the scan ranges didn't cover.
        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        // Keep the output ordered by pk so fetches resume deterministically.
        let order = Order::new(table_catalog.pk().to_vec());

        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema();
        let (batch_log_seq_scan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto | QueryMode::Local => {
                (plan_root.gen_batch_local_plan()?, QueryMode::Local)
            }
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(RwBatchQueryPlanResult {
            plan: batch_log_seq_scan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: vec![],
        })
    }
1023
1024 pub fn idle_duration(&self) -> Duration {
1025 self.last_fetch.elapsed()
1026 }
1027
1028 pub fn subscription_name(&self) -> &str {
1029 self.subscription.name.as_str()
1030 }
1031
1032 pub fn state_info_string(&self) -> String {
1033 format!("{}", self.state)
1034 }
1035}
1036
/// Session-level registry of named cursors.
pub struct CursorManager {
    // Cursors by name; an async mutex because fetches hold the lock across
    // `.await` points (see `get_rows_with_cursor`).
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}
1041
1042impl CursorManager {
1043 pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
1044 Self {
1045 cursor_map: tokio::sync::Mutex::new(HashMap::new()),
1046 cursor_metrics,
1047 }
1048 }
1049
1050 pub async fn add_subscription_cursor(
1051 &self,
1052 cursor_name: String,
1053 start_timestamp: Option<u64>,
1054 dependent_table_id: TableId,
1055 subscription: Arc<SubscriptionCatalog>,
1056 handler_args: &HandlerArgs,
1057 ) -> Result<()> {
1058 let create_cursor_timer = Instant::now();
1059 let subscription_name = subscription.name.clone();
1060 let cursor = SubscriptionCursor::new(
1061 cursor_name,
1062 start_timestamp,
1063 subscription,
1064 dependent_table_id,
1065 handler_args,
1066 self.cursor_metrics.clone(),
1067 )
1068 .await?;
1069 let mut cursor_map = self.cursor_map.lock().await;
1070 self.cursor_metrics
1071 .subscription_cursor_declare_duration
1072 .with_label_values(&[&subscription_name])
1073 .observe(create_cursor_timer.elapsed().as_millis() as _);
1074
1075 cursor_map.retain(|_, v| {
1076 if let Cursor::Subscription(cursor) = v
1077 && matches!(cursor.state, State::Invalid)
1078 {
1079 false
1080 } else {
1081 true
1082 }
1083 });
1084
1085 cursor_map
1086 .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
1087 .map_err(|error| {
1088 ErrorCode::CatalogError(
1089 format!("cursor `{}` already exists", error.entry.key()).into(),
1090 )
1091 })?;
1092 Ok(())
1093 }
1094
1095 pub async fn add_query_cursor(
1096 &self,
1097 cursor_name: String,
1098 chunk_stream: CursorDataChunkStream,
1099 fields: Vec<Field>,
1100 ) -> Result<()> {
1101 let cursor = QueryCursor::new(chunk_stream, fields)?;
1102 self.cursor_map
1103 .lock()
1104 .await
1105 .try_insert(cursor_name, Cursor::Query(cursor))
1106 .map_err(|error| {
1107 ErrorCode::CatalogError(
1108 format!("cursor `{}` already exists", error.entry.key()).into(),
1109 )
1110 })?;
1111
1112 Ok(())
1113 }
1114
1115 pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
1116 self.cursor_map
1117 .lock()
1118 .await
1119 .remove(cursor_name)
1120 .ok_or_else(|| {
1121 ErrorCode::CatalogError(format!("cursor `{}` don't exists", cursor_name).into())
1122 })?;
1123 Ok(())
1124 }
1125
1126 pub async fn remove_all_cursor(&self) {
1127 self.cursor_map.lock().await.clear();
1128 }
1129
1130 pub async fn remove_all_query_cursor(&self) {
1131 self.cursor_map
1132 .lock()
1133 .await
1134 .retain(|_, v| matches!(v, Cursor::Subscription(_)));
1135 }
1136
1137 pub async fn get_rows_with_cursor(
1138 &self,
1139 cursor_name: &str,
1140 count: u32,
1141 handler_args: HandlerArgs,
1142 formats: &Vec<Format>,
1143 timeout_seconds: Option<u64>,
1144 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
1145 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1146 cursor
1147 .next(count, handler_args, formats, timeout_seconds)
1148 .await
1149 } else {
1150 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1151 }
1152 }
1153
1154 pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
1155 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1156 Ok(cursor.get_fields())
1157 } else {
1158 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1159 }
1160 }
1161
1162 pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
1163 let mut subscription_cursor_nums = 0;
1164 let mut invalid_subscription_cursor_nums = 0;
1165 let mut subscription_cursor_last_fetch_duration = HashMap::new();
1166 for (_, cursor) in self.cursor_map.lock().await.iter() {
1167 if let Cursor::Subscription(subscription_cursor) = cursor {
1168 subscription_cursor_nums += 1;
1169 if matches!(subscription_cursor.state, State::Invalid) {
1170 invalid_subscription_cursor_nums += 1;
1171 } else {
1172 let fetch_duration =
1173 subscription_cursor.last_fetch.elapsed().as_millis() as f64;
1174 subscription_cursor_last_fetch_duration.insert(
1175 subscription_cursor.subscription.name.clone(),
1176 fetch_duration,
1177 );
1178 }
1179 }
1180 }
1181 PeriodicCursorMetrics {
1182 subscription_cursor_nums,
1183 invalid_subscription_cursor_nums,
1184 subscription_cursor_last_fetch_duration,
1185 }
1186 }
1187
1188 pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
1189 self.cursor_map
1190 .lock()
1191 .await
1192 .iter()
1193 .for_each(|(cursor_name, cursor)| {
1194 if let Cursor::Query(cursor) = cursor {
1195 f(cursor_name, cursor)
1196 }
1197 });
1198 }
1199
1200 pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
1201 self.cursor_map
1202 .lock()
1203 .await
1204 .iter()
1205 .for_each(|(cursor_name, cursor)| {
1206 if let Cursor::Subscription(cursor) = cursor {
1207 f(cursor_name, cursor)
1208 }
1209 });
1210 }
1211
1212 pub async fn gen_batch_plan_with_subscription_cursor(
1213 &self,
1214 cursor_name: &str,
1215 handler_args: HandlerArgs,
1216 ) -> Result<RwBatchQueryPlanResult> {
1217 match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
1218 ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
1219 })? {
1220 Cursor::Subscription(cursor) => {
1221 cursor.gen_batch_plan_result(handler_args.clone())
1222 },
1223 Cursor::Query(_) => Err(ErrorCode::InternalError("The plan of the cursor is the same as the query statement of the as when it was created.".to_owned()).into()),
1224 }
1225 }
1226}