risingwave_batch_executors/executor/row_seq_scan.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::{StreamExt, pin_mut};
20use futures_async_stream::try_stream;
21use prometheus::Histogram;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Schema};
25use risingwave_common::hash::VnodeCountCompat;
26use risingwave_common::row::{OwnedRow, Row};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_pb::batch_plan::plan_node::NodeBody;
29use risingwave_pb::common::BatchQueryEpoch;
30use risingwave_pb::plan_common::StorageTableDesc;
31use risingwave_storage::store::PrefetchOptions;
32use risingwave_storage::table::batch_table::BatchTable;
33use risingwave_storage::{StateStore, dispatch_state_store};
34
35use super::ScanRange;
36use crate::error::{BatchError, Result};
37use crate::executor::{
38    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
39    build_scan_ranges_from_pb,
40};
41use crate::monitor::BatchMetrics;
42
43/// Executor that scans data from row table
44pub struct RowSeqScanExecutor<S: StateStore> {
45    chunk_size: usize,
46    identity: String,
47
48    /// Batch metrics.
49    /// None: Local mode don't record metrics.
50    metrics: Option<BatchMetrics>,
51
52    table: BatchTable<S>,
53    scan_ranges: Vec<ScanRange>,
54    ordered: bool,
55    query_epoch: BatchQueryEpoch,
56    limit: Option<u64>,
57}
58
59impl<S: StateStore> RowSeqScanExecutor<S> {
60    pub fn new(
61        table: BatchTable<S>,
62        scan_ranges: Vec<ScanRange>,
63        ordered: bool,
64        query_epoch: BatchQueryEpoch,
65        chunk_size: usize,
66        identity: String,
67        limit: Option<u64>,
68        metrics: Option<BatchMetrics>,
69    ) -> Self {
70        Self {
71            chunk_size,
72            identity,
73            metrics,
74            table,
75            scan_ranges,
76            ordered,
77            query_epoch,
78            limit,
79        }
80    }
81}
82
83pub struct RowSeqScanExecutorBuilder {}
84
85impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
86    async fn new_boxed_executor(
87        source: &ExecutorBuilder<'_>,
88        inputs: Vec<BoxedExecutor>,
89    ) -> Result<BoxedExecutor> {
90        ensure!(
91            inputs.is_empty(),
92            "Row sequential scan should not have input executor!"
93        );
94        let seq_scan_node = try_match_expand!(
95            source.plan_node().get_node_body().unwrap(),
96            NodeBody::RowSeqScan
97        )?;
98
99        let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
100        let column_ids = seq_scan_node
101            .column_ids
102            .iter()
103            .copied()
104            .map(ColumnId::from)
105            .collect();
106        let vnodes = match &seq_scan_node.vnode_bitmap {
107            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
108            // This is possible for dml. vnode_bitmap is not filled by scheduler.
109            // Or it's single distribution, e.g., distinct agg. We scan in a single executor.
110            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
111        };
112
113        let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;
114
115        let ordered = seq_scan_node.ordered;
116        let limit = seq_scan_node.limit;
117        let query_epoch = seq_scan_node
118            .query_epoch
119            .ok_or_else(|| anyhow!("query_epoch not set in distributed lookup join"))?;
120
121        let chunk_size = if let Some(limit) = seq_scan_node.limit {
122            (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
123        } else {
124            source.context().get_config().developer.chunk_size as u32
125        };
126        let metrics = source.context().batch_metrics();
127
128        dispatch_state_store!(source.context().state_store(), state_store, {
129            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
130            Ok(Box::new(RowSeqScanExecutor::new(
131                table,
132                scan_ranges,
133                ordered,
134                query_epoch,
135                chunk_size as usize,
136                source.plan_node().get_identity().clone(),
137                limit,
138                metrics,
139            )))
140        })
141    }
142}
143
144impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
145    fn schema(&self) -> &Schema {
146        self.table.schema()
147    }
148
149    fn identity(&self) -> &str {
150        &self.identity
151    }
152
153    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
154        self.do_execute().boxed()
155    }
156}
157
158impl<S: StateStore> RowSeqScanExecutor<S> {
159    #[try_stream(ok = DataChunk, error = BatchError)]
160    async fn do_execute(self: Box<Self>) {
161        let Self {
162            chunk_size,
163            metrics,
164            table,
165            scan_ranges,
166            ordered,
167            query_epoch,
168            limit,
169            ..
170        } = *self;
171        let table = Arc::new(table);
172
173        // Create collector.
174        let histogram = metrics
175            .as_ref()
176            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
177
178        if ordered {
179            // Currently we execute range-scans concurrently so the order is not guaranteed if
180            // there're multiple ranges.
181            // TODO: reserve the order for multiple ranges.
182            assert_eq!(scan_ranges.len(), 1);
183        }
184
185        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
186            .into_iter()
187            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
188
189        // the number of rows have been returned as execute result
190        let mut returned = 0;
191        if let Some(limit) = &limit
192            && returned >= *limit
193        {
194            return Ok(());
195        }
196        let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
197        // Point Get
198        for point_get in point_gets {
199            let table = table.clone();
200            if let Some(row) =
201                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
202                && let Some(chunk) = data_chunk_builder.append_one_row(row)
203            {
204                returned += chunk.cardinality() as u64;
205                yield chunk;
206                if let Some(limit) = &limit
207                    && returned >= *limit
208                {
209                    return Ok(());
210                }
211            }
212        }
213        if let Some(chunk) = data_chunk_builder.consume_all() {
214            returned += chunk.cardinality() as u64;
215            yield chunk;
216            if let Some(limit) = &limit
217                && returned >= *limit
218            {
219                return Ok(());
220            }
221        }
222
223        // Range Scan
224        // WARN: DO NOT use `select` to execute range scans concurrently
225        //       it can consume too much memory if there're too many ranges.
226        for range in range_scans {
227            let stream = Self::execute_range(
228                table.clone(),
229                range,
230                ordered,
231                query_epoch,
232                chunk_size,
233                limit,
234                histogram,
235            );
236            #[for_await]
237            for chunk in stream {
238                let chunk = chunk?;
239                returned += chunk.cardinality() as u64;
240                yield chunk;
241                if let Some(limit) = &limit
242                    && returned >= *limit
243                {
244                    return Ok(());
245                }
246            }
247        }
248    }
249
250    async fn execute_point_get(
251        table: Arc<BatchTable<S>>,
252        scan_range: ScanRange,
253        query_epoch: BatchQueryEpoch,
254        histogram: Option<impl Deref<Target = Histogram>>,
255    ) -> Result<Option<OwnedRow>> {
256        let pk_prefix = scan_range.pk_prefix;
257        assert!(pk_prefix.len() == table.pk_indices().len());
258
259        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
260
261        // Point Get.
262        let row = table.get_row(&pk_prefix, query_epoch.into()).await?;
263
264        if let Some(timer) = timer {
265            timer.observe_duration()
266        }
267
268        Ok(row)
269    }
270
271    #[try_stream(ok = DataChunk, error = BatchError)]
272    async fn execute_range(
273        table: Arc<BatchTable<S>>,
274        scan_range: ScanRange,
275        ordered: bool,
276        query_epoch: BatchQueryEpoch,
277        chunk_size: usize,
278        limit: Option<u64>,
279        histogram: Option<impl Deref<Target = Histogram>>,
280    ) {
281        let pk_prefix = scan_range.pk_prefix.clone();
282        let range_bounds = scan_range.convert_to_range_bounds(&table);
283        // Range Scan.
284        assert!(pk_prefix.len() < table.pk_indices().len());
285        let iter = table
286            .batch_chunk_iter_with_pk_bounds(
287                query_epoch.into(),
288                &pk_prefix,
289                range_bounds,
290                ordered,
291                chunk_size,
292                PrefetchOptions::new(limit.is_none(), true),
293            )
294            .await?;
295
296        pin_mut!(iter);
297        loop {
298            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
299
300            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
301
302            if let Some(timer) = timer {
303                timer.observe_duration()
304            }
305
306            if let Some(chunk) = chunk {
307                yield chunk
308            } else {
309                break;
310            }
311        }
312    }
313}