// risingwave_batch_executors/executor/row_seq_scan.rs

use std::ops::Deref;
use std::sync::Arc;

use anyhow::anyhow;
use futures::{StreamExt, pin_mut};
use futures_async_stream::try_stream;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::hash::VnodeCountCompat;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};

use super::ScanRange;
use crate::error::{BatchError, Result};
use crate::executor::{
    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
    build_scan_ranges_from_pb,
};
use crate::monitor::BatchMetrics;

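/// Executor for a batch sequential scan over a storage table.
///
/// Scan ranges whose prefix pins every primary-key column are served as point gets;
/// all other ranges are executed as storage range scans.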
pub struct RowSeqScanExecutor<S: StateStore> {
    chunk_size: usize,
    identity: String,

    metrics: Option<BatchMetrics>,

    table: BatchTable<S>,
    scan_ranges: Vec<ScanRange>,
    ordered: bool,
    query_epoch: BatchQueryEpoch,
    limit: Option<u64>,
}

impl<S: StateStore> RowSeqScanExecutor<S> {
    pub fn new(
        table: BatchTable<S>,
        scan_ranges: Vec<ScanRange>,
        ordered: bool,
        query_epoch: BatchQueryEpoch,
        chunk_size: usize,
        identity: String,
        limit: Option<u64>,
        metrics: Option<BatchMetrics>,
    ) -> Self {
        Self {
            chunk_size,
            identity,
            metrics,
            table,
            scan_ranges,
            ordered,
            query_epoch,
            limit,
        }
    }
}

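/// Builder that turns a `RowSeqScan` plan node into a boxed [`RowSeqScanExecutor`].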
pub struct RowSeqScanExecutorBuilder {}

impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "Row sequential scan should not have input executor!"
        );
        let seq_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::RowSeqScan
        )?;

        let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
        let column_ids = seq_scan_node
            .column_ids
            .iter()
            .copied()
            .map(ColumnId::from)
            .collect();
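        // Fall back to scanning every vnode of the table when the scheduler did not
        // attach a vnode bitmap to the plan node.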
        let vnodes = match &seq_scan_node.vnode_bitmap {
            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
        };

        let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;

        let ordered = seq_scan_node.ordered;
        let limit = seq_scan_node.limit;
        let query_epoch = seq_scan_node
            .query_epoch
            .ok_or_else(|| anyhow!("query_epoch not set for row sequential scan"))?;

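        // Clamp the chunk size to the limit so that a small `LIMIT` does not build
        // oversized chunks.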
        let chunk_size = if let Some(limit) = seq_scan_node.limit {
            // Take the min in u64 first to avoid truncating a limit larger than u32::MAX.
            limit.min(source.context().get_config().developer.chunk_size as u64) as u32
        } else {
            source.context().get_config().developer.chunk_size as u32
        };
        let metrics = source.context().batch_metrics();

        dispatch_state_store!(source.context().state_store(), state_store, {
            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
            Ok(Box::new(RowSeqScanExecutor::new(
                table,
                scan_ranges,
                ordered,
                query_epoch,
                chunk_size as usize,
                source.plan_node().get_identity().clone(),
                limit,
                metrics,
            )))
        })
    }
}

impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
    fn schema(&self) -> &Schema {
        self.table.schema()
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}

impl<S: StateStore> RowSeqScanExecutor<S> {
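    /// Drives the whole scan: point gets first, coalesced into chunks, then range scans,
    /// returning early once `limit` rows have been yielded.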
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let Self {
            chunk_size,
            metrics,
            table,
            scan_ranges,
            ordered,
            query_epoch,
            limit,
            ..
        } = *self;
        let table = Arc::new(table);

        let histogram = metrics
            .as_ref()
            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);

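        // Ordered output is only supported for a single scan range; the output order
        // across multiple ranges is not guaranteed.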
        if ordered {
            assert_eq!(scan_ranges.len(), 1);
        }

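        // A range whose prefix fixes every primary-key column identifies at most one row
        // and is served as a point get; shorter prefixes require an iterator scan.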
        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
            .into_iter()
            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());

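        // Rows yielded so far, shared across point gets and range scans to enforce `limit`.
        // The check below short-circuits a `LIMIT 0` query before touching storage.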
        let mut returned = 0;
        if let Some(limit) = &limit
            && returned >= *limit
        {
            return Ok(());
        }
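        // Point gets produce single rows; coalesce them into chunks of up to `chunk_size`.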
        let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
        for point_get in point_gets {
            let table = table.clone();
            if let Some(row) =
                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
                && let Some(chunk) = data_chunk_builder.append_one_row(row)
            {
                returned += chunk.cardinality() as u64;
                yield chunk;
                if let Some(limit) = &limit
                    && returned >= *limit
                {
                    return Ok(());
                }
            }
        }
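        // Flush any rows still buffered in the builder before starting the range scans.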
        if let Some(chunk) = data_chunk_builder.consume_all() {
            returned += chunk.cardinality() as u64;
            yield chunk;
            if let Some(limit) = &limit
                && returned >= *limit
            {
                return Ok(());
            }
        }

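        // Serve the remaining ranges with storage iterators, one range at a time.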
        for range in range_scans {
            let stream = Self::execute_range(
                table.clone(),
                range,
                ordered,
                query_epoch,
                chunk_size,
                limit,
                histogram,
            );
            #[for_await]
            for chunk in stream {
                let chunk = chunk?;
                returned += chunk.cardinality() as u64;
                yield chunk;
                if let Some(limit) = &limit
                    && returned >= *limit
                {
                    return Ok(());
                }
            }
        }
    }

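    /// Looks up a single row whose primary key is fully specified by `scan_range`,
    /// recording the lookup latency when metrics are enabled.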
    async fn execute_point_get(
        table: Arc<BatchTable<S>>,
        scan_range: ScanRange,
        query_epoch: BatchQueryEpoch,
        histogram: Option<impl Deref<Target = Histogram>>,
    ) -> Result<Option<OwnedRow>> {
        let pk_prefix = scan_range.pk_prefix;
        assert_eq!(pk_prefix.len(), table.pk_indices().len());

        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

        let row = table.get_row(&pk_prefix, query_epoch.into()).await?;

        if let Some(timer) = timer {
            timer.observe_duration()
        }

        Ok(row)
    }

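    /// Streams chunks for a scan range whose prefix covers only part of the primary key.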
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn execute_range(
        table: Arc<BatchTable<S>>,
        scan_range: ScanRange,
        ordered: bool,
        query_epoch: BatchQueryEpoch,
        chunk_size: usize,
        limit: Option<u64>,
        histogram: Option<impl Deref<Target = Histogram>>,
    ) {
        let pk_prefix = scan_range.pk_prefix.clone();
        let range_bounds = scan_range.convert_to_range_bounds(&table);
        assert!(pk_prefix.len() < table.pk_indices().len());
        let iter = table
            .batch_chunk_iter_with_pk_bounds(
                query_epoch.into(),
                &pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
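                // Prefetch only when the scan is unbounded: a limited scan may stop
                // early, and the prefetched data would be wasted.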
                PrefetchOptions::new(limit.is_none(), true),
            )
            .await?;

        pin_mut!(iter);
        loop {
            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;

            if let Some(timer) = timer {
                timer.observe_duration()
            }

            if let Some(chunk) = chunk {
                yield chunk
            } else {
                break;
            }
        }
    }
}