use core::mem;
use core::time::Duration;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use futures::StreamExt;
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_hummock_sdk::HummockVersionId;

use super::SessionImpl;
use crate::catalog::TableId;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
use crate::handler::HandlerArgs;
use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
use crate::handler::query::{BatchQueryPlanResult, gen_batch_plan_fragmenter};
use crate::handler::util::{
    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
    pg_value_format, to_pg_field,
};
use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
use crate::optimizer::PlanRoot;
use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
use crate::optimizer::property::{Order, RequiredDist};
use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
use crate::utils::Condition;
use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};
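/// The data stream backing a cursor. It starts as a raw local or distributed
/// data-chunk stream and becomes a pgwire row stream (`PgResponse`) once
/// `init_row_stream` is called with the output fields and formats.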
pub enum CursorDataChunkStream {
    LocalDataChunk(Option<LocalQueryStream>),
    DistributedDataChunk(Option<DistributedQueryStream>),
    PgResponse(PgResponseStream),
}

impl CursorDataChunkStream {
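    /// Converts the raw local or distributed data-chunk stream into a pgwire
    /// row stream, using the given output fields and formats. A no-op if the
    /// stream has already been converted.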
    pub fn init_row_stream(
        &mut self,
        fields: &Vec<Field>,
        formats: &Vec<Format>,
        session: Arc<SessionImpl>,
    ) {
        let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
        match self {
            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            _ => {}
        }
    }

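    /// Returns the next batch of rows from the converted row stream. Errors if
    /// `init_row_stream` has not been called yet.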
    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
        match self {
            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
            _ => Err(ErrorCode::InternalError(
                "the data chunk stream must be initialized into a row stream via `init_row_stream` before rows can be fetched".to_owned(),
            )
            .into()),
        }
    }
}
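/// A named cursor: either a subscription cursor over a table's change log or a
/// plain query cursor.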
pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}
impl Cursor {
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        match self {
            Cursor::Subscription(cursor) => cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
            Cursor::Query(cursor) => {
                cursor
                    .next(count, formats, handler_args, timeout_seconds)
                    .await
            }
        }
    }

    pub fn get_fields(&mut self) -> Vec<Field> {
        match self {
            Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
            Cursor::Query(cursor) => cursor.fields.clone(),
        }
    }
}

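/// A cursor over the result of a plain query, buffering one data chunk of rows
/// at a time.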
pub struct QueryCursor {
    chunk_stream: CursorDataChunkStream,
    fields: Vec<Field>,
    remaining_rows: VecDeque<Row>,
}

impl QueryCursor {
    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
        Ok(Self {
            chunk_stream,
            fields,
            remaining_rows: VecDeque::<Row>::new(),
        })
    }

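    /// Returns the next row, refilling the buffer from the chunk stream when it
    /// runs empty. Returns `None` once the stream is exhausted.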
    pub async fn next_once(&mut self) -> Result<Option<Row>> {
        while self.remaining_rows.is_empty() {
            let rows = self.chunk_stream.next().await?;
            let rows = match rows {
                None => return Ok(None),
                Some(row) => row?,
            };
            self.remaining_rows = rows.into_iter().collect();
        }
        let row = self.remaining_rows.pop_front().unwrap();
        Ok(Some(row))
    }

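    /// Fetches up to `count` rows, stopping early once `timeout_seconds` (if
    /// given) elapses; rows fetched so far are still returned.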
    pub async fn next(
        &mut self,
        count: u32,
        formats: &Vec<Format>,
        handler_args: HandlerArgs,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        let session = handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        let desc = self.fields.iter().map(to_pg_field).collect();
        self.chunk_stream
            .init_row_stream(&self.fields, formats, session);
        while cur < count
            && let Some(row) = self.next_once().await?
        {
            cur += 1;
            ans.push(row);
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        Ok((ans, desc))
    }
}

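/// State machine of a `SubscriptionCursor`. The cursor alternates between
/// `InitLogStoreQuery` (looking up the next change-log epoch and issuing a
/// batch query for it) and `Fetch` (draining that query's rows).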
enum State {
    InitLogStoreQuery {
        // The rw_timestamp to seek to in the change log.
        seek_timestamp: u64,

        // If specified, the epoch the next change-log entry is expected to have.
        expected_timestamp: Option<u64>,
    },
    Fetch {
        // Whether the rows come from the table's snapshot rather than its change log.
        from_snapshot: bool,

        // The rw_timestamp of the data currently being fetched.
        rw_timestamp: u64,

        // The stream of the batch query issued for this timestamp.
        chunk_stream: CursorDataChunkStream,

        // Buffered rows of the current chunk that have not been returned yet.
        remaining_rows: VecDeque<Row>,

        // If specified, the epoch the next change-log entry is expected to have.
        expected_timestamp: Option<u64>,

        init_query_timer: Instant,
    },
    Invalid,
}

impl Display for State {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            State::InitLogStoreQuery {
                seek_timestamp,
                expected_timestamp,
            } => write!(
                f,
                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
                seek_timestamp, expected_timestamp
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows,
                init_query_timer,
                ..
            } => write!(
                f,
                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query initiated {}ms ago }}",
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows.len(),
                init_query_timer.elapsed().as_millis()
            ),
            State::Invalid => write!(f, "Invalid"),
        }
    }
}

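/// Tracks the column layout for a subscription cursor: which fields are
/// returned to the user, which form the primary key, and where the synthetic
/// `op` and `rw_timestamp` columns sit.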
struct FieldsManager {
    columns_catalog: Vec<ColumnCatalog>,
    // All fields of a cursor row: the visible and pk columns, plus `op` and `rw_timestamp`.
    row_fields: Vec<Field>,
    // Indices into `row_fields` of the columns returned to the user.
    row_output_col_indices: Vec<usize>,
    // Indices into `row_fields` of the primary-key columns.
    row_pk_indices: Vec<usize>,
    // Indices into `row_fields` of the columns present in the underlying stream chunk.
    stream_chunk_row_indices: Vec<usize>,
    // Index into `row_fields` of the `op` column.
    op_index: usize,
}

impl FieldsManager {
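    /// Builds the field layout from the table catalog: primary-key columns are
    /// always kept (even if hidden) so the cursor can seek by pk, other hidden
    /// columns are dropped, and `op` and `rw_timestamp` are appended last.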
    pub fn new(catalog: &TableCatalog) -> Self {
        let mut row_fields = Vec::new();
        let mut row_output_col_indices = Vec::new();
        let mut row_pk_indices = Vec::new();
        let mut stream_chunk_row_indices = Vec::new();
        let mut output_idx = 0_usize;
        let pk_set: HashSet<usize> = catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index)
            .collect();

        for (index, v) in catalog.columns.iter().enumerate() {
            if pk_set.contains(&index) {
                row_pk_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                if !v.is_hidden {
                    row_output_col_indices.push(output_idx);
                }
                output_idx += 1;
            } else if !v.is_hidden {
                row_output_col_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                output_idx += 1;
            }
        }

        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
        row_output_col_indices.push(output_idx);
        let op_index = output_idx;
        output_idx += 1;
        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
        row_output_col_indices.push(output_idx);
        Self {
            columns_catalog: catalog.columns.clone(),
            row_fields,
            row_output_col_indices,
            row_pk_indices,
            stream_chunk_row_indices,
            op_index,
        }
    }

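    /// Rebuilds the layout if the table's columns have changed. Returns `true`
    /// iff a rebuild happened.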
    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
        if self.columns_catalog.ne(&catalog.columns) {
            *self = Self::new(catalog);
            true
        } else {
            false
        }
    }

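    /// Projects each row to the user-visible columns, and additionally returns
    /// the pk projection of the last row so the cursor can seek past it later.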
    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
        let last_row = rows.last_mut().map(|row| {
            let mut row = row.clone();
            row.project(&self.row_pk_indices)
        });
        let rows = rows
            .iter_mut()
            .map(|row| row.project(&self.row_output_col_indices))
            .collect();
        (rows, last_row)
    }

    pub fn get_output_fields(&self) -> Vec<Field> {
        self.row_output_col_indices
            .iter()
            .map(|&idx| self.row_fields[idx].clone())
            .collect()
    }

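    /// Returns the fields and formats for decoding the underlying row stream.
    /// Snapshot reads carry no `op` column. When per-column formats are given,
    /// hidden pk columns (which are never returned to the user) are forced to
    /// text format.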
    pub fn get_row_stream_fields_and_formats(
        &self,
        formats: &Vec<Format>,
        from_snapshot: bool,
    ) -> (Vec<Field>, Vec<Format>) {
        let mut fields = Vec::new();
        let need_format = !(formats.is_empty() || formats.len() == 1);
        let mut new_formats = formats.clone();
        let stream_chunk_row_indices_iter = if from_snapshot {
            self.stream_chunk_row_indices.iter().chain(None)
        } else {
            self.stream_chunk_row_indices
                .iter()
                .chain(Some(&self.op_index))
        };
        for index in stream_chunk_row_indices_iter {
            fields.push(self.row_fields[*index].clone());
            if need_format && !self.row_output_col_indices.contains(index) {
                new_formats.insert(*index, Format::Text);
            }
        }
        (fields, new_formats)
    }
}

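/// A cursor over a subscription: unless a start timestamp is given, it reads
/// the table's snapshot first and then follows the table's change log.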
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    dependent_table_id: TableId,
    cursor_need_drop_time: Instant,
    state: State,
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    last_fetch: Instant,
    seek_pk_row: Option<Row>,
}

impl SubscriptionCursor {
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            // No start timestamp: begin with a snapshot read at the table's
            // currently pinned committed epoch.
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?;
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} does not exist")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch Cursor can't start from a specified query epoch. Consider running `set query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch Cursor doesn't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }

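    /// Advances the state machine until a row is produced, or returns `None`
    /// when the change log has no further data or the table schema changed
    /// (the caller then re-initializes the row stream with the new fields).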
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    let from_snapshot = false;

                    // Initiate a new batch query to continue fetching.
                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            // Transition to the Fetch state.
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    // Try to fetch a row from the current chunk stream.
                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        if from_snapshot {
                            // Snapshot rows carry no op column: build_row appends
                            // op = "Insert" and a NULL rw_timestamp.
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            // Change-log rows already carry op; append the rw_timestamp.
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        // The current query is exhausted; advance to the next epoch.
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }

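    /// Fetches up to `count` rows. If no row is immediately available and
    /// `timeout_seconds` is set, waits for new change-log data before giving
    /// up. Records the pk of the last returned row for subsequent seeks.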
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        if Instant::now() > self.cursor_need_drop_time {
            return Err(ErrorCode::InternalError(
                "The cursor has exceeded its maximum lifetime; please recreate it (close, then declare the cursor again).".to_owned(),
            )
            .into());
        }

        let session = &handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        if let State::Fetch {
            from_snapshot,
            chunk_stream,
            ..
        } = &mut self.state
        {
            let (fields, formats) = self
                .fields_manager
                .get_row_stream_fields_and_formats(formats, *from_snapshot);
            chunk_stream.init_row_stream(&fields, &formats, session.clone());
        }
        while cur < count {
            let fetch_cursor_timer = Instant::now();
            let row = self.next_row(&handler_args, formats).await?;
            self.cursor_metrics
                .subscription_cursor_fetch_duration
                .with_label_values(&[&self.subscription.name])
                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
            match row {
                Some(row) => {
                    cur += 1;
                    ans.push(row);
                }
                None => {
                    let timeout_seconds = timeout_seconds.unwrap_or(0);
                    if cur > 0 || timeout_seconds == 0 {
                        break;
                    }
                    // Nothing fetched yet and a timeout was given: wait for the
                    // table's change log to advance, then retry.
                    match tokio::time::timeout(
                        Duration::from_secs(timeout_seconds),
                        session
                            .env
                            .hummock_snapshot_manager()
                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
                    )
                    .await
                    {
                        Ok(result) => result?,
                        Err(_) => {
                            tracing::debug!("cursor timed out waiting for the next epoch");
                            break;
                        }
                    }
                }
            }
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        self.last_fetch = Instant::now();
        let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
        if let Some(seek_pk_row) = seek_pk_row {
            self.seek_pk_row = Some(seek_pk_row);
        }
        let desc = self
            .fields_manager
            .get_output_fields()
            .iter()
            .map(to_pg_field)
            .collect();

        Ok((rows, desc))
    }

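    /// Returns the next change-log epoch at or after `seek_timestamp`, together
    /// with the epoch after it (if any), verifying that the subscription still
    /// exists and that the first epoch matches `expected_timestamp` when given.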
    fn get_next_rw_timestamp(
        seek_timestamp: u64,
        table_id: &TableId,
        expected_timestamp: Option<u64>,
        handler_args: HandlerArgs,
        dependent_subscription: &SubscriptionCatalog,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let session = handler_args.session;
        // Test that the subscription still exists before fetching the next timestamp.
        session.get_subscription_by_schema_id_name(
            dependent_subscription.schema_id,
            &dependent_subscription.name,
        )?;

        let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
        if let Some(expected_timestamp) = expected_timestamp
            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
        {
            return Err(ErrorCode::CatalogError(
                format!(
                    "no data found for rw_timestamp {:?}; the data may have been recycled, please recreate the cursor",
                    convert_logstore_u64_to_unix_millis(expected_timestamp)
                )
                .into(),
            )
            .into());
        }
        Ok((new_epochs.first().cloned(), new_epochs.get(1).cloned()))
    }

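    /// Generates the batch plan corresponding to the cursor's current state.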
    pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
        match self.state {
            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
                Some(0),
                &self.dependent_table_id,
                handler_args,
                self.seek_pk_row.clone(),
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                ..
            } => {
                if from_snapshot {
                    Self::init_batch_plan_for_subscription_cursor(
                        None,
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                } else {
                    Self::init_batch_plan_for_subscription_cursor(
                        Some(rw_timestamp),
                        &self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                }
            }
            State::Invalid => Err(ErrorCode::InternalError(
                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
            )
            .into()),
        }
    }

    fn init_batch_plan_for_subscription_cursor(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let context = OptimizerContext::from_handler_args(handler_args.clone());
        let version_id = {
            let version = session.env.hummock_snapshot_manager.acquire();
            let version = version.version();
            if !version
                .state_table_info
                .info()
                .contains_key(dependent_table_id)
            {
                return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
            }
            version.id
        };
        Self::create_batch_plan_for_cursor(
            table_catalog,
            &session,
            context.into(),
            rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
            version_id,
            seek_pk_row,
        )
    }

    async fn initiate_query(
        rw_timestamp: Option<u64>,
        dependent_table_id: &TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
        let init_query_timer = Instant::now();
        let session = handler_args.clone().session;
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let plan_result = Self::init_batch_plan_for_subscription_cursor(
            rw_timestamp,
            dependent_table_id,
            handler_args.clone(),
            seek_pk_row,
        )?;
        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
        let (chunk_stream, _) =
            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
        Ok((chunk_stream, init_query_timer, table_catalog))
    }

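    /// Pulls at most one row set from the stream into the buffer, and only when
    /// the buffer is empty.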
    async fn try_refill_remaining_rows(
        chunk_stream: &mut CursorDataChunkStream,
        remaining_rows: &mut VecDeque<Row>,
    ) -> Result<()> {
        if remaining_rows.is_empty()
            && let Some(row_set) = chunk_stream.next().await?
        {
            remaining_rows.extend(row_set?);
        }
        Ok(())
    }

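    /// Appends the synthetic trailing columns to a row. For change-log rows
    /// (`rw_timestamp` is `Some`) the encoded timestamp is appended; for
    /// snapshot rows an `op` of "Insert" plus a NULL `rw_timestamp` are
    /// appended instead.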
    pub fn build_row(
        mut row: Vec<Option<Bytes>>,
        rw_timestamp: Option<u64>,
        formats: &Vec<Format>,
        session_data: &StaticSessionData,
    ) -> Result<Row> {
        let row_len = row.len();
        let new_row = if let Some(rw_timestamp) = rw_timestamp {
            let rw_timestamp_format = formats.get(row_len).unwrap_or(&Format::Text);
            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
            let rw_timestamp = pg_value_format(
                &DataType::Int64,
                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
                *rw_timestamp_format,
                session_data,
            )?;
            vec![Some(rw_timestamp)]
        } else {
            let op_format = formats.get(row_len).unwrap_or(&Format::Text);
            let op = pg_value_format(
                &DataType::Varchar,
                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
                *op_format,
                session_data,
            )?;
            vec![Some(op), None]
        };
        row.extend(new_row);
        Ok(Row::new(row))
    }

    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
        if from_snapshot {
            descs.push(Field::with_name(DataType::Varchar, "op"));
        }
        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
        descs
    }

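    /// Builds the batch plan backing a fetch: a `BatchLogSeqScan` over
    /// `epoch_range` when reading the change log, or a plain `BatchSeqScan`
    /// of the snapshot otherwise, optionally filtered to pk values strictly
    /// greater than `seek_pk_rows` and ordered by the table's pk.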
    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        // Output the pk plus the user-visible columns, so the cursor can seek
        // by pk even when some pk columns are hidden.
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                // Build the predicate `ROW(pk...) > ROW(seek_pk...)` and split
                // it into a scan range plus a residual condition.
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::unnamed(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(&table_catalog, max_split_range_gap)?;
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            let core = generic::TableScan::new(
                output_col_idx,
                table_catalog.clone(),
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        let order = Order::new(table_catalog.pk().to_vec());

        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema().clone();
        let (batch_plan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(BatchQueryPlanResult {
            plan: batch_plan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: vec![],
            read_storage_tables: HashSet::from_iter([table_catalog.id]),
        })
    }

1008
1009 pub fn idle_duration(&self) -> Duration {
1010 self.last_fetch.elapsed()
1011 }
1012
1013 pub fn subscription_name(&self) -> &str {
1014 self.subscription.name.as_str()
1015 }
1016
1017 pub fn state_info_string(&self) -> String {
1018 format!("{}", self.state)
1019 }
1020}
1021
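/// Per-session registry of named cursors (both query and subscription
/// cursors), guarded by an async mutex.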
pub struct CursorManager {
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}

impl CursorManager {
    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
        Self {
            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
            cursor_metrics,
        }
    }

    pub async fn add_subscription_cursor(
        &self,
        cursor_name: String,
        start_timestamp: Option<u64>,
        dependent_table_id: TableId,
        subscription: Arc<SubscriptionCatalog>,
        handler_args: &HandlerArgs,
    ) -> Result<()> {
        let create_cursor_timer = Instant::now();
        let subscription_name = subscription.name.clone();
        let cursor = SubscriptionCursor::new(
            cursor_name,
            start_timestamp,
            subscription,
            dependent_table_id,
            handler_args,
            self.cursor_metrics.clone(),
        )
        .await?;
        let mut cursor_map = self.cursor_map.lock().await;
        self.cursor_metrics
            .subscription_cursor_declare_duration
            .with_label_values(&[&subscription_name])
            .observe(create_cursor_timer.elapsed().as_millis() as _);

        // Drop any subscription cursors that have become invalid.
        cursor_map.retain(|_, v| {
            if let Cursor::Subscription(cursor) = v
                && matches!(cursor.state, State::Invalid)
            {
                false
            } else {
                true
            }
        });

        cursor_map
            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;
        Ok(())
    }

    pub async fn add_query_cursor(
        &self,
        cursor_name: String,
        chunk_stream: CursorDataChunkStream,
        fields: Vec<Field>,
    ) -> Result<()> {
        let cursor = QueryCursor::new(chunk_stream, fields)?;
        self.cursor_map
            .lock()
            .await
            .try_insert(cursor_name, Cursor::Query(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;

        Ok(())
    }

    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
        self.cursor_map
            .lock()
            .await
            .remove(cursor_name)
            .ok_or_else(|| {
                ErrorCode::CatalogError(format!("cursor `{}` does not exist", cursor_name).into())
            })?;
        Ok(())
    }

    pub async fn remove_all_cursor(&self) {
        self.cursor_map.lock().await.clear();
    }

    pub async fn remove_all_query_cursor(&self) {
        self.cursor_map
            .lock()
            .await
            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
    }

    pub async fn get_rows_with_cursor(
        &self,
        cursor_name: &str,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            Ok(cursor.get_fields())
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
        let mut subsription_cursor_nums = 0;
        let mut invalid_subsription_cursor_nums = 0;
        let mut subscription_cursor_last_fetch_duration = HashMap::new();
        for (_, cursor) in self.cursor_map.lock().await.iter() {
            if let Cursor::Subscription(subscription_cursor) = cursor {
                subsription_cursor_nums += 1;
                if matches!(subscription_cursor.state, State::Invalid) {
                    invalid_subsription_cursor_nums += 1;
                } else {
                    let fetch_duration =
                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
                    subscription_cursor_last_fetch_duration.insert(
                        subscription_cursor.subscription.name.clone(),
                        fetch_duration,
                    );
                }
            }
        }
        PeriodicCursorMetrics {
            subsription_cursor_nums,
            invalid_subsription_cursor_nums,
            subscription_cursor_last_fetch_duration,
        }
    }

    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Query(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Subscription(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn gen_batch_plan_with_subscription_cursor(
        &self,
        cursor_name: &str,
        handler_args: HandlerArgs,
    ) -> Result<BatchQueryPlanResult> {
        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
        })? {
            Cursor::Subscription(cursor) => cursor.gen_batch_plan_result(handler_args.clone()),
            Cursor::Query(_) => Err(ErrorCode::InternalError(
                "The plan of a query cursor is identical to the query statement it was declared with.".to_owned(),
            )
            .into()),
        }
    }
}