risingwave_batch_executors/executor/
row_seq_scan.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::ops::Deref;
15use std::sync::Arc;
16
17use futures::{StreamExt, pin_mut};
18use futures_async_stream::try_stream;
19use prometheus::Histogram;
20use risingwave_common::array::DataChunk;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnId, Schema};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::row::{OwnedRow, Row};
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_pb::batch_plan::plan_node::NodeBody;
27use risingwave_pb::common::BatchQueryEpoch;
28use risingwave_pb::plan_common::as_of::AsOfType;
29use risingwave_pb::plan_common::{PbAsOf, StorageTableDesc, as_of};
30use risingwave_storage::store::PrefetchOptions;
31use risingwave_storage::table::batch_table::BatchTable;
32use risingwave_storage::{StateStore, dispatch_state_store};
33
34use super::ScanRange;
35use crate::error::{BatchError, Result};
36use crate::executor::{
37    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38    build_scan_ranges_from_pb,
39};
40use crate::monitor::BatchMetrics;
41
42/// Executor that scans data from row table
43pub struct RowSeqScanExecutor<S: StateStore> {
44    chunk_size: usize,
45    identity: String,
46
47    /// Batch metrics.
48    /// None: Local mode don't record metrics.
49    metrics: Option<BatchMetrics>,
50
51    table: BatchTable<S>,
52    scan_ranges: Vec<ScanRange>,
53    ordered: bool,
54    epoch: BatchQueryEpoch,
55    limit: Option<u64>,
56    as_of: Option<AsOf>,
57}
/// Time-travel target of a batch scan.
///
/// `timestamp` is fed to `unix_timestamp_sec_to_epoch`, i.e. it is a Unix time in seconds.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct AsOf {
    /// Unix timestamp, in seconds, to read the table as of.
    pub timestamp: i64,
}
62
63impl TryFrom<&PbAsOf> for AsOf {
64    type Error = BatchError;
65
66    fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
67        match pb.as_of_type.as_ref().unwrap() {
68            AsOfType::Timestamp(ts) => Ok(Self {
69                timestamp: ts.timestamp,
70            }),
71            AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
72                anyhow::anyhow!("batch query does not support as of process time or version"),
73            )),
74        }
75    }
76}
77
78impl From<&AsOf> for PbAsOf {
79    fn from(v: &AsOf) -> Self {
80        PbAsOf {
81            as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
82                timestamp: v.timestamp,
83            })),
84        }
85    }
86}
87
88impl<S: StateStore> RowSeqScanExecutor<S> {
89    pub fn new(
90        table: BatchTable<S>,
91        scan_ranges: Vec<ScanRange>,
92        ordered: bool,
93        epoch: BatchQueryEpoch,
94        chunk_size: usize,
95        identity: String,
96        limit: Option<u64>,
97        metrics: Option<BatchMetrics>,
98        as_of: Option<AsOf>,
99    ) -> Self {
100        Self {
101            chunk_size,
102            identity,
103            metrics,
104            table,
105            scan_ranges,
106            ordered,
107            epoch,
108            limit,
109            as_of,
110        }
111    }
112}
113
114pub struct RowSeqScanExecutorBuilder {}
115
116impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
117    async fn new_boxed_executor(
118        source: &ExecutorBuilder<'_>,
119        inputs: Vec<BoxedExecutor>,
120    ) -> Result<BoxedExecutor> {
121        ensure!(
122            inputs.is_empty(),
123            "Row sequential scan should not have input executor!"
124        );
125        let seq_scan_node = try_match_expand!(
126            source.plan_node().get_node_body().unwrap(),
127            NodeBody::RowSeqScan
128        )?;
129
130        let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
131        let column_ids = seq_scan_node
132            .column_ids
133            .iter()
134            .copied()
135            .map(ColumnId::from)
136            .collect();
137        let vnodes = match &seq_scan_node.vnode_bitmap {
138            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
139            // This is possible for dml. vnode_bitmap is not filled by scheduler.
140            // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
141            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
142        };
143
144        let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;
145
146        let ordered = seq_scan_node.ordered;
147
148        let epoch = source.epoch();
149        let limit = seq_scan_node.limit;
150        let as_of = seq_scan_node
151            .as_of
152            .as_ref()
153            .map(AsOf::try_from)
154            .transpose()?;
155        let chunk_size = if let Some(limit) = seq_scan_node.limit {
156            (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
157        } else {
158            source.context().get_config().developer.chunk_size as u32
159        };
160        let metrics = source.context().batch_metrics();
161
162        dispatch_state_store!(source.context().state_store(), state_store, {
163            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
164            Ok(Box::new(RowSeqScanExecutor::new(
165                table,
166                scan_ranges,
167                ordered,
168                epoch,
169                chunk_size as usize,
170                source.plan_node().get_identity().clone(),
171                limit,
172                metrics,
173                as_of,
174            )))
175        })
176    }
177}
178
179impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
180    fn schema(&self) -> &Schema {
181        self.table.schema()
182    }
183
184    fn identity(&self) -> &str {
185        &self.identity
186    }
187
188    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
189        self.do_execute().boxed()
190    }
191}
192
193impl<S: StateStore> RowSeqScanExecutor<S> {
194    #[try_stream(ok = DataChunk, error = BatchError)]
195    async fn do_execute(self: Box<Self>) {
196        let Self {
197            chunk_size,
198            identity,
199            metrics,
200            table,
201            scan_ranges,
202            ordered,
203            epoch,
204            limit,
205            as_of,
206        } = *self;
207        let table = Arc::new(table);
208        // as_of takes precedence
209        let query_epoch = as_of
210            .map(|a| {
211                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
212                tracing::debug!(epoch, identity, "time travel");
213                risingwave_pb::common::BatchQueryEpoch {
214                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
215                        epoch,
216                    )),
217                }
218            })
219            .unwrap_or_else(|| epoch);
220
221        // Create collector.
222        let histogram = metrics
223            .as_ref()
224            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
225
226        if ordered {
227            // Currently we execute range-scans concurrently so the order is not guaranteed if
228            // there're multiple ranges.
229            // TODO: reserve the order for multiple ranges.
230            assert_eq!(scan_ranges.len(), 1);
231        }
232
233        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
234            .into_iter()
235            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
236
237        // the number of rows have been returned as execute result
238        let mut returned = 0;
239        if let Some(limit) = &limit
240            && returned >= *limit
241        {
242            return Ok(());
243        }
244        let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
245        // Point Get
246        for point_get in point_gets {
247            let table = table.clone();
248            if let Some(row) =
249                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
250            {
251                if let Some(chunk) = data_chunk_builder.append_one_row(row) {
252                    returned += chunk.cardinality() as u64;
253                    yield chunk;
254                    if let Some(limit) = &limit
255                        && returned >= *limit
256                    {
257                        return Ok(());
258                    }
259                }
260            }
261        }
262        if let Some(chunk) = data_chunk_builder.consume_all() {
263            returned += chunk.cardinality() as u64;
264            yield chunk;
265            if let Some(limit) = &limit
266                && returned >= *limit
267            {
268                return Ok(());
269            }
270        }
271
272        // Range Scan
273        // WARN: DO NOT use `select` to execute range scans concurrently
274        //       it can consume too much memory if there're too many ranges.
275        for range in range_scans {
276            let stream = Self::execute_range(
277                table.clone(),
278                range,
279                ordered,
280                query_epoch,
281                chunk_size,
282                limit,
283                histogram,
284            );
285            #[for_await]
286            for chunk in stream {
287                let chunk = chunk?;
288                returned += chunk.cardinality() as u64;
289                yield chunk;
290                if let Some(limit) = &limit
291                    && returned >= *limit
292                {
293                    return Ok(());
294                }
295            }
296        }
297    }
298
299    async fn execute_point_get(
300        table: Arc<BatchTable<S>>,
301        scan_range: ScanRange,
302        epoch: BatchQueryEpoch,
303        histogram: Option<impl Deref<Target = Histogram>>,
304    ) -> Result<Option<OwnedRow>> {
305        let pk_prefix = scan_range.pk_prefix;
306        assert!(pk_prefix.len() == table.pk_indices().len());
307
308        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
309
310        // Point Get.
311        let row = table.get_row(&pk_prefix, epoch.into()).await?;
312
313        if let Some(timer) = timer {
314            timer.observe_duration()
315        }
316
317        Ok(row)
318    }
319
320    #[try_stream(ok = DataChunk, error = BatchError)]
321    async fn execute_range(
322        table: Arc<BatchTable<S>>,
323        scan_range: ScanRange,
324        ordered: bool,
325        epoch: BatchQueryEpoch,
326        chunk_size: usize,
327        limit: Option<u64>,
328        histogram: Option<impl Deref<Target = Histogram>>,
329    ) {
330        let pk_prefix = scan_range.pk_prefix.clone();
331        let range_bounds = scan_range.convert_to_range_bounds(&table);
332        // Range Scan.
333        assert!(pk_prefix.len() < table.pk_indices().len());
334        let iter = table
335            .batch_chunk_iter_with_pk_bounds(
336                epoch.into(),
337                &pk_prefix,
338                range_bounds,
339                ordered,
340                chunk_size,
341                PrefetchOptions::new(limit.is_none(), true),
342            )
343            .await?;
344
345        pin_mut!(iter);
346        loop {
347            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
348
349            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
350
351            if let Some(timer) = timer {
352                timer.observe_duration()
353            }
354
355            if let Some(chunk) = chunk {
356                yield chunk
357            } else {
358                break;
359            }
360        }
361    }
362}
363
364pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
365    let ts = ts.checked_add(1).unwrap();
366    risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
367        u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
368    )
369}