1use core::mem;
16use core::time::Duration;
17use std::collections::{HashMap, HashSet, VecDeque};
18use std::fmt::{Display, Formatter};
19use std::rc::Rc;
20use std::sync::Arc;
21use std::time::Instant;
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use futures::StreamExt;
26use itertools::Itertools;
27use pgwire::pg_field_descriptor::PgFieldDescriptor;
28use pgwire::pg_response::StatementType;
29use pgwire::types::{Format, Row};
30use risingwave_common::catalog::{ColumnCatalog, Field};
31use risingwave_common::error::BoxedError;
32use risingwave_common::session_config::QueryMode;
33use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
34use risingwave_common::util::iter_util::ZipEqFast;
35use risingwave_hummock_sdk::HummockVersionId;
36
37use super::SessionImpl;
38use crate::catalog::TableId;
39use crate::catalog::subscription_catalog::SubscriptionCatalog;
40use crate::error::{ErrorCode, Result};
41use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
42use crate::handler::HandlerArgs;
43use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
44use crate::handler::query::{BatchQueryPlanResult, gen_batch_plan_fragmenter};
45use crate::handler::util::{
46 DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
47 pg_value_format, to_pg_field,
48};
49use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
50use crate::optimizer::PlanRoot;
51use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
52use crate::optimizer::property::{Cardinality, Order, RequiredDist};
53use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
54use crate::utils::Condition;
55use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};
56
/// The underlying row stream of a cursor.
///
/// Starts out as a raw data-chunk stream (local or distributed, held in an
/// `Option` so the stream can be moved out on conversion) and is converted
/// in place into a row-level `PgResponse` stream by
/// [`CursorDataChunkStream::init_row_stream`].
pub enum CursorDataChunkStream {
    /// Data-chunk stream of a local query; `None` after conversion.
    LocalDataChunk(Option<LocalQueryStream>),
    /// Data-chunk stream of a distributed query; `None` after conversion.
    DistributedDataChunk(Option<DistributedQueryStream>),
    /// Row-level stream, ready to be polled via [`CursorDataChunkStream::next`].
    PgResponse(PgResponseStream),
}
62
63impl CursorDataChunkStream {
64 pub fn init_row_stream(
65 &mut self,
66 fields: &Vec<Field>,
67 formats: &Vec<Format>,
68 session: Arc<SessionImpl>,
69 ) {
70 let columns_type = fields.iter().map(|f| f.data_type().clone()).collect();
71 match self {
72 CursorDataChunkStream::LocalDataChunk(data_chunk) => {
73 let data_chunk = mem::take(data_chunk).unwrap();
74 let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
75 data_chunk,
76 columns_type,
77 formats.clone(),
78 session,
79 ));
80 *self = CursorDataChunkStream::PgResponse(row_stream);
81 }
82 CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
83 let data_chunk = mem::take(data_chunk).unwrap();
84 let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
85 data_chunk,
86 columns_type,
87 formats.clone(),
88 session,
89 ));
90 *self = CursorDataChunkStream::PgResponse(row_stream);
91 }
92 _ => {}
93 }
94 }
95
96 pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
97 match self {
98 CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
99 _ => Err(ErrorCode::InternalError(
100 "Only 'CursorDataChunkStream' can call next and return rows".to_owned(),
101 )
102 .into()),
103 }
104 }
105}
/// A cursor held by a session: either a subscription cursor (incremental
/// change-log reads) or a plain query cursor over a one-shot result set.
pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}
110impl Cursor {
111 pub async fn next(
112 &mut self,
113 count: u32,
114 handler_args: HandlerArgs,
115 formats: &Vec<Format>,
116 timeout_seconds: Option<u64>,
117 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
118 match self {
119 Cursor::Subscription(cursor) => cursor
120 .next(count, handler_args, formats, timeout_seconds)
121 .await
122 .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
123 Cursor::Query(cursor) => {
124 cursor
125 .next(count, formats, handler_args, timeout_seconds)
126 .await
127 }
128 }
129 }
130
131 pub fn get_fields(&mut self) -> Vec<Field> {
132 match self {
133 Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields().clone(),
134 Cursor::Query(cursor) => cursor.fields.clone(),
135 }
136 }
137}
138
/// Cursor over the result set of an ordinary query.
pub struct QueryCursor {
    // Source of rows; converted to a row stream on first fetch.
    chunk_stream: CursorDataChunkStream,
    // Output schema of the query.
    fields: Vec<Field>,
    // Rows pulled from the stream but not yet handed to the client.
    remaining_rows: VecDeque<Row>,
}
144
145impl QueryCursor {
146 pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
147 Ok(Self {
148 chunk_stream,
149 fields,
150 remaining_rows: VecDeque::<Row>::new(),
151 })
152 }
153
154 pub async fn next_once(&mut self) -> Result<Option<Row>> {
155 while self.remaining_rows.is_empty() {
156 let rows = self.chunk_stream.next().await?;
157 let rows = match rows {
158 None => return Ok(None),
159 Some(row) => row?,
160 };
161 self.remaining_rows = rows.into_iter().collect();
162 }
163 let row = self.remaining_rows.pop_front().unwrap();
164 Ok(Some(row))
165 }
166
167 pub async fn next(
168 &mut self,
169 count: u32,
170 formats: &Vec<Format>,
171 handler_args: HandlerArgs,
172 timeout_seconds: Option<u64>,
173 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
174 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
177 let session = handler_args.session;
178 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
179 let mut cur = 0;
180 let desc = self.fields.iter().map(to_pg_field).collect();
181 self.chunk_stream
182 .init_row_stream(&self.fields, formats, session);
183 while cur < count
184 && let Some(row) = self.next_once().await?
185 {
186 cur += 1;
187 ans.push(row);
188 if let Some(timeout_instant) = timeout_instant
189 && Instant::now() > timeout_instant
190 {
191 break;
192 }
193 }
194 Ok((ans, desc))
195 }
196}
197
/// Internal state machine of a [`SubscriptionCursor`].
enum State {
    /// About to query the log store for the next change-log epoch.
    InitLogStoreQuery {
        // Epoch to seek the next change-log entry from.
        seek_timestamp: u64,

        // If set, the epoch we expect the seek to find; a mismatch means the
        // log data has been recycled.
        expected_timestamp: Option<u64>,
    },
    /// Actively draining rows from a running batch query.
    Fetch {
        // True when reading the table snapshot, false when reading the change log.
        from_snapshot: bool,

        // Epoch the current query reads at.
        rw_timestamp: u64,

        // Underlying stream of the current batch query.
        chunk_stream: CursorDataChunkStream,

        // Rows pulled from the stream but not yet returned to the client.
        remaining_rows: VecDeque<Row>,

        // The epoch expected to follow the current one, if already known.
        expected_timestamp: Option<u64>,

        // When the current query was initiated; feeds the duration metric.
        init_query_timer: Instant,
    },
    /// The cursor hit an error and must be closed and re-created.
    Invalid,
}
228
// Human-readable state dump, surfaced via `SubscriptionCursor::state_info_string`.
impl Display for State {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            State::InitLogStoreQuery {
                seek_timestamp,
                expected_timestamp,
            } => write!(
                f,
                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
                seek_timestamp, expected_timestamp
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows,
                init_query_timer,
                ..
            } => write!(
                f,
                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query init at {}ms before }}",
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows.len(),
                init_query_timer.elapsed().as_millis()
            ),
            State::Invalid => write!(f, "Invalid"),
        }
    }
}
260
/// Bookkeeping for the output schema of a subscription cursor.
///
/// Built from the table catalog in [`FieldsManager::new`]; rebuilt when the
/// table's columns change.
struct FieldsManager {
    // Snapshot of the table's columns used to detect schema changes.
    columns_catalog: Vec<ColumnCatalog>,
    // All materialized fields: pk + visible columns, then `op`, then `rw_timestamp`.
    row_fields: Vec<Field>,
    // Indices (into `row_fields`) of the user-visible output columns.
    row_output_col_indices: Vec<usize>,
    // Indices (into `row_fields`) of the primary-key columns.
    row_pk_indices: Vec<usize>,
    // Indices (into `row_fields`) of the columns coming from the table itself.
    stream_chunk_row_indices: Vec<usize>,
    // Index (into `row_fields`) of the `op` pseudo column.
    op_index: usize,
}
274
275impl FieldsManager {
276 pub fn new(catalog: &TableCatalog) -> Self {
280 let mut row_fields = Vec::new();
281 let mut row_output_col_indices = Vec::new();
282 let mut row_pk_indices = Vec::new();
283 let mut stream_chunk_row_indices = Vec::new();
284 let mut output_idx = 0_usize;
285 let pk_set: HashSet<usize> = catalog
286 .pk
287 .iter()
288 .map(|col_order| col_order.column_index)
289 .collect();
290
291 for (index, v) in catalog.columns.iter().enumerate() {
292 if pk_set.contains(&index) {
293 row_pk_indices.push(output_idx);
294 stream_chunk_row_indices.push(output_idx);
295 row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
296 if !v.is_hidden {
297 row_output_col_indices.push(output_idx);
298 }
299 output_idx += 1;
300 } else if !v.is_hidden {
301 row_output_col_indices.push(output_idx);
302 stream_chunk_row_indices.push(output_idx);
303 row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
304 output_idx += 1;
305 }
306 }
307
308 row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
309 row_output_col_indices.push(output_idx);
310 let op_index = output_idx;
311 output_idx += 1;
312 row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
313 row_output_col_indices.push(output_idx);
314 Self {
315 columns_catalog: catalog.columns.clone(),
316 row_fields,
317 row_output_col_indices,
318 row_pk_indices,
319 stream_chunk_row_indices,
320 op_index,
321 }
322 }
323
324 pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
325 if self.columns_catalog.ne(&catalog.columns) {
326 *self = Self::new(catalog);
327 true
328 } else {
329 false
330 }
331 }
332
333 pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
334 let last_row = rows.last_mut().map(|row| {
335 let mut row = row.clone();
336 row.project(&self.row_pk_indices)
337 });
338 let rows = rows
339 .iter_mut()
340 .map(|row| row.project(&self.row_output_col_indices))
341 .collect();
342 (rows, last_row)
343 }
344
345 pub fn get_output_fields(&self) -> Vec<Field> {
346 self.row_output_col_indices
347 .iter()
348 .map(|&idx| self.row_fields[idx].clone())
349 .collect()
350 }
351
352 pub fn get_row_stream_fields_and_formats(
355 &self,
356 formats: &Vec<Format>,
357 from_snapshot: bool,
358 ) -> (Vec<Field>, Vec<Format>) {
359 let mut fields = Vec::new();
360 let need_format = !(formats.is_empty() || formats.len() == 1);
361 let mut new_formats = formats.clone();
362 let stream_chunk_row_indices_iter = if from_snapshot {
363 self.stream_chunk_row_indices.iter().chain(None)
364 } else {
365 self.stream_chunk_row_indices
366 .iter()
367 .chain(Some(&self.op_index))
368 };
369 for index in stream_chunk_row_indices_iter {
370 fields.push(self.row_fields[*index].clone());
371 if need_format && !self.row_output_col_indices.contains(index) {
372 new_formats.insert(*index, Format::Text);
373 }
374 }
375 (fields, new_formats)
376 }
377}
378
/// Cursor that incrementally reads a table's snapshot and change log through
/// a subscription.
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    // The table the subscription depends on (the one actually scanned).
    dependent_table_id: TableId,
    // Past this instant the cursor is expired (subscription retention window).
    cursor_need_drop_time: Instant,
    state: State,
    // Output-schema bookkeeping; rebuilt when the table schema changes.
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    // When the last fetch finished; used for idle-duration reporting.
    last_fetch: Instant,
    // Pk values of the last returned row; used to resume the scan after it.
    seek_pk_row: Option<Row>,
}
392
393impl SubscriptionCursor {
    /// Creates a subscription cursor.
    ///
    /// With `start_timestamp` given, the cursor starts in
    /// `InitLogStoreQuery`, seeking the change log from that epoch. Without
    /// it, a snapshot query is started immediately and the cursor begins in
    /// `Fetch` at the table's currently pinned committed epoch.
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(&dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            // No start epoch: kick off a full snapshot read right away.
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, &dependent_table_id, handler_args.clone(), None).await?;
            // The snapshot's committed epoch for this table becomes the
            // cursor's starting rw_timestamp.
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} not exists")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch Cursor can't start from specified query epoch. May run `set query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch Cursor don't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        // The cursor must be re-created once the subscription's retention
        // window has passed.
        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }
471
    /// Advances the state machine until one row can be returned.
    ///
    /// Returns `Ok(None)` when no row is currently available: the change log
    /// has no further epoch yet, or the table schema changed (in which case
    /// the caller must re-read the output descriptors).
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    // Log-store reads are never snapshot reads.
                    let from_snapshot = false;

                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        &self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    &self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            // Each successful query extends the cursor's lifetime.
                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                // Surface the schema change to the caller
                                // before any rows of the new shape.
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        // Snapshot rows get a synthesized op; change-log rows
                        // get their rw_timestamp appended.
                        if from_snapshot {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        // Current query exhausted: record its duration and
                        // move on to seek the next epoch.
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }
602
603 pub async fn next(
604 &mut self,
605 count: u32,
606 handler_args: HandlerArgs,
607 formats: &Vec<Format>,
608 timeout_seconds: Option<u64>,
609 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
610 let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
611 if Instant::now() > self.cursor_need_drop_time {
612 return Err(ErrorCode::InternalError(
613 "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_owned(),
614 )
615 .into());
616 }
617
618 let session = &handler_args.session;
619 let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
620 let mut cur = 0;
621 if let State::Fetch {
622 from_snapshot,
623 chunk_stream,
624 ..
625 } = &mut self.state
626 {
627 let (fields, fotmats) = self
628 .fields_manager
629 .get_row_stream_fields_and_formats(formats, *from_snapshot);
630 chunk_stream.init_row_stream(&fields, &fotmats, session.clone());
631 }
632 while cur < count {
633 let fetch_cursor_timer = Instant::now();
634 let row = self.next_row(&handler_args, formats).await?;
635 self.cursor_metrics
636 .subscription_cursor_fetch_duration
637 .with_label_values(&[&self.subscription.name])
638 .observe(fetch_cursor_timer.elapsed().as_millis() as _);
639 match row {
640 Some(row) => {
641 cur += 1;
642 ans.push(row);
643 }
644 None => {
645 let timeout_seconds = timeout_seconds.unwrap_or(0);
646 if cur > 0 || timeout_seconds == 0 {
647 break;
648 }
649 match tokio::time::timeout(
652 Duration::from_secs(timeout_seconds),
653 session
654 .env
655 .hummock_snapshot_manager()
656 .wait_table_change_log_notification(self.dependent_table_id.table_id()),
657 )
658 .await
659 {
660 Ok(result) => result?,
661 Err(_) => {
662 tracing::debug!("Cursor wait next epoch timeout");
663 break;
664 }
665 }
666 }
667 }
668 if let Some(timeout_instant) = timeout_instant
670 && Instant::now() > timeout_instant
671 {
672 break;
673 }
674 }
675 self.last_fetch = Instant::now();
676 let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
677 if let Some(seek_pk_row) = seek_pk_row {
678 self.seek_pk_row = Some(seek_pk_row);
679 }
680 let desc = self
681 .fields_manager
682 .get_output_fields()
683 .iter()
684 .map(to_pg_field)
685 .collect();
686
687 Ok((rows, desc))
688 }
689
690 fn get_next_rw_timestamp(
691 seek_timestamp: u64,
692 table_id: &TableId,
693 expected_timestamp: Option<u64>,
694 handler_args: HandlerArgs,
695 dependent_subscription: &SubscriptionCatalog,
696 ) -> Result<(Option<u64>, Option<u64>)> {
697 let session = handler_args.session;
698 session.get_subscription_by_schema_id_name(
700 dependent_subscription.schema_id,
701 &dependent_subscription.name,
702 )?;
703
704 let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
706 if let Some(expected_timestamp) = expected_timestamp
707 && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
708 {
709 return Err(ErrorCode::CatalogError(
710 format!(
711 " No data found for rw_timestamp {:?}, data may have been recycled, please recreate cursor",
712 convert_logstore_u64_to_unix_millis(expected_timestamp)
713 )
714 .into(),
715 )
716 .into());
717 }
718 Ok((new_epochs.get(0).cloned(), new_epochs.get(1).cloned()))
719 }
720
721 pub fn gen_batch_plan_result(&self, handler_args: HandlerArgs) -> Result<BatchQueryPlanResult> {
722 match self.state {
723 State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
725 Some(0),
726 &self.dependent_table_id,
727 handler_args,
728 self.seek_pk_row.clone(),
729 ),
730 State::Fetch {
731 from_snapshot,
732 rw_timestamp,
733 ..
734 } => {
735 if from_snapshot {
736 Self::init_batch_plan_for_subscription_cursor(
737 None,
738 &self.dependent_table_id,
739 handler_args,
740 self.seek_pk_row.clone(),
741 )
742 } else {
743 Self::init_batch_plan_for_subscription_cursor(
744 Some(rw_timestamp),
745 &self.dependent_table_id,
746 handler_args,
747 self.seek_pk_row.clone(),
748 )
749 }
750 }
751 State::Invalid => Err(ErrorCode::InternalError(
752 "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
753 )
754 .into()),
755 }
756 }
757
758 fn init_batch_plan_for_subscription_cursor(
759 rw_timestamp: Option<u64>,
760 dependent_table_id: &TableId,
761 handler_args: HandlerArgs,
762 seek_pk_row: Option<Row>,
763 ) -> Result<BatchQueryPlanResult> {
764 let session = handler_args.clone().session;
765 let table_catalog = session.get_table_by_id(dependent_table_id)?;
766 let context = OptimizerContext::from_handler_args(handler_args.clone());
767 let version_id = {
768 let version = session.env.hummock_snapshot_manager.acquire();
769 let version = version.version();
770 if !version
771 .state_table_info
772 .info()
773 .contains_key(dependent_table_id)
774 {
775 return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
776 }
777 version.id
778 };
779 Self::create_batch_plan_for_cursor(
780 table_catalog,
781 &session,
782 context.into(),
783 rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
784 version_id,
785 seek_pk_row,
786 )
787 }
788
789 async fn initiate_query(
790 rw_timestamp: Option<u64>,
791 dependent_table_id: &TableId,
792 handler_args: HandlerArgs,
793 seek_pk_row: Option<Row>,
794 ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
795 let init_query_timer = Instant::now();
796 let session = handler_args.clone().session;
797 let table_catalog = session.get_table_by_id(dependent_table_id)?;
798 let plan_result = Self::init_batch_plan_for_subscription_cursor(
799 rw_timestamp,
800 dependent_table_id,
801 handler_args.clone(),
802 seek_pk_row,
803 )?;
804 let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
805 let (chunk_stream, _) =
806 create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
807 Ok((chunk_stream, init_query_timer, table_catalog))
808 }
809
    /// If the local row buffer is empty, pulls at most one batch from
    /// `chunk_stream` into it. Leaves the buffer empty when the stream is
    /// exhausted.
    async fn try_refill_remaining_rows(
        chunk_stream: &mut CursorDataChunkStream,
        remaining_rows: &mut VecDeque<Row>,
    ) -> Result<()> {
        if remaining_rows.is_empty()
            && let Some(row_set) = chunk_stream.next().await?
        {
            remaining_rows.extend(row_set?);
        }
        Ok(())
    }
821
822 pub fn build_row(
823 mut row: Vec<Option<Bytes>>,
824 rw_timestamp: Option<u64>,
825 formats: &Vec<Format>,
826 session_data: &StaticSessionData,
827 ) -> Result<Row> {
828 let row_len = row.len();
829 let new_row = if let Some(rw_timestamp) = rw_timestamp {
830 let rw_timestamp_formats = formats.get(row_len).unwrap_or(&Format::Text);
831 let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
832 let rw_timestamp = pg_value_format(
833 &DataType::Int64,
834 risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
835 *rw_timestamp_formats,
836 session_data,
837 )?;
838 vec![Some(rw_timestamp)]
839 } else {
840 let op_formats = formats.get(row_len).unwrap_or(&Format::Text);
841 let op = pg_value_format(
842 &DataType::Varchar,
843 risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
844 *op_formats,
845 session_data,
846 )?;
847 vec![Some(op), None]
848 };
849 row.extend(new_row);
850 Ok(Row::new(row))
851 }
852
853 pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
854 if from_snapshot {
855 descs.push(Field::with_name(DataType::Varchar, "op"));
856 }
857 descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
858 descs
859 }
860
    /// Builds the batch plan that reads the subscribed table for a cursor.
    ///
    /// With `epoch_range` set, a `BatchLogSeqScan` over the change log is
    /// generated; otherwise a plain `BatchSeqScan` snapshot read. When
    /// `seek_pk_rows` is given, a `ROW(pk...) > ROW(seek values...)`
    /// condition resumes the scan after the last row of the previous fetch.
    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<BatchQueryPlanResult> {
        // Output all visible columns plus hidden pk columns (needed for seeking).
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
        // Turn the seek row into a scan range plus a residual filter predicate.
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    // NOTE(review): seek values are re-parsed from their text
                    // encoding; assumes pk columns were emitted in text format
                    // by the previous fetch — confirm against FieldsManager.
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::unnamed(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(table_catalog.table_desc().into(), max_split_range_gap)?;
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            // Change-log read between the two epochs of `epoch_range`.
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                Rc::new(table_catalog.table_desc()),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            // Plain snapshot read of the table.
            let core = generic::TableScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
                Cardinality::default(),
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        // Whatever the scan range could not cover becomes an explicit filter.
        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        // Rows must come back in pk order so the last row is a valid seek position.
        let order = Order::new(table_catalog.pk().to_vec());

        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema().clone();
        let (batch_log_seq_scan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Local => (plan_root.gen_batch_local_plan()?, QueryMode::Local),
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(BatchQueryPlanResult {
            plan: batch_log_seq_scan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: table_catalog.dependent_relations.clone(),
            read_storage_tables: HashSet::from_iter([table_catalog.id]),
        })
    }
1011
1012 pub fn idle_duration(&self) -> Duration {
1013 self.last_fetch.elapsed()
1014 }
1015
1016 pub fn subscription_name(&self) -> &str {
1017 self.subscription.name.as_str()
1018 }
1019
1020 pub fn state_info_string(&self) -> String {
1021 format!("{}", self.state)
1022 }
1023}
1024
/// Registry of all open cursors of a session, keyed by cursor name.
pub struct CursorManager {
    // Async mutex: the map is held across `.await` points in the handlers.
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}
1029
1030impl CursorManager {
1031 pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
1032 Self {
1033 cursor_map: tokio::sync::Mutex::new(HashMap::new()),
1034 cursor_metrics,
1035 }
1036 }
1037
1038 pub async fn add_subscription_cursor(
1039 &self,
1040 cursor_name: String,
1041 start_timestamp: Option<u64>,
1042 dependent_table_id: TableId,
1043 subscription: Arc<SubscriptionCatalog>,
1044 handler_args: &HandlerArgs,
1045 ) -> Result<()> {
1046 let create_cursor_timer = Instant::now();
1047 let subscription_name = subscription.name.clone();
1048 let cursor = SubscriptionCursor::new(
1049 cursor_name,
1050 start_timestamp,
1051 subscription,
1052 dependent_table_id,
1053 handler_args,
1054 self.cursor_metrics.clone(),
1055 )
1056 .await?;
1057 let mut cursor_map = self.cursor_map.lock().await;
1058 self.cursor_metrics
1059 .subscription_cursor_declare_duration
1060 .with_label_values(&[&subscription_name])
1061 .observe(create_cursor_timer.elapsed().as_millis() as _);
1062
1063 cursor_map.retain(|_, v| {
1064 if let Cursor::Subscription(cursor) = v
1065 && matches!(cursor.state, State::Invalid)
1066 {
1067 false
1068 } else {
1069 true
1070 }
1071 });
1072
1073 cursor_map
1074 .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
1075 .map_err(|error| {
1076 ErrorCode::CatalogError(
1077 format!("cursor `{}` already exists", error.entry.key()).into(),
1078 )
1079 })?;
1080 Ok(())
1081 }
1082
1083 pub async fn add_query_cursor(
1084 &self,
1085 cursor_name: String,
1086 chunk_stream: CursorDataChunkStream,
1087 fields: Vec<Field>,
1088 ) -> Result<()> {
1089 let cursor = QueryCursor::new(chunk_stream, fields)?;
1090 self.cursor_map
1091 .lock()
1092 .await
1093 .try_insert(cursor_name, Cursor::Query(cursor))
1094 .map_err(|error| {
1095 ErrorCode::CatalogError(
1096 format!("cursor `{}` already exists", error.entry.key()).into(),
1097 )
1098 })?;
1099
1100 Ok(())
1101 }
1102
1103 pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
1104 self.cursor_map
1105 .lock()
1106 .await
1107 .remove(cursor_name)
1108 .ok_or_else(|| {
1109 ErrorCode::CatalogError(format!("cursor `{}` don't exists", cursor_name).into())
1110 })?;
1111 Ok(())
1112 }
1113
    /// Closes every cursor of the session.
    pub async fn remove_all_cursor(&self) {
        self.cursor_map.lock().await.clear();
    }

    /// Closes all query cursors, keeping subscription cursors open.
    pub async fn remove_all_query_cursor(&self) {
        self.cursor_map
            .lock()
            .await
            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
    }
1124
1125 pub async fn get_rows_with_cursor(
1126 &self,
1127 cursor_name: &str,
1128 count: u32,
1129 handler_args: HandlerArgs,
1130 formats: &Vec<Format>,
1131 timeout_seconds: Option<u64>,
1132 ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
1133 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1134 cursor
1135 .next(count, handler_args, formats, timeout_seconds)
1136 .await
1137 } else {
1138 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1139 }
1140 }
1141
1142 pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
1143 if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
1144 Ok(cursor.get_fields())
1145 } else {
1146 Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
1147 }
1148 }
1149
1150 pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
1151 let mut subsription_cursor_nums = 0;
1152 let mut invalid_subsription_cursor_nums = 0;
1153 let mut subscription_cursor_last_fetch_duration = HashMap::new();
1154 for (_, cursor) in self.cursor_map.lock().await.iter() {
1155 if let Cursor::Subscription(subscription_cursor) = cursor {
1156 subsription_cursor_nums += 1;
1157 if matches!(subscription_cursor.state, State::Invalid) {
1158 invalid_subsription_cursor_nums += 1;
1159 } else {
1160 let fetch_duration =
1161 subscription_cursor.last_fetch.elapsed().as_millis() as f64;
1162 subscription_cursor_last_fetch_duration.insert(
1163 subscription_cursor.subscription.name.clone(),
1164 fetch_duration,
1165 );
1166 }
1167 }
1168 }
1169 PeriodicCursorMetrics {
1170 subsription_cursor_nums,
1171 invalid_subsription_cursor_nums,
1172 subscription_cursor_last_fetch_duration,
1173 }
1174 }
1175
1176 pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
1177 self.cursor_map
1178 .lock()
1179 .await
1180 .iter()
1181 .for_each(|(cursor_name, cursor)| {
1182 if let Cursor::Query(cursor) = cursor {
1183 f(cursor_name, cursor)
1184 }
1185 });
1186 }
1187
1188 pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
1189 self.cursor_map
1190 .lock()
1191 .await
1192 .iter()
1193 .for_each(|(cursor_name, cursor)| {
1194 if let Cursor::Subscription(cursor) = cursor {
1195 f(cursor_name, cursor)
1196 }
1197 });
1198 }
1199
1200 pub async fn gen_batch_plan_with_subscription_cursor(
1201 &self,
1202 cursor_name: &str,
1203 handler_args: HandlerArgs,
1204 ) -> Result<BatchQueryPlanResult> {
1205 match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
1206 ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
1207 })? {
1208 Cursor::Subscription(cursor) => {
1209 cursor.gen_batch_plan_result(handler_args.clone())
1210 },
1211 Cursor::Query(_) => Err(ErrorCode::InternalError("The plan of the cursor is the same as the query statement of the as when it was created.".to_owned()).into()),
1212 }
1213 }
1214}