1use std::ops::{Bound, Deref};
15use std::sync::Arc;
16
17use futures::{StreamExt, pin_mut};
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use prometheus::Histogram;
21use risingwave_common::array::DataChunk;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Schema};
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::types::DataType;
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::value_encoding::deserialize_datum;
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30use risingwave_pb::batch_plan::{PbScanRange, scan_range};
31use risingwave_pb::common::BatchQueryEpoch;
32use risingwave_pb::plan_common::as_of::AsOfType;
33use risingwave_pb::plan_common::{PbAsOf, StorageTableDesc, as_of};
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use crate::error::{BatchError, Result};
39use crate::executor::{
40 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
41};
42use crate::monitor::BatchMetrics;
43
/// Batch executor that reads rows from a [`BatchTable`] over a set of [`ScanRange`]s.
pub struct RowSeqScanExecutor<S: StateStore> {
    /// Row capacity handed to the chunk builder and to the table's chunk iterator.
    chunk_size: usize,
    /// Name returned by `Executor::identity`; also attached to the time-travel debug log.
    identity: String,

    /// Optional batch metrics; when present, the `row_seq_scan_next_duration` histogram
    /// is used to time each point get and each range-scan fetch.
    metrics: Option<BatchMetrics>,

    /// The table being scanned.
    table: BatchTable<S>,
    /// Ranges to scan. Ranges whose prefix covers the whole primary key are executed as
    /// point gets, all others as range scans.
    scan_ranges: Vec<ScanRange>,
    /// Forwarded to the range-scan iterator. When `true`, execution asserts that there
    /// is exactly one scan range.
    ordered: bool,
    /// Epoch to read at, unless `as_of` is set.
    epoch: BatchQueryEpoch,
    /// Optional row-count limit: execution stops once at least this many rows were yielded.
    limit: Option<u64>,
    /// Optional time-travel target; when set it replaces `epoch` with a time-travel epoch
    /// derived from the timestamp.
    as_of: Option<AsOf>,
}
60
/// A primary-key range to scan: an equality prefix plus bounds on what follows it.
#[derive(Debug)]
pub struct ScanRange {
    /// Values for the leading primary-key columns that must match exactly.
    pub pk_prefix: OwnedRow,

    /// Lower and upper bounds (in that order) applied to the primary-key column(s)
    /// that come right after `pk_prefix`.
    pub next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>),
}
70
/// Time-travel target of a batch query, in the only form batch execution accepts:
/// a timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsOf {
    /// Timestamp to read as of. It is interpreted as Unix seconds when converted to an
    /// epoch via `unix_timestamp_sec_to_epoch`.
    pub timestamp: i64,
}
75
76impl TryFrom<&PbAsOf> for AsOf {
77 type Error = BatchError;
78
79 fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
80 match pb.as_of_type.as_ref().unwrap() {
81 AsOfType::Timestamp(ts) => Ok(Self {
82 timestamp: ts.timestamp,
83 }),
84 AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
85 anyhow::anyhow!("batch query does not support as of process time or version"),
86 )),
87 }
88 }
89}
90
91impl From<&AsOf> for PbAsOf {
92 fn from(v: &AsOf) -> Self {
93 PbAsOf {
94 as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
95 timestamp: v.timestamp,
96 })),
97 }
98 }
99}
100
101impl ScanRange {
102 pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> Result<Self> {
104 let mut index = 0;
105 let pk_prefix = OwnedRow::new(
106 scan_range
107 .eq_conds
108 .iter()
109 .map(|v| {
110 let ty = pk_types.get(index).unwrap();
111 index += 1;
112 deserialize_datum(v.as_slice(), ty)
113 })
114 .try_collect()?,
115 );
116 if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
117 return Ok(Self {
118 pk_prefix,
119 ..Self::full()
120 });
121 }
122
123 let build_bound = |bound: &scan_range::Bound, mut index| -> Result<Bound<OwnedRow>> {
124 let next_col_bounds = OwnedRow::new(
125 bound
126 .value
127 .iter()
128 .map(|v| {
129 let ty = pk_types.get(index).unwrap();
130 index += 1;
131 deserialize_datum(v.as_slice(), ty)
132 })
133 .try_collect()?,
134 );
135 if bound.inclusive {
136 Ok(Bound::Included(next_col_bounds))
137 } else {
138 Ok(Bound::Excluded(next_col_bounds))
139 }
140 };
141
142 let next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) = match (
143 scan_range.lower_bound.as_ref(),
144 scan_range.upper_bound.as_ref(),
145 ) {
146 (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
147 (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
148 (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
149 (None, None) => unreachable!(),
150 };
151 Ok(Self {
152 pk_prefix,
153 next_col_bounds,
154 })
155 }
156
157 pub fn full() -> Self {
159 Self {
160 pk_prefix: OwnedRow::default(),
161 next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
162 }
163 }
164}
165
166impl<S: StateStore> RowSeqScanExecutor<S> {
167 pub fn new(
168 table: BatchTable<S>,
169 scan_ranges: Vec<ScanRange>,
170 ordered: bool,
171 epoch: BatchQueryEpoch,
172 chunk_size: usize,
173 identity: String,
174 limit: Option<u64>,
175 metrics: Option<BatchMetrics>,
176 as_of: Option<AsOf>,
177 ) -> Self {
178 Self {
179 chunk_size,
180 identity,
181 metrics,
182 table,
183 scan_ranges,
184 ordered,
185 epoch,
186 limit,
187 as_of,
188 }
189 }
190}
191
/// Stateless builder that turns a `RowSeqScan` plan node into a boxed
/// [`RowSeqScanExecutor`].
pub struct RowSeqScanExecutorBuilder {}
193
194impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
195 async fn new_boxed_executor(
196 source: &ExecutorBuilder<'_>,
197 inputs: Vec<BoxedExecutor>,
198 ) -> Result<BoxedExecutor> {
199 ensure!(
200 inputs.is_empty(),
201 "Row sequential scan should not have input executor!"
202 );
203 let seq_scan_node = try_match_expand!(
204 source.plan_node().get_node_body().unwrap(),
205 NodeBody::RowSeqScan
206 )?;
207
208 let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
209 let column_ids = seq_scan_node
210 .column_ids
211 .iter()
212 .copied()
213 .map(ColumnId::from)
214 .collect();
215 let vnodes = match &seq_scan_node.vnode_bitmap {
216 Some(vnodes) => Some(Bitmap::from(vnodes).into()),
217 None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
220 };
221
222 let scan_ranges = {
223 let scan_ranges = &seq_scan_node.scan_ranges;
224 if scan_ranges.is_empty() {
225 vec![ScanRange::full()]
226 } else {
227 scan_ranges
228 .iter()
229 .map(|scan_range| {
230 let pk_types = table_desc
231 .pk
232 .iter()
233 .map(|order| {
234 DataType::from(
235 table_desc.columns[order.column_index as usize]
236 .column_type
237 .as_ref()
238 .unwrap(),
239 )
240 })
241 .collect_vec();
242 ScanRange::new(scan_range.clone(), pk_types)
243 })
244 .try_collect()?
245 }
246 };
247
248 let ordered = seq_scan_node.ordered;
249
250 let epoch = source.epoch();
251 let limit = seq_scan_node.limit;
252 let as_of = seq_scan_node
253 .as_of
254 .as_ref()
255 .map(AsOf::try_from)
256 .transpose()?;
257 let chunk_size = if let Some(limit) = seq_scan_node.limit {
258 (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
259 } else {
260 source.context().get_config().developer.chunk_size as u32
261 };
262 let metrics = source.context().batch_metrics();
263
264 dispatch_state_store!(source.context().state_store(), state_store, {
265 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
266 Ok(Box::new(RowSeqScanExecutor::new(
267 table,
268 scan_ranges,
269 ordered,
270 epoch,
271 chunk_size as usize,
272 source.plan_node().get_identity().clone(),
273 limit,
274 metrics,
275 as_of,
276 )))
277 })
278 }
279}
280
281impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
282 fn schema(&self) -> &Schema {
283 self.table.schema()
284 }
285
286 fn identity(&self) -> &str {
287 &self.identity
288 }
289
290 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
291 self.do_execute().boxed()
292 }
293}
294
295impl<S: StateStore> RowSeqScanExecutor<S> {
296 #[try_stream(ok = DataChunk, error = BatchError)]
297 async fn do_execute(self: Box<Self>) {
298 let Self {
299 chunk_size,
300 identity,
301 metrics,
302 table,
303 scan_ranges,
304 ordered,
305 epoch,
306 limit,
307 as_of,
308 } = *self;
309 let table = Arc::new(table);
310 let query_epoch = as_of
312 .map(|a| {
313 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
314 tracing::debug!(epoch, identity, "time travel");
315 risingwave_pb::common::BatchQueryEpoch {
316 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
317 epoch,
318 )),
319 }
320 })
321 .unwrap_or_else(|| epoch);
322
323 let histogram = metrics
325 .as_ref()
326 .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
327
328 if ordered {
329 assert_eq!(scan_ranges.len(), 1);
333 }
334
335 let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
336 .into_iter()
337 .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
338
339 let mut returned = 0;
341 if let Some(limit) = &limit
342 && returned >= *limit
343 {
344 return Ok(());
345 }
346 let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
347 for point_get in point_gets {
349 let table = table.clone();
350 if let Some(row) =
351 Self::execute_point_get(table, point_get, query_epoch, histogram).await?
352 {
353 if let Some(chunk) = data_chunk_builder.append_one_row(row) {
354 returned += chunk.cardinality() as u64;
355 yield chunk;
356 if let Some(limit) = &limit
357 && returned >= *limit
358 {
359 return Ok(());
360 }
361 }
362 }
363 }
364 if let Some(chunk) = data_chunk_builder.consume_all() {
365 returned += chunk.cardinality() as u64;
366 yield chunk;
367 if let Some(limit) = &limit
368 && returned >= *limit
369 {
370 return Ok(());
371 }
372 }
373
374 for range in range_scans {
378 let stream = Self::execute_range(
379 table.clone(),
380 range,
381 ordered,
382 query_epoch,
383 chunk_size,
384 limit,
385 histogram,
386 );
387 #[for_await]
388 for chunk in stream {
389 let chunk = chunk?;
390 returned += chunk.cardinality() as u64;
391 yield chunk;
392 if let Some(limit) = &limit
393 && returned >= *limit
394 {
395 return Ok(());
396 }
397 }
398 }
399 }
400
401 async fn execute_point_get(
402 table: Arc<BatchTable<S>>,
403 scan_range: ScanRange,
404 epoch: BatchQueryEpoch,
405 histogram: Option<impl Deref<Target = Histogram>>,
406 ) -> Result<Option<OwnedRow>> {
407 let pk_prefix = scan_range.pk_prefix;
408 assert!(pk_prefix.len() == table.pk_indices().len());
409
410 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
411
412 let row = table.get_row(&pk_prefix, epoch.into()).await?;
414
415 if let Some(timer) = timer {
416 timer.observe_duration()
417 }
418
419 Ok(row)
420 }
421
422 #[try_stream(ok = DataChunk, error = BatchError)]
423 async fn execute_range(
424 table: Arc<BatchTable<S>>,
425 scan_range: ScanRange,
426 ordered: bool,
427 epoch: BatchQueryEpoch,
428 chunk_size: usize,
429 limit: Option<u64>,
430 histogram: Option<impl Deref<Target = Histogram>>,
431 ) {
432 let ScanRange {
433 pk_prefix,
434 next_col_bounds,
435 } = scan_range;
436
437 let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
439 let (start_bound, end_bound) = if order_type.is_ascending() {
440 (next_col_bounds.0, next_col_bounds.1)
441 } else {
442 (next_col_bounds.1, next_col_bounds.0)
443 };
444
445 let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
446 let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
447
448 let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| {
449 match bound {
450 Bound::Unbounded => {
451 if other_bound_is_bounded && order_type_nulls {
452 Bound::Excluded(OwnedRow::new(vec![None]))
454 } else {
455 Bound::Unbounded
457 }
458 }
459 Bound::Included(x) => Bound::Included(x),
460 Bound::Excluded(x) => Bound::Excluded(x),
461 }
462 };
463 let start_bound = build_bound(
464 end_bound_is_bounded,
465 start_bound,
466 order_type.nulls_are_first(),
467 );
468 let end_bound = build_bound(
469 start_bound_is_bounded,
470 end_bound,
471 order_type.nulls_are_last(),
472 );
473
474 assert!(pk_prefix.len() < table.pk_indices().len());
476 let iter = table
477 .batch_chunk_iter_with_pk_bounds(
478 epoch.into(),
479 &pk_prefix,
480 (start_bound, end_bound),
481 ordered,
482 chunk_size,
483 PrefetchOptions::new(limit.is_none(), true),
484 )
485 .await?;
486
487 pin_mut!(iter);
488 loop {
489 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
490
491 let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
492
493 if let Some(timer) = timer {
494 timer.observe_duration()
495 }
496
497 if let Some(chunk) = chunk {
498 yield chunk
499 } else {
500 break;
501 }
502 }
503 }
504}
505
506pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
507 let ts = ts.checked_add(1).unwrap();
508 risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
509 u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
510 )
511}