risingwave_batch_executors/executor/
row_seq_scan.rs1use std::ops::Deref;
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use futures::{StreamExt, pin_mut};
20use futures_async_stream::try_stream;
21use prometheus::Histogram;
22use risingwave_common::array::DataChunk;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Schema};
25use risingwave_common::hash::VnodeCountCompat;
26use risingwave_common::row::{OwnedRow, Row};
27use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
28use risingwave_pb::batch_plan::plan_node::NodeBody;
29use risingwave_pb::common::BatchQueryEpoch;
30use risingwave_pb::plan_common::StorageTableDesc;
31use risingwave_storage::store::PrefetchOptions;
32use risingwave_storage::table::batch_table::BatchTable;
33use risingwave_storage::{StateStore, dispatch_state_store};
34
35use super::ScanRange;
36use crate::error::{BatchError, Result};
37use crate::executor::{
38 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
39 build_scan_ranges_from_pb,
40};
41use crate::monitor::BatchMetrics;
42
43pub struct RowSeqScanExecutor<S: StateStore> {
45 chunk_size: usize,
46 identity: String,
47
48 metrics: Option<BatchMetrics>,
51
52 table: BatchTable<S>,
53 scan_ranges: Vec<ScanRange>,
54 ordered: bool,
55 query_epoch: BatchQueryEpoch,
56 limit: Option<u64>,
57}
58
59impl<S: StateStore> RowSeqScanExecutor<S> {
60 pub fn new(
61 table: BatchTable<S>,
62 scan_ranges: Vec<ScanRange>,
63 ordered: bool,
64 query_epoch: BatchQueryEpoch,
65 chunk_size: usize,
66 identity: String,
67 limit: Option<u64>,
68 metrics: Option<BatchMetrics>,
69 ) -> Self {
70 Self {
71 chunk_size,
72 identity,
73 metrics,
74 table,
75 scan_ranges,
76 ordered,
77 query_epoch,
78 limit,
79 }
80 }
81}
82
/// Stateless factory turning a `RowSeqScan` plan node into a [`RowSeqScanExecutor`].
pub struct RowSeqScanExecutorBuilder;
84
85impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
86 async fn new_boxed_executor(
87 source: &ExecutorBuilder<'_>,
88 inputs: Vec<BoxedExecutor>,
89 ) -> Result<BoxedExecutor> {
90 ensure!(
91 inputs.is_empty(),
92 "Row sequential scan should not have input executor!"
93 );
94 let seq_scan_node = try_match_expand!(
95 source.plan_node().get_node_body().unwrap(),
96 NodeBody::RowSeqScan
97 )?;
98
99 let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
100 let column_ids = seq_scan_node
101 .column_ids
102 .iter()
103 .copied()
104 .map(ColumnId::from)
105 .collect();
106 let vnodes = match &seq_scan_node.vnode_bitmap {
107 Some(vnodes) => Some(Bitmap::from(vnodes).into()),
108 None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
111 };
112
113 let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;
114
115 let ordered = seq_scan_node.ordered;
116 let limit = seq_scan_node.limit;
117 let query_epoch = seq_scan_node
118 .query_epoch
119 .ok_or_else(|| anyhow!("query_epoch not set in distributed lookup join"))?;
120
121 let chunk_size = if let Some(limit) = seq_scan_node.limit {
122 (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
123 } else {
124 source.context().get_config().developer.chunk_size as u32
125 };
126 let metrics = source.context().batch_metrics();
127
128 dispatch_state_store!(source.context().state_store(), state_store, {
129 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
130 Ok(Box::new(RowSeqScanExecutor::new(
131 table,
132 scan_ranges,
133 ordered,
134 query_epoch,
135 chunk_size as usize,
136 source.plan_node().get_identity().clone(),
137 limit,
138 metrics,
139 )))
140 })
141 }
142}
143
144impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
145 fn schema(&self) -> &Schema {
146 self.table.schema()
147 }
148
149 fn identity(&self) -> &str {
150 &self.identity
151 }
152
153 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
154 self.do_execute().boxed()
155 }
156}
157
158impl<S: StateStore> RowSeqScanExecutor<S> {
159 #[try_stream(ok = DataChunk, error = BatchError)]
160 async fn do_execute(self: Box<Self>) {
161 let Self {
162 chunk_size,
163 metrics,
164 table,
165 scan_ranges,
166 ordered,
167 query_epoch,
168 limit,
169 ..
170 } = *self;
171 let table = Arc::new(table);
172
173 let histogram = metrics
175 .as_ref()
176 .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
177
178 if ordered {
179 assert_eq!(scan_ranges.len(), 1);
183 }
184
185 let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
186 .into_iter()
187 .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
188
189 let mut returned = 0;
191 if let Some(limit) = &limit
192 && returned >= *limit
193 {
194 return Ok(());
195 }
196 let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
197 for point_get in point_gets {
199 let table = table.clone();
200 if let Some(row) =
201 Self::execute_point_get(table, point_get, query_epoch, histogram).await?
202 && let Some(chunk) = data_chunk_builder.append_one_row(row)
203 {
204 returned += chunk.cardinality() as u64;
205 yield chunk;
206 if let Some(limit) = &limit
207 && returned >= *limit
208 {
209 return Ok(());
210 }
211 }
212 }
213 if let Some(chunk) = data_chunk_builder.consume_all() {
214 returned += chunk.cardinality() as u64;
215 yield chunk;
216 if let Some(limit) = &limit
217 && returned >= *limit
218 {
219 return Ok(());
220 }
221 }
222
223 for range in range_scans {
227 let stream = Self::execute_range(
228 table.clone(),
229 range,
230 ordered,
231 query_epoch,
232 chunk_size,
233 limit,
234 histogram,
235 );
236 #[for_await]
237 for chunk in stream {
238 let chunk = chunk?;
239 returned += chunk.cardinality() as u64;
240 yield chunk;
241 if let Some(limit) = &limit
242 && returned >= *limit
243 {
244 return Ok(());
245 }
246 }
247 }
248 }
249
250 async fn execute_point_get(
251 table: Arc<BatchTable<S>>,
252 scan_range: ScanRange,
253 query_epoch: BatchQueryEpoch,
254 histogram: Option<impl Deref<Target = Histogram>>,
255 ) -> Result<Option<OwnedRow>> {
256 let pk_prefix = scan_range.pk_prefix;
257 assert!(pk_prefix.len() == table.pk_indices().len());
258
259 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
260
261 let row = table.get_row(&pk_prefix, query_epoch.into()).await?;
263
264 if let Some(timer) = timer {
265 timer.observe_duration()
266 }
267
268 Ok(row)
269 }
270
271 #[try_stream(ok = DataChunk, error = BatchError)]
272 async fn execute_range(
273 table: Arc<BatchTable<S>>,
274 scan_range: ScanRange,
275 ordered: bool,
276 query_epoch: BatchQueryEpoch,
277 chunk_size: usize,
278 limit: Option<u64>,
279 histogram: Option<impl Deref<Target = Histogram>>,
280 ) {
281 let pk_prefix = scan_range.pk_prefix.clone();
282 let range_bounds = scan_range.convert_to_range_bounds(&table);
283 assert!(pk_prefix.len() < table.pk_indices().len());
285 let iter = table
286 .batch_chunk_iter_with_pk_bounds(
287 query_epoch.into(),
288 &pk_prefix,
289 range_bounds,
290 ordered,
291 chunk_size,
292 PrefetchOptions::new(limit.is_none(), true),
293 )
294 .await?;
295
296 pin_mut!(iter);
297 loop {
298 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
299
300 let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
301
302 if let Some(timer) = timer {
303 timer.observe_duration()
304 }
305
306 if let Some(chunk) = chunk {
307 yield chunk
308 } else {
309 break;
310 }
311 }
312 }
313}