risingwave_batch_executors/executor/row_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::ops::{Bound, Deref};
15use std::sync::Arc;
16
17use futures::{StreamExt, pin_mut};
18use futures_async_stream::try_stream;
19use itertools::Itertools;
20use prometheus::Histogram;
21use risingwave_common::array::DataChunk;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Schema};
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::types::DataType;
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_common::util::value_encoding::deserialize_datum;
29use risingwave_pb::batch_plan::plan_node::NodeBody;
30use risingwave_pb::batch_plan::{PbScanRange, scan_range};
31use risingwave_pb::common::BatchQueryEpoch;
32use risingwave_pb::plan_common::as_of::AsOfType;
33use risingwave_pb::plan_common::{PbAsOf, StorageTableDesc, as_of};
34use risingwave_storage::store::PrefetchOptions;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37
38use crate::error::{BatchError, Result};
39use crate::executor::{
40    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
41};
42use crate::monitor::BatchMetrics;
43
/// Executor that scans rows from a storage table ([`BatchTable`]), executing each [`ScanRange`]
/// either as a point get or as a range scan.
pub struct RowSeqScanExecutor<S: StateStore> {
    /// Chunk size handed to the chunk builder and to the storage chunk iterator.
    chunk_size: usize,
    /// Identity string returned by `Executor::identity`.
    identity: String,

    /// Batch metrics.
    /// `None` in local mode, where metrics are not recorded.
    metrics: Option<BatchMetrics>,

    /// The table to scan.
    table: BatchTable<S>,
    /// Ranges to scan; a range whose `pk_prefix` covers every pk column is run as a point get.
    scan_ranges: Vec<ScanRange>,
    /// Whether output must follow pk order; only allowed with exactly one scan range.
    ordered: bool,
    /// Epoch to read at, unless overridden by `as_of`.
    epoch: BatchQueryEpoch,
    /// Optional row limit; the scan stops once at least this many rows have been returned.
    limit: Option<u64>,
    /// Optional time-travel target; takes precedence over `epoch`.
    as_of: Option<AsOf>,
}
60
/// Range for batch scan: a primary-key prefix matched by equality, plus bounds on the pk
/// column(s) that follow it.
#[derive(Debug)]
pub struct ScanRange {
    /// The prefix of the primary key, matched by equality.
    pub pk_prefix: OwnedRow,

    /// The `(lower, upper)` range bounds of the next column(s) after `pk_prefix`, expressed in
    /// logical order; they are swapped at scan time when that column is stored descending.
    pub next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>),
}
70
/// Time-travel (`AS OF`) target of a batch scan. Only timestamp targets are representable.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsOf {
    /// Unix timestamp in seconds; converted with `unix_timestamp_sec_to_epoch` at execution.
    pub timestamp: i64,
}
75
76impl TryFrom<&PbAsOf> for AsOf {
77    type Error = BatchError;
78
79    fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
80        match pb.as_of_type.as_ref().unwrap() {
81            AsOfType::Timestamp(ts) => Ok(Self {
82                timestamp: ts.timestamp,
83            }),
84            AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
85                anyhow::anyhow!("batch query does not support as of process time or version"),
86            )),
87        }
88    }
89}
90
91impl From<&AsOf> for PbAsOf {
92    fn from(v: &AsOf) -> Self {
93        PbAsOf {
94            as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
95                timestamp: v.timestamp,
96            })),
97        }
98    }
99}
100
101impl ScanRange {
102    /// Create a scan range from the prost representation.
103    pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> Result<Self> {
104        let mut index = 0;
105        let pk_prefix = OwnedRow::new(
106            scan_range
107                .eq_conds
108                .iter()
109                .map(|v| {
110                    let ty = pk_types.get(index).unwrap();
111                    index += 1;
112                    deserialize_datum(v.as_slice(), ty)
113                })
114                .try_collect()?,
115        );
116        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
117            return Ok(Self {
118                pk_prefix,
119                ..Self::full()
120            });
121        }
122
123        let build_bound = |bound: &scan_range::Bound, mut index| -> Result<Bound<OwnedRow>> {
124            let next_col_bounds = OwnedRow::new(
125                bound
126                    .value
127                    .iter()
128                    .map(|v| {
129                        let ty = pk_types.get(index).unwrap();
130                        index += 1;
131                        deserialize_datum(v.as_slice(), ty)
132                    })
133                    .try_collect()?,
134            );
135            if bound.inclusive {
136                Ok(Bound::Included(next_col_bounds))
137            } else {
138                Ok(Bound::Excluded(next_col_bounds))
139            }
140        };
141
142        let next_col_bounds: (Bound<OwnedRow>, Bound<OwnedRow>) = match (
143            scan_range.lower_bound.as_ref(),
144            scan_range.upper_bound.as_ref(),
145        ) {
146            (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
147            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
148            (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
149            (None, None) => unreachable!(),
150        };
151        Ok(Self {
152            pk_prefix,
153            next_col_bounds,
154        })
155    }
156
157    /// Create a scan range for full table scan.
158    pub fn full() -> Self {
159        Self {
160            pk_prefix: OwnedRow::default(),
161            next_col_bounds: (Bound::Unbounded, Bound::Unbounded),
162        }
163    }
164}
165
166impl<S: StateStore> RowSeqScanExecutor<S> {
167    pub fn new(
168        table: BatchTable<S>,
169        scan_ranges: Vec<ScanRange>,
170        ordered: bool,
171        epoch: BatchQueryEpoch,
172        chunk_size: usize,
173        identity: String,
174        limit: Option<u64>,
175        metrics: Option<BatchMetrics>,
176        as_of: Option<AsOf>,
177    ) -> Self {
178        Self {
179            chunk_size,
180            identity,
181            metrics,
182            table,
183            scan_ranges,
184            ordered,
185            epoch,
186            limit,
187            as_of,
188        }
189    }
190}
191
192pub struct RowSeqScanExecutorBuilder {}
193
194impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
195    async fn new_boxed_executor(
196        source: &ExecutorBuilder<'_>,
197        inputs: Vec<BoxedExecutor>,
198    ) -> Result<BoxedExecutor> {
199        ensure!(
200            inputs.is_empty(),
201            "Row sequential scan should not have input executor!"
202        );
203        let seq_scan_node = try_match_expand!(
204            source.plan_node().get_node_body().unwrap(),
205            NodeBody::RowSeqScan
206        )?;
207
208        let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
209        let column_ids = seq_scan_node
210            .column_ids
211            .iter()
212            .copied()
213            .map(ColumnId::from)
214            .collect();
215        let vnodes = match &seq_scan_node.vnode_bitmap {
216            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
217            // This is possible for dml. vnode_bitmap is not filled by scheduler.
218            // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
219            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
220        };
221
222        let scan_ranges = {
223            let scan_ranges = &seq_scan_node.scan_ranges;
224            if scan_ranges.is_empty() {
225                vec![ScanRange::full()]
226            } else {
227                scan_ranges
228                    .iter()
229                    .map(|scan_range| {
230                        let pk_types = table_desc
231                            .pk
232                            .iter()
233                            .map(|order| {
234                                DataType::from(
235                                    table_desc.columns[order.column_index as usize]
236                                        .column_type
237                                        .as_ref()
238                                        .unwrap(),
239                                )
240                            })
241                            .collect_vec();
242                        ScanRange::new(scan_range.clone(), pk_types)
243                    })
244                    .try_collect()?
245            }
246        };
247
248        let ordered = seq_scan_node.ordered;
249
250        let epoch = source.epoch();
251        let limit = seq_scan_node.limit;
252        let as_of = seq_scan_node
253            .as_of
254            .as_ref()
255            .map(AsOf::try_from)
256            .transpose()?;
257        let chunk_size = if let Some(limit) = seq_scan_node.limit {
258            (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
259        } else {
260            source.context().get_config().developer.chunk_size as u32
261        };
262        let metrics = source.context().batch_metrics();
263
264        dispatch_state_store!(source.context().state_store(), state_store, {
265            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
266            Ok(Box::new(RowSeqScanExecutor::new(
267                table,
268                scan_ranges,
269                ordered,
270                epoch,
271                chunk_size as usize,
272                source.plan_node().get_identity().clone(),
273                limit,
274                metrics,
275                as_of,
276            )))
277        })
278    }
279}
280
281impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
282    fn schema(&self) -> &Schema {
283        self.table.schema()
284    }
285
286    fn identity(&self) -> &str {
287        &self.identity
288    }
289
290    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
291        self.do_execute().boxed()
292    }
293}
294
295impl<S: StateStore> RowSeqScanExecutor<S> {
296    #[try_stream(ok = DataChunk, error = BatchError)]
297    async fn do_execute(self: Box<Self>) {
298        let Self {
299            chunk_size,
300            identity,
301            metrics,
302            table,
303            scan_ranges,
304            ordered,
305            epoch,
306            limit,
307            as_of,
308        } = *self;
309        let table = Arc::new(table);
310        // as_of takes precedence
311        let query_epoch = as_of
312            .map(|a| {
313                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
314                tracing::debug!(epoch, identity, "time travel");
315                risingwave_pb::common::BatchQueryEpoch {
316                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
317                        epoch,
318                    )),
319                }
320            })
321            .unwrap_or_else(|| epoch);
322
323        // Create collector.
324        let histogram = metrics
325            .as_ref()
326            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
327
328        if ordered {
329            // Currently we execute range-scans concurrently so the order is not guaranteed if
330            // there're multiple ranges.
331            // TODO: reserve the order for multiple ranges.
332            assert_eq!(scan_ranges.len(), 1);
333        }
334
335        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
336            .into_iter()
337            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
338
339        // the number of rows have been returned as execute result
340        let mut returned = 0;
341        if let Some(limit) = &limit
342            && returned >= *limit
343        {
344            return Ok(());
345        }
346        let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
347        // Point Get
348        for point_get in point_gets {
349            let table = table.clone();
350            if let Some(row) =
351                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
352            {
353                if let Some(chunk) = data_chunk_builder.append_one_row(row) {
354                    returned += chunk.cardinality() as u64;
355                    yield chunk;
356                    if let Some(limit) = &limit
357                        && returned >= *limit
358                    {
359                        return Ok(());
360                    }
361                }
362            }
363        }
364        if let Some(chunk) = data_chunk_builder.consume_all() {
365            returned += chunk.cardinality() as u64;
366            yield chunk;
367            if let Some(limit) = &limit
368                && returned >= *limit
369            {
370                return Ok(());
371            }
372        }
373
374        // Range Scan
375        // WARN: DO NOT use `select` to execute range scans concurrently
376        //       it can consume too much memory if there're too many ranges.
377        for range in range_scans {
378            let stream = Self::execute_range(
379                table.clone(),
380                range,
381                ordered,
382                query_epoch,
383                chunk_size,
384                limit,
385                histogram,
386            );
387            #[for_await]
388            for chunk in stream {
389                let chunk = chunk?;
390                returned += chunk.cardinality() as u64;
391                yield chunk;
392                if let Some(limit) = &limit
393                    && returned >= *limit
394                {
395                    return Ok(());
396                }
397            }
398        }
399    }
400
401    async fn execute_point_get(
402        table: Arc<BatchTable<S>>,
403        scan_range: ScanRange,
404        epoch: BatchQueryEpoch,
405        histogram: Option<impl Deref<Target = Histogram>>,
406    ) -> Result<Option<OwnedRow>> {
407        let pk_prefix = scan_range.pk_prefix;
408        assert!(pk_prefix.len() == table.pk_indices().len());
409
410        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
411
412        // Point Get.
413        let row = table.get_row(&pk_prefix, epoch.into()).await?;
414
415        if let Some(timer) = timer {
416            timer.observe_duration()
417        }
418
419        Ok(row)
420    }
421
422    #[try_stream(ok = DataChunk, error = BatchError)]
423    async fn execute_range(
424        table: Arc<BatchTable<S>>,
425        scan_range: ScanRange,
426        ordered: bool,
427        epoch: BatchQueryEpoch,
428        chunk_size: usize,
429        limit: Option<u64>,
430        histogram: Option<impl Deref<Target = Histogram>>,
431    ) {
432        let ScanRange {
433            pk_prefix,
434            next_col_bounds,
435        } = scan_range;
436
437        // The len of a valid pk_prefix should be less than or equal pk's num.
438        let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
439        let (start_bound, end_bound) = if order_type.is_ascending() {
440            (next_col_bounds.0, next_col_bounds.1)
441        } else {
442            (next_col_bounds.1, next_col_bounds.0)
443        };
444
445        let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
446        let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
447
448        let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| {
449            match bound {
450                Bound::Unbounded => {
451                    if other_bound_is_bounded && order_type_nulls {
452                        // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
453                        Bound::Excluded(OwnedRow::new(vec![None]))
454                    } else {
455                        // Both start and end are unbounded, so we need to select all rows.
456                        Bound::Unbounded
457                    }
458                }
459                Bound::Included(x) => Bound::Included(x),
460                Bound::Excluded(x) => Bound::Excluded(x),
461            }
462        };
463        let start_bound = build_bound(
464            end_bound_is_bounded,
465            start_bound,
466            order_type.nulls_are_first(),
467        );
468        let end_bound = build_bound(
469            start_bound_is_bounded,
470            end_bound,
471            order_type.nulls_are_last(),
472        );
473
474        // Range Scan.
475        assert!(pk_prefix.len() < table.pk_indices().len());
476        let iter = table
477            .batch_chunk_iter_with_pk_bounds(
478                epoch.into(),
479                &pk_prefix,
480                (start_bound, end_bound),
481                ordered,
482                chunk_size,
483                PrefetchOptions::new(limit.is_none(), true),
484            )
485            .await?;
486
487        pin_mut!(iter);
488        loop {
489            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
490
491            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
492
493            if let Some(timer) = timer {
494                timer.observe_duration()
495            }
496
497            if let Some(chunk) = chunk {
498                yield chunk
499            } else {
500                break;
501            }
502        }
503    }
504}
505
506pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
507    let ts = ts.checked_add(1).unwrap();
508    risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
509        u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
510    )
511}