use core::mem;
use core::time::Duration;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use futures::StreamExt;
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::StatementType;
use pgwire::types::{Format, Row};
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::error::BoxedError;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::{DataType, ScalarImpl, StructType, StructValue};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_hummock_sdk::HummockVersionId;

use super::SessionImpl;
use crate::catalog::TableId;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::error::{ErrorCode, Result};
use crate::expr::{ExprType, FunctionCall, InputRef, Literal};
use crate::handler::HandlerArgs;
use crate::handler::declare_cursor::create_chunk_stream_for_cursor;
use crate::handler::query::{RwBatchQueryPlanResult, gen_batch_plan_fragmenter};
use crate::handler::util::{
    DataChunkToRowSetAdapter, StaticSessionData, convert_logstore_u64_to_unix_millis,
    pg_value_format, to_pg_field,
};
use crate::monitor::{CursorMetrics, PeriodicCursorMetrics};
use crate::optimizer::PlanRoot;
use crate::optimizer::plan_node::{BatchFilter, BatchLogSeqScan, BatchSeqScan, generic};
use crate::optimizer::property::{Order, RequiredDist};
use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot};
use crate::utils::Condition;
use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, TableCatalog};

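/// A stream of data chunks backing a cursor. It starts out as a raw local or
/// distributed query stream and is converted in place into a row stream
/// (`PgResponse`) once the output fields and formats are known via
/// [`Self::init_row_stream`]; only then can [`Self::next`] yield rows.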
pub enum CursorDataChunkStream {
    LocalDataChunk(Option<LocalQueryStream>),
    DistributedDataChunk(Option<DistributedQueryStream>),
    PgResponse(PgResponseStream),
}

impl CursorDataChunkStream {
    pub fn init_row_stream(
        &mut self,
        fields: &Vec<Field>,
        formats: &Vec<Format>,
        session: Arc<SessionImpl>,
    ) {
        let columns_type = fields.iter().map(|f| f.data_type()).collect();
        match self {
            CursorDataChunkStream::LocalDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            CursorDataChunkStream::DistributedDataChunk(data_chunk) => {
                let data_chunk = mem::take(data_chunk).unwrap();
                let row_stream = PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
                    data_chunk,
                    columns_type,
                    formats.clone(),
                    session,
                ));
                *self = CursorDataChunkStream::PgResponse(row_stream);
            }
            _ => {}
        }
    }

    pub async fn next(&mut self) -> Result<Option<std::result::Result<Vec<Row>, BoxedError>>> {
        match self {
            CursorDataChunkStream::PgResponse(row_stream) => Ok(row_stream.next().await),
            _ => Err(ErrorCode::InternalError(
                "the stream must be initialized with `init_row_stream` before rows can be fetched"
                    .to_owned(),
            )
            .into()),
        }
    }
}
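
/// A cursor held by a session: either a subscription cursor over a table's
/// change log, or a plain query cursor over a finite query result.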
pub enum Cursor {
    Subscription(SubscriptionCursor),
    Query(QueryCursor),
}

impl Cursor {
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        match self {
            Cursor::Subscription(cursor) => cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
                .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
            Cursor::Query(cursor) => {
                cursor
                    .next(count, formats, handler_args, timeout_seconds)
                    .await
            }
        }
    }

    pub fn get_fields(&mut self) -> Vec<Field> {
        match self {
            Cursor::Subscription(cursor) => cursor.fields_manager.get_output_fields(),
            Cursor::Query(cursor) => cursor.fields.clone(),
        }
    }
}

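/// A cursor over the finite result set of a regular query. Rows are pulled
/// lazily from `chunk_stream` and buffered in `remaining_rows` between fetches.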
pub struct QueryCursor {
    chunk_stream: CursorDataChunkStream,
    fields: Vec<Field>,
    remaining_rows: VecDeque<Row>,
}

impl QueryCursor {
    pub fn new(chunk_stream: CursorDataChunkStream, fields: Vec<Field>) -> Result<Self> {
        Ok(Self {
            chunk_stream,
            fields,
            remaining_rows: VecDeque::<Row>::new(),
        })
    }

    pub async fn next_once(&mut self) -> Result<Option<Row>> {
        while self.remaining_rows.is_empty() {
            let rows = self.chunk_stream.next().await?;
            let rows = match rows {
                None => return Ok(None),
                Some(row) => row?,
            };
            self.remaining_rows = rows.into_iter().collect();
        }
        let row = self.remaining_rows.pop_front().unwrap();
        Ok(Some(row))
    }

    pub async fn next(
        &mut self,
        count: u32,
        formats: &Vec<Format>,
        handler_args: HandlerArgs,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        let session = handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        let desc = self.fields.iter().map(to_pg_field).collect();
        self.chunk_stream
            .init_row_stream(&self.fields, formats, session);
        while cur < count
            && let Some(row) = self.next_once().await?
        {
            cur += 1;
            ans.push(row);
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        Ok((ans, desc))
    }
}

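/// Internal state machine of a [`SubscriptionCursor`]. A rough sketch of the
/// transitions (any error sends the cursor to `Invalid`):
///
/// ```text
/// InitLogStoreQuery --(epoch found, batch query built)--> Fetch
/// Fetch --(stream drained)--> InitLogStoreQuery  // seek the next epoch
/// ```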
enum State {
    InitLogStoreQuery {
        // The timestamp to seek the change log from.
        seek_timestamp: u64,
        // If set, the timestamp the next log entry is expected to have; used to
        // detect that the data has been recycled in the meantime.
        expected_timestamp: Option<u64>,
    },
    Fetch {
        // Whether the stream reads a table snapshot (true) or the log store (false).
        from_snapshot: bool,
        // The timestamp of the data returned by the current stream.
        rw_timestamp: u64,
        // The underlying stream of batch query results.
        chunk_stream: CursorDataChunkStream,
        // Rows already pulled from `chunk_stream` but not yet returned.
        remaining_rows: VecDeque<Row>,
        expected_timestamp: Option<u64>,
        init_query_timer: Instant,
    },
    Invalid,
}

impl Display for State {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            State::InitLogStoreQuery {
                seek_timestamp,
                expected_timestamp,
            } => write!(
                f,
                "InitLogStoreQuery {{ seek_timestamp: {}, expected_timestamp: {:?} }}",
                seek_timestamp, expected_timestamp
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows,
                init_query_timer,
                ..
            } => write!(
                f,
                "Fetch {{ from_snapshot: {}, rw_timestamp: {}, expected_timestamp: {:?}, cached rows: {}, query initialized {}ms ago }}",
                from_snapshot,
                rw_timestamp,
                expected_timestamp,
                remaining_rows.len(),
                init_query_timer.elapsed().as_millis()
            ),
            State::Invalid => write!(f, "Invalid"),
        }
    }
}

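/// Bookkeeping for the fields a subscription cursor exposes. For illustration
/// (a hypothetical table, not taken from this crate): with columns `id` (pk,
/// visible), `v` (visible), and `_hidden` (hidden, non-pk), `new` produces
/// `row_fields = [id, v, op, rw_timestamp]`, `row_pk_indices = [0]`,
/// `row_output_col_indices = [0, 1, 2, 3]`, `stream_chunk_row_indices = [0, 1]`,
/// and `op_index = 2`.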
struct FieldsManager {
    columns_catalog: Vec<ColumnCatalog>,
    // All fields a fetched row may carry, including the trailing `op` and `rw_timestamp`.
    row_fields: Vec<Field>,
    // Indices (into `row_fields`) of the columns visible in the cursor's output.
    row_output_col_indices: Vec<usize>,
    // Indices (into `row_fields`) of the primary-key columns, used to build the seek row.
    row_pk_indices: Vec<usize>,
    // Indices (into `row_fields`) of the columns present in the underlying stream chunk.
    stream_chunk_row_indices: Vec<usize>,
    // Index (into `row_fields`) of the `op` column.
    op_index: usize,
}

impl FieldsManager {
    pub fn new(catalog: &TableCatalog) -> Self {
        let mut row_fields = Vec::new();
        let mut row_output_col_indices = Vec::new();
        let mut row_pk_indices = Vec::new();
        let mut stream_chunk_row_indices = Vec::new();
        let mut output_idx = 0_usize;
        let pk_set: HashSet<usize> = catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index)
            .collect();

        for (index, v) in catalog.columns.iter().enumerate() {
            if pk_set.contains(&index) {
                row_pk_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                if !v.is_hidden {
                    row_output_col_indices.push(output_idx);
                }
                output_idx += 1;
            } else if !v.is_hidden {
                row_output_col_indices.push(output_idx);
                stream_chunk_row_indices.push(output_idx);
                row_fields.push(Field::with_name(v.data_type().clone(), v.name()));
                output_idx += 1;
            }
        }

        row_fields.push(Field::with_name(DataType::Varchar, "op".to_owned()));
        row_output_col_indices.push(output_idx);
        let op_index = output_idx;
        output_idx += 1;
        row_fields.push(Field::with_name(DataType::Int64, "rw_timestamp".to_owned()));
        row_output_col_indices.push(output_idx);
        Self {
            columns_catalog: catalog.columns.clone(),
            row_fields,
            row_output_col_indices,
            row_pk_indices,
            stream_chunk_row_indices,
            op_index,
        }
    }

    pub fn try_refill_fields(&mut self, catalog: &TableCatalog) -> bool {
        if self.columns_catalog.ne(&catalog.columns) {
            *self = Self::new(catalog);
            true
        } else {
            false
        }
    }

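    /// Projects `rows` down to the visible output columns and, as a side
    /// product, extracts the primary-key projection of the last row so the
    /// caller can remember where to resume the next scan.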
    pub fn process_output_desc_row(&self, mut rows: Vec<Row>) -> (Vec<Row>, Option<Row>) {
        let last_row = rows.last_mut().map(|row| {
            let mut row = row.clone();
            row.project(&self.row_pk_indices)
        });
        let rows = rows
            .iter_mut()
            .map(|row| row.project(&self.row_output_col_indices))
            .collect();
        (rows, last_row)
    }

    pub fn get_output_fields(&self) -> Vec<Field> {
        self.row_output_col_indices
            .iter()
            .map(|&idx| self.row_fields[idx].clone())
            .collect()
    }

    pub fn get_row_stream_fields_and_formats(
        &self,
        formats: &Vec<Format>,
        from_snapshot: bool,
    ) -> (Vec<Field>, Vec<Format>) {
        let mut fields = Vec::new();
        let need_format = !(formats.is_empty() || formats.len() == 1);
        let mut new_formats = formats.clone();
        let stream_chunk_row_indices_iter = if from_snapshot {
            self.stream_chunk_row_indices.iter().chain(None)
        } else {
            self.stream_chunk_row_indices
                .iter()
                .chain(Some(&self.op_index))
        };
        for index in stream_chunk_row_indices_iter {
            fields.push(self.row_fields[*index].clone());
            if need_format && !self.row_output_col_indices.contains(index) {
                new_formats.insert(*index, Format::Text);
            }
        }
        (fields, new_formats)
    }
}

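/// A cursor over a subscription's change log. A minimal usage sketch, assuming
/// a subscription `my_sub` already exists (names are hypothetical):
///
/// ```sql
/// DECLARE cur SUBSCRIPTION CURSOR FOR my_sub;
/// FETCH 10 FROM cur;
/// ```
///
/// Each fetched row carries the table's visible columns plus an `op` column and
/// a nullable `rw_timestamp` column.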
pub struct SubscriptionCursor {
    cursor_name: String,
    subscription: Arc<SubscriptionCatalog>,
    dependent_table_id: TableId,
    cursor_need_drop_time: Instant,
    state: State,
    // Fields of the rows this cursor returns; rebuilt when the table schema changes.
    fields_manager: FieldsManager,
    cursor_metrics: Arc<CursorMetrics>,
    last_fetch: Instant,
    seek_pk_row: Option<Row>,
}

impl SubscriptionCursor {
    pub async fn new(
        cursor_name: String,
        start_timestamp: Option<u64>,
        subscription: Arc<SubscriptionCatalog>,
        dependent_table_id: TableId,
        handler_args: &HandlerArgs,
        cursor_metrics: Arc<CursorMetrics>,
    ) -> Result<Self> {
        let (state, fields_manager) = if let Some(start_timestamp) = start_timestamp {
            let table_catalog = handler_args.session.get_table_by_id(dependent_table_id)?;
            (
                State::InitLogStoreQuery {
                    seek_timestamp: start_timestamp,
                    expected_timestamp: None,
                },
                FieldsManager::new(&table_catalog),
            )
        } else {
            let (chunk_stream, init_query_timer, table_catalog) =
                Self::initiate_query(None, dependent_table_id, handler_args.clone(), None).await?;
            let pinned_epoch = match handler_args.session.get_pinned_snapshot().ok_or_else(
                || ErrorCode::InternalError("Fetch cursor can't find a snapshot epoch".to_owned()),
            )? {
                ReadSnapshot::FrontendPinned { snapshot, .. } => {
                    snapshot
                        .version()
                        .state_table_info
                        .info()
                        .get(&dependent_table_id)
                        .ok_or_else(|| {
                            anyhow!("dependent_table_id {dependent_table_id} does not exist")
                        })?
                        .committed_epoch
                }
                ReadSnapshot::Other(_) => {
                    return Err(ErrorCode::InternalError("Fetch cursor can't start from a specified query epoch. Consider running `SET query_epoch = 0;`".to_owned()).into());
                }
                ReadSnapshot::ReadUncommitted => {
                    return Err(ErrorCode::InternalError(
                        "Fetch cursor doesn't support read uncommitted".to_owned(),
                    )
                    .into());
                }
            };
            let start_timestamp = pinned_epoch;

            (
                State::Fetch {
                    from_snapshot: true,
                    rw_timestamp: start_timestamp,
                    chunk_stream,
                    remaining_rows: VecDeque::new(),
                    expected_timestamp: None,
                    init_query_timer,
                },
                FieldsManager::new(&table_catalog),
            )
        };

        let cursor_need_drop_time =
            Instant::now() + Duration::from_secs(subscription.retention_seconds);
        Ok(Self {
            cursor_name,
            subscription,
            dependent_table_id,
            cursor_need_drop_time,
            state,
            fields_manager,
            cursor_metrics,
            last_fetch: Instant::now(),
            seek_pk_row: None,
        })
    }

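    /// Drives the state machine until a row is produced, the change log is
    /// exhausted for now (`Ok(None)`), or an error invalidates the cursor.
    /// A detected schema change also returns `Ok(None)` so the caller resumes
    /// with the rebuilt fields.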
    async fn next_row(
        &mut self,
        handler_args: &HandlerArgs,
        formats: &Vec<Format>,
    ) -> Result<Option<Row>> {
        loop {
            match &mut self.state {
                State::InitLogStoreQuery {
                    seek_timestamp,
                    expected_timestamp,
                } => {
                    let from_snapshot = false;

                    match Self::get_next_rw_timestamp(
                        *seek_timestamp,
                        self.dependent_table_id,
                        *expected_timestamp,
                        handler_args.clone(),
                        &self.subscription,
                    ) {
                        Ok((Some(rw_timestamp), expected_timestamp)) => {
                            let (mut chunk_stream, init_query_timer, catalog) =
                                Self::initiate_query(
                                    Some(rw_timestamp),
                                    self.dependent_table_id,
                                    handler_args.clone(),
                                    None,
                                )
                                .await?;
                            let table_schema_changed =
                                self.fields_manager.try_refill_fields(&catalog);
                            let (fields, formats) = self
                                .fields_manager
                                .get_row_stream_fields_and_formats(formats, from_snapshot);
                            chunk_stream.init_row_stream(
                                &fields,
                                &formats,
                                handler_args.session.clone(),
                            );

                            self.cursor_need_drop_time = Instant::now()
                                + Duration::from_secs(self.subscription.retention_seconds);
                            let mut remaining_rows = VecDeque::new();
                            Self::try_refill_remaining_rows(&mut chunk_stream, &mut remaining_rows)
                                .await?;
                            self.state = State::Fetch {
                                from_snapshot,
                                rw_timestamp,
                                chunk_stream,
                                remaining_rows,
                                expected_timestamp,
                                init_query_timer,
                            };
                            if table_schema_changed {
                                return Ok(None);
                            }
                        }
                        Ok((None, _)) => return Ok(None),
                        Err(e) => {
                            self.state = State::Invalid;
                            return Err(e);
                        }
                    }
                }
                State::Fetch {
                    from_snapshot,
                    rw_timestamp,
                    chunk_stream,
                    remaining_rows,
                    expected_timestamp,
                    init_query_timer,
                } => {
                    let session_data = StaticSessionData {
                        timezone: handler_args.session.config().timezone(),
                    };
                    let from_snapshot = *from_snapshot;
                    let rw_timestamp = *rw_timestamp;

                    Self::try_refill_remaining_rows(chunk_stream, remaining_rows).await?;

                    if let Some(row) = remaining_rows.pop_front() {
                        if from_snapshot {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                None,
                                formats,
                                &session_data,
                            )?));
                        } else {
                            return Ok(Some(Self::build_row(
                                row.take(),
                                Some(rw_timestamp),
                                formats,
                                &session_data,
                            )?));
                        }
                    } else {
                        self.cursor_metrics
                            .subscription_cursor_query_duration
                            .with_label_values(&[&self.subscription.name])
                            .observe(init_query_timer.elapsed().as_millis() as _);
                        if let Some(expected_timestamp) = expected_timestamp {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: *expected_timestamp,
                                expected_timestamp: Some(*expected_timestamp),
                            };
                        } else {
                            self.state = State::InitLogStoreQuery {
                                seek_timestamp: rw_timestamp + 1,
                                expected_timestamp: None,
                            };
                        }
                    }
                }
                State::Invalid => {
                    return Err(ErrorCode::InternalError(
                        "Cursor is in invalid state. Please close and re-create the cursor."
                            .to_owned(),
                    )
                    .into());
                }
            }
        }
    }

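    /// Fetches up to `count` rows. When the change log is momentarily empty and
    /// a timeout is given, waits for a new epoch notification instead of
    /// returning an empty batch immediately.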
    pub async fn next(
        &mut self,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
        if Instant::now() > self.cursor_need_drop_time {
            return Err(ErrorCode::InternalError(
                "The cursor has exceeded its maximum lifetime; please recreate it (close, then declare the cursor again).".to_owned(),
            )
            .into());
        }

        let session = &handler_args.session;
        let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
        let mut cur = 0;
        if let State::Fetch {
            from_snapshot,
            chunk_stream,
            ..
        } = &mut self.state
        {
            let (fields, formats) = self
                .fields_manager
                .get_row_stream_fields_and_formats(formats, *from_snapshot);
            chunk_stream.init_row_stream(&fields, &formats, session.clone());
        }
        while cur < count {
            let fetch_cursor_timer = Instant::now();
            let row = self.next_row(&handler_args, formats).await?;
            self.cursor_metrics
                .subscription_cursor_fetch_duration
                .with_label_values(&[&self.subscription.name])
                .observe(fetch_cursor_timer.elapsed().as_millis() as _);
            match row {
                Some(row) => {
                    cur += 1;
                    ans.push(row);
                }
                None => {
                    let timeout_seconds = timeout_seconds.unwrap_or(0);
                    if cur > 0 || timeout_seconds == 0 {
                        break;
                    }
                    match tokio::time::timeout(
                        Duration::from_secs(timeout_seconds),
                        session
                            .env
                            .hummock_snapshot_manager()
                            .wait_table_change_log_notification(self.dependent_table_id),
                    )
                    .await
                    {
                        Ok(result) => result?,
                        Err(_) => {
                            tracing::debug!("Cursor wait next epoch timeout");
                            break;
                        }
                    }
                }
            }
            if let Some(timeout_instant) = timeout_instant
                && Instant::now() > timeout_instant
            {
                break;
            }
        }
        self.last_fetch = Instant::now();
        let (rows, seek_pk_row) = self.fields_manager.process_output_desc_row(ans);
        if let Some(seek_pk_row) = seek_pk_row {
            self.seek_pk_row = Some(seek_pk_row);
        }
        let desc = self
            .fields_manager
            .get_output_fields()
            .iter()
            .map(to_pg_field)
            .collect();

        Ok((rows, desc))
    }

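    /// Looks up the next one or two change-log epochs at or after
    /// `seek_timestamp`. Returns `(next_rw_timestamp, following_timestamp)`;
    /// errors if `expected_timestamp` is set but is no longer the first match,
    /// which means the log data has been recycled.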
    fn get_next_rw_timestamp(
        seek_timestamp: u64,
        table_id: TableId,
        expected_timestamp: Option<u64>,
        handler_args: HandlerArgs,
        dependent_subscription: &SubscriptionCatalog,
    ) -> Result<(Option<u64>, Option<u64>)> {
        let session = handler_args.session;
        session.get_subscription_by_schema_id_name(
            dependent_subscription.schema_id,
            &dependent_subscription.name,
        )?;

        let new_epochs = session.list_change_log_epochs(table_id, seek_timestamp, 2)?;
        if let Some(expected_timestamp) = expected_timestamp
            && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
        {
            return Err(ErrorCode::CatalogError(
                format!(
                    "No data found for rw_timestamp {:?}; the data may have been recycled, please recreate the cursor",
                    convert_logstore_u64_to_unix_millis(expected_timestamp)
                )
                .into(),
            )
            .into());
        }
        Ok((new_epochs.first().cloned(), new_epochs.get(1).cloned()))
    }

    pub fn gen_batch_plan_result(
        &self,
        handler_args: HandlerArgs,
    ) -> Result<RwBatchQueryPlanResult> {
        match self.state {
            State::InitLogStoreQuery { .. } => Self::init_batch_plan_for_subscription_cursor(
                Some(0),
                self.dependent_table_id,
                handler_args,
                self.seek_pk_row.clone(),
            ),
            State::Fetch {
                from_snapshot,
                rw_timestamp,
                ..
            } => {
                if from_snapshot {
                    Self::init_batch_plan_for_subscription_cursor(
                        None,
                        self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                } else {
                    Self::init_batch_plan_for_subscription_cursor(
                        Some(rw_timestamp),
                        self.dependent_table_id,
                        handler_args,
                        self.seek_pk_row.clone(),
                    )
                }
            }
            State::Invalid => Err(ErrorCode::InternalError(
                "Cursor is in invalid state. Please close and re-create the cursor.".to_owned(),
            )
            .into()),
        }
    }

    fn init_batch_plan_for_subscription_cursor(
        rw_timestamp: Option<u64>,
        dependent_table_id: TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<RwBatchQueryPlanResult> {
        let session = handler_args.session.clone();
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let context = OptimizerContext::from_handler_args(handler_args);
        let version_id = {
            let version = session.env.hummock_snapshot_manager.acquire();
            let version = version.version();
            if !version
                .state_table_info
                .info()
                .contains_key(&dependent_table_id)
            {
                return Err(anyhow!("table id {dependent_table_id} has been dropped").into());
            }
            version.id
        };
        Self::create_batch_plan_for_cursor(
            table_catalog,
            &session,
            context.into(),
            rw_timestamp.map(|rw_timestamp| (rw_timestamp, rw_timestamp)),
            version_id,
            seek_pk_row,
        )
    }

    async fn initiate_query(
        rw_timestamp: Option<u64>,
        dependent_table_id: TableId,
        handler_args: HandlerArgs,
        seek_pk_row: Option<Row>,
    ) -> Result<(CursorDataChunkStream, Instant, Arc<TableCatalog>)> {
        let init_query_timer = Instant::now();
        let session = handler_args.session.clone();
        let table_catalog = session.get_table_by_id(dependent_table_id)?;
        let plan_result = Self::init_batch_plan_for_subscription_cursor(
            rw_timestamp,
            dependent_table_id,
            handler_args.clone(),
            seek_pk_row,
        )?;
        let plan_fragmenter_result = gen_batch_plan_fragmenter(&handler_args.session, plan_result)?;
        let (chunk_stream, _) =
            create_chunk_stream_for_cursor(handler_args.session, plan_fragmenter_result).await?;
        Ok((chunk_stream, init_query_timer, table_catalog))
    }

    async fn try_refill_remaining_rows(
        chunk_stream: &mut CursorDataChunkStream,
        remaining_rows: &mut VecDeque<Row>,
    ) -> Result<()> {
        if remaining_rows.is_empty()
            && let Some(row_set) = chunk_stream.next().await?
        {
            remaining_rows.extend(row_set?);
        }
        Ok(())
    }

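    /// Appends the synthetic trailing columns to a fetched row: log-store rows
    /// (which already carry an `op` column) get their `rw_timestamp` appended,
    /// while snapshot rows get `op = "Insert"` and a NULL `rw_timestamp`.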
    pub fn build_row(
        mut row: Vec<Option<Bytes>>,
        rw_timestamp: Option<u64>,
        formats: &Vec<Format>,
        session_data: &StaticSessionData,
    ) -> Result<Row> {
        let row_len = row.len();
        let new_row = if let Some(rw_timestamp) = rw_timestamp {
            let rw_timestamp_format = formats.get(row_len).unwrap_or(&Format::Text);
            let rw_timestamp = convert_logstore_u64_to_unix_millis(rw_timestamp);
            let rw_timestamp = pg_value_format(
                &DataType::Int64,
                risingwave_common::types::ScalarRefImpl::Int64(rw_timestamp as i64),
                *rw_timestamp_format,
                session_data,
            )?;
            vec![Some(rw_timestamp)]
        } else {
            let op_format = formats.get(row_len).unwrap_or(&Format::Text);
            let op = pg_value_format(
                &DataType::Varchar,
                risingwave_common::types::ScalarRefImpl::Utf8("Insert"),
                *op_format,
                session_data,
            )?;
            vec![Some(op), None]
        };
        row.extend(new_row);
        Ok(Row::new(row))
    }

    pub fn build_desc(mut descs: Vec<Field>, from_snapshot: bool) -> Vec<Field> {
        if from_snapshot {
            descs.push(Field::with_name(DataType::Varchar, "op"));
        }
        descs.push(Field::with_name(DataType::Int64, "rw_timestamp"));
        descs
    }

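    /// Builds the batch plan for one cursor query: a log scan over `epoch_range`
    /// when present, otherwise a snapshot scan. When `seek_pk_rows` carries the
    /// last returned primary key, resumption is encoded as a row comparison,
    /// roughly `ROW(pk_1, ..., pk_n) > ROW(last_1, ..., last_n)`, split into at
    /// most one scan range plus a residual filter predicate.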
    pub fn create_batch_plan_for_cursor(
        table_catalog: Arc<TableCatalog>,
        session: &SessionImpl,
        context: OptimizerContextRef,
        epoch_range: Option<(u64, u64)>,
        version_id: HummockVersionId,
        seek_pk_rows: Option<Row>,
    ) -> Result<RwBatchQueryPlanResult> {
        let output_col_idx = table_catalog
            .columns
            .iter()
            .enumerate()
            .filter_map(|(index, v)| {
                if !v.is_hidden || table_catalog.pk.iter().any(|pk| pk.column_index == index) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let max_split_range_gap = context.session_ctx().config().max_split_range_gap() as u64;
        let pks = table_catalog.pk();
        let pks = pks
            .iter()
            .map(|f| {
                let pk = table_catalog.columns.get(f.column_index).unwrap();
                (pk.data_type(), f.column_index)
            })
            .collect_vec();
        let (scan, predicate) = if let Some(seek_pk_rows) = seek_pk_rows {
            let mut pk_rows = vec![];
            let mut values = vec![];
            for (seek_pk, (data_type, column_index)) in
                seek_pk_rows.take().into_iter().zip_eq_fast(pks.into_iter())
            {
                if let Some(seek_pk) = seek_pk {
                    pk_rows.push(InputRef {
                        index: column_index,
                        data_type: data_type.clone(),
                    });
                    let value_string = String::from_utf8(seek_pk.clone().into()).unwrap();
                    let value_data = ScalarImpl::from_text(&value_string, data_type).unwrap();
                    values.push((Some(value_data), data_type.clone()));
                }
            }
            if pk_rows.is_empty() {
                (None, None)
            } else {
                let (right_data, right_types): (Vec<_>, Vec<_>) = values.into_iter().unzip();
                let right_data = ScalarImpl::Struct(StructValue::new(right_data));
                let right_type = DataType::Struct(StructType::row_expr_type(right_types));
                let left = FunctionCall::new_unchecked(
                    ExprType::Row,
                    pk_rows.into_iter().map(|pk| pk.into()).collect(),
                    right_type.clone(),
                );
                let right = Literal::new(Some(right_data), right_type);
                let (scan, predicate) = Condition {
                    conjunctions: vec![
                        FunctionCall::new(ExprType::GreaterThan, vec![left.into(), right.into()])?
                            .into(),
                    ],
                }
                .split_to_scan_ranges(&table_catalog, max_split_range_gap)?;
                if scan.len() > 1 {
                    return Err(ErrorCode::InternalError(
                        "Seek pk row should only generate one scan range".to_owned(),
                    )
                    .into());
                }
                (scan.first().cloned(), Some(predicate))
            }
        } else {
            (None, None)
        };

        let (seq_scan, out_fields, out_names) = if let Some(epoch_range) = epoch_range {
            let core = generic::LogScan::new(
                table_catalog.name.clone(),
                output_col_idx,
                table_catalog.clone(),
                context,
                epoch_range,
                version_id,
            );
            let batch_log_seq_scan = BatchLogSeqScan::new(core, scan);
            let out_fields = batch_log_seq_scan.core().out_fields();
            let out_names = batch_log_seq_scan.core().column_names();
            (batch_log_seq_scan.into(), out_fields, out_names)
        } else {
            let core = generic::TableScan::new(
                output_col_idx,
                table_catalog.clone(),
                vec![],
                vec![],
                context,
                Condition {
                    conjunctions: vec![],
                },
                None,
            );
            let scans = match scan {
                Some(scan) => vec![scan],
                None => vec![],
            };
            let table_scan = BatchSeqScan::new(core, scans, None);
            let out_fields = table_scan.core().out_fields();
            let out_names = table_scan.core().column_names();
            (table_scan.into(), out_fields, out_names)
        };

        let plan = if let Some(predicate) = predicate
            && !predicate.always_true()
        {
            BatchFilter::new(generic::Filter::new(predicate, seq_scan)).into()
        } else {
            seq_scan
        };

        let order = Order::new(table_catalog.pk().to_vec());

        let plan_root = PlanRoot::new_with_batch_plan(
            plan,
            RequiredDist::single(),
            order,
            out_fields,
            out_names,
        );
        let schema = plan_root.schema();
        let (batch_plan, query_mode) = match session.config().query_mode() {
            QueryMode::Auto | QueryMode::Local => {
                (plan_root.gen_batch_local_plan()?, QueryMode::Local)
            }
            QueryMode::Distributed => (
                plan_root.gen_batch_distributed_plan()?,
                QueryMode::Distributed,
            ),
        };
        Ok(RwBatchQueryPlanResult {
            plan: batch_plan,
            query_mode,
            schema,
            stmt_type: StatementType::SELECT,
            dependent_relations: vec![],
        })
    }

    pub fn idle_duration(&self) -> Duration {
        self.last_fetch.elapsed()
    }

    pub fn subscription_name(&self) -> &str {
        self.subscription.name.as_str()
    }

    pub fn state_info_string(&self) -> String {
        self.state.to_string()
    }
}

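/// Session-level registry of open cursors, keyed by cursor name and guarded by
/// an async mutex.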
pub struct CursorManager {
    cursor_map: tokio::sync::Mutex<HashMap<String, Cursor>>,
    cursor_metrics: Arc<CursorMetrics>,
}

impl CursorManager {
    pub fn new(cursor_metrics: Arc<CursorMetrics>) -> Self {
        Self {
            cursor_map: tokio::sync::Mutex::new(HashMap::new()),
            cursor_metrics,
        }
    }

    pub async fn add_subscription_cursor(
        &self,
        cursor_name: String,
        start_timestamp: Option<u64>,
        dependent_table_id: TableId,
        subscription: Arc<SubscriptionCatalog>,
        handler_args: &HandlerArgs,
    ) -> Result<()> {
        let create_cursor_timer = Instant::now();
        let subscription_name = subscription.name.clone();
        let cursor = SubscriptionCursor::new(
            cursor_name,
            start_timestamp,
            subscription,
            dependent_table_id,
            handler_args,
            self.cursor_metrics.clone(),
        )
        .await?;
        let mut cursor_map = self.cursor_map.lock().await;
        self.cursor_metrics
            .subscription_cursor_declare_duration
            .with_label_values(&[&subscription_name])
            .observe(create_cursor_timer.elapsed().as_millis() as _);

        // Drop cursors that have already become invalid before inserting the new one.
        cursor_map.retain(|_, v| {
            if let Cursor::Subscription(cursor) = v
                && matches!(cursor.state, State::Invalid)
            {
                false
            } else {
                true
            }
        });

        cursor_map
            .try_insert(cursor.cursor_name.clone(), Cursor::Subscription(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;
        Ok(())
    }

    pub async fn add_query_cursor(
        &self,
        cursor_name: String,
        chunk_stream: CursorDataChunkStream,
        fields: Vec<Field>,
    ) -> Result<()> {
        let cursor = QueryCursor::new(chunk_stream, fields)?;
        self.cursor_map
            .lock()
            .await
            .try_insert(cursor_name, Cursor::Query(cursor))
            .map_err(|error| {
                ErrorCode::CatalogError(
                    format!("cursor `{}` already exists", error.entry.key()).into(),
                )
            })?;

        Ok(())
    }

    pub async fn remove_cursor(&self, cursor_name: &str) -> Result<()> {
        self.cursor_map
            .lock()
            .await
            .remove(cursor_name)
            .ok_or_else(|| {
                ErrorCode::CatalogError(format!("cursor `{}` doesn't exist", cursor_name).into())
            })?;
        Ok(())
    }

    pub async fn remove_all_cursor(&self) {
        self.cursor_map.lock().await.clear();
    }

    pub async fn remove_all_query_cursor(&self) {
        self.cursor_map
            .lock()
            .await
            .retain(|_, v| matches!(v, Cursor::Subscription(_)));
    }

    pub async fn get_rows_with_cursor(
        &self,
        cursor_name: &str,
        count: u32,
        handler_args: HandlerArgs,
        formats: &Vec<Format>,
        timeout_seconds: Option<u64>,
    ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            cursor
                .next(count, handler_args, formats, timeout_seconds)
                .await
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_fields_with_cursor(&self, cursor_name: &str) -> Result<Vec<Field>> {
        if let Some(cursor) = self.cursor_map.lock().await.get_mut(cursor_name) {
            Ok(cursor.get_fields())
        } else {
            Err(ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name)).into())
        }
    }

    pub async fn get_periodic_cursor_metrics(&self) -> PeriodicCursorMetrics {
        let mut subscription_cursor_nums = 0;
        let mut invalid_subscription_cursor_nums = 0;
        let mut subscription_cursor_last_fetch_duration = HashMap::new();
        for (_, cursor) in self.cursor_map.lock().await.iter() {
            if let Cursor::Subscription(subscription_cursor) = cursor {
                subscription_cursor_nums += 1;
                if matches!(subscription_cursor.state, State::Invalid) {
                    invalid_subscription_cursor_nums += 1;
                } else {
                    let fetch_duration =
                        subscription_cursor.last_fetch.elapsed().as_millis() as f64;
                    subscription_cursor_last_fetch_duration.insert(
                        subscription_cursor.subscription.name.clone(),
                        fetch_duration,
                    );
                }
            }
        }
        PeriodicCursorMetrics {
            subscription_cursor_nums,
            invalid_subscription_cursor_nums,
            subscription_cursor_last_fetch_duration,
        }
    }

    pub async fn iter_query_cursors(&self, mut f: impl FnMut(&String, &QueryCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Query(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn iter_subscription_cursors(&self, mut f: impl FnMut(&String, &SubscriptionCursor)) {
        self.cursor_map
            .lock()
            .await
            .iter()
            .for_each(|(cursor_name, cursor)| {
                if let Cursor::Subscription(cursor) = cursor {
                    f(cursor_name, cursor)
                }
            });
    }

    pub async fn gen_batch_plan_with_subscription_cursor(
        &self,
        cursor_name: &str,
        handler_args: HandlerArgs,
    ) -> Result<RwBatchQueryPlanResult> {
        match self.cursor_map.lock().await.get(cursor_name).ok_or_else(|| {
            ErrorCode::InternalError(format!("Cannot find cursor `{}`", cursor_name))
        })? {
            Cursor::Subscription(cursor) => cursor.gen_batch_plan_result(handler_args),
            Cursor::Query(_) => Err(ErrorCode::InternalError(
                "The plan of a query cursor is the same as the query statement it was created with."
                    .to_owned(),
            )
            .into()),
        }
    }
}