risingwave_batch_executors/executor/
row_seq_scan.rs1use std::ops::Deref;
15use std::sync::Arc;
16
17use anyhow::anyhow;
18use futures::{StreamExt, pin_mut};
19use futures_async_stream::try_stream;
20use prometheus::Histogram;
21use risingwave_common::array::DataChunk;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Schema};
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::row::{OwnedRow, Row};
26use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28use risingwave_pb::common::BatchQueryEpoch;
29use risingwave_pb::plan_common::StorageTableDesc;
30use risingwave_storage::store::PrefetchOptions;
31use risingwave_storage::table::batch_table::BatchTable;
32use risingwave_storage::{StateStore, dispatch_state_store};
33
34use super::ScanRange;
35use crate::error::{BatchError, Result};
36use crate::executor::{
37 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38 build_scan_ranges_from_pb,
39};
40use crate::monitor::BatchMetrics;
41
42pub struct RowSeqScanExecutor<S: StateStore> {
44 chunk_size: usize,
45 identity: String,
46
47 metrics: Option<BatchMetrics>,
50
51 table: BatchTable<S>,
52 scan_ranges: Vec<ScanRange>,
53 ordered: bool,
54 query_epoch: BatchQueryEpoch,
55 limit: Option<u64>,
56}
57
58impl<S: StateStore> RowSeqScanExecutor<S> {
59 pub fn new(
60 table: BatchTable<S>,
61 scan_ranges: Vec<ScanRange>,
62 ordered: bool,
63 query_epoch: BatchQueryEpoch,
64 chunk_size: usize,
65 identity: String,
66 limit: Option<u64>,
67 metrics: Option<BatchMetrics>,
68 ) -> Self {
69 Self {
70 chunk_size,
71 identity,
72 metrics,
73 table,
74 scan_ranges,
75 ordered,
76 query_epoch,
77 limit,
78 }
79 }
80}
81
/// Stateless builder type; its `BoxedExecutorBuilder` implementation turns a `RowSeqScan` plan
/// node into a boxed `RowSeqScanExecutor`.
pub struct RowSeqScanExecutorBuilder {}
83
84impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
85 async fn new_boxed_executor(
86 source: &ExecutorBuilder<'_>,
87 inputs: Vec<BoxedExecutor>,
88 ) -> Result<BoxedExecutor> {
89 ensure!(
90 inputs.is_empty(),
91 "Row sequential scan should not have input executor!"
92 );
93 let seq_scan_node = try_match_expand!(
94 source.plan_node().get_node_body().unwrap(),
95 NodeBody::RowSeqScan
96 )?;
97
98 let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
99 let column_ids = seq_scan_node
100 .column_ids
101 .iter()
102 .copied()
103 .map(ColumnId::from)
104 .collect();
105 let vnodes = match &seq_scan_node.vnode_bitmap {
106 Some(vnodes) => Some(Bitmap::from(vnodes).into()),
107 None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
110 };
111
112 let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;
113
114 let ordered = seq_scan_node.ordered;
115 let limit = seq_scan_node.limit;
116 let query_epoch = seq_scan_node
117 .query_epoch
118 .ok_or_else(|| anyhow!("query_epoch not set in distributed lookup join"))?;
119
120 let chunk_size = if let Some(limit) = seq_scan_node.limit {
121 (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
122 } else {
123 source.context().get_config().developer.chunk_size as u32
124 };
125 let metrics = source.context().batch_metrics();
126
127 dispatch_state_store!(source.context().state_store(), state_store, {
128 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
129 Ok(Box::new(RowSeqScanExecutor::new(
130 table,
131 scan_ranges,
132 ordered,
133 query_epoch,
134 chunk_size as usize,
135 source.plan_node().get_identity().clone(),
136 limit,
137 metrics,
138 )))
139 })
140 }
141}
142
143impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
144 fn schema(&self) -> &Schema {
145 self.table.schema()
146 }
147
148 fn identity(&self) -> &str {
149 &self.identity
150 }
151
152 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
153 self.do_execute().boxed()
154 }
155}
156
157impl<S: StateStore> RowSeqScanExecutor<S> {
158 #[try_stream(ok = DataChunk, error = BatchError)]
159 async fn do_execute(self: Box<Self>) {
160 let Self {
161 chunk_size,
162 metrics,
163 table,
164 scan_ranges,
165 ordered,
166 query_epoch,
167 limit,
168 ..
169 } = *self;
170 let table = Arc::new(table);
171
172 let histogram = metrics
174 .as_ref()
175 .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
176
177 if ordered {
178 assert_eq!(scan_ranges.len(), 1);
182 }
183
184 let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
185 .into_iter()
186 .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
187
188 let mut returned = 0;
190 if let Some(limit) = &limit
191 && returned >= *limit
192 {
193 return Ok(());
194 }
195 let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
196 for point_get in point_gets {
198 let table = table.clone();
199 if let Some(row) =
200 Self::execute_point_get(table, point_get, query_epoch, histogram).await?
201 && let Some(chunk) = data_chunk_builder.append_one_row(row)
202 {
203 returned += chunk.cardinality() as u64;
204 yield chunk;
205 if let Some(limit) = &limit
206 && returned >= *limit
207 {
208 return Ok(());
209 }
210 }
211 }
212 if let Some(chunk) = data_chunk_builder.consume_all() {
213 returned += chunk.cardinality() as u64;
214 yield chunk;
215 if let Some(limit) = &limit
216 && returned >= *limit
217 {
218 return Ok(());
219 }
220 }
221
222 for range in range_scans {
226 let stream = Self::execute_range(
227 table.clone(),
228 range,
229 ordered,
230 query_epoch,
231 chunk_size,
232 limit,
233 histogram,
234 );
235 #[for_await]
236 for chunk in stream {
237 let chunk = chunk?;
238 returned += chunk.cardinality() as u64;
239 yield chunk;
240 if let Some(limit) = &limit
241 && returned >= *limit
242 {
243 return Ok(());
244 }
245 }
246 }
247 }
248
249 async fn execute_point_get(
250 table: Arc<BatchTable<S>>,
251 scan_range: ScanRange,
252 query_epoch: BatchQueryEpoch,
253 histogram: Option<impl Deref<Target = Histogram>>,
254 ) -> Result<Option<OwnedRow>> {
255 let pk_prefix = scan_range.pk_prefix;
256 assert!(pk_prefix.len() == table.pk_indices().len());
257
258 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
259
260 let row = table.get_row(&pk_prefix, query_epoch.into()).await?;
262
263 if let Some(timer) = timer {
264 timer.observe_duration()
265 }
266
267 Ok(row)
268 }
269
270 #[try_stream(ok = DataChunk, error = BatchError)]
271 async fn execute_range(
272 table: Arc<BatchTable<S>>,
273 scan_range: ScanRange,
274 ordered: bool,
275 query_epoch: BatchQueryEpoch,
276 chunk_size: usize,
277 limit: Option<u64>,
278 histogram: Option<impl Deref<Target = Histogram>>,
279 ) {
280 let pk_prefix = scan_range.pk_prefix.clone();
281 let range_bounds = scan_range.convert_to_range_bounds(&table);
282 assert!(pk_prefix.len() < table.pk_indices().len());
284 let iter = table
285 .batch_chunk_iter_with_pk_bounds(
286 query_epoch.into(),
287 &pk_prefix,
288 range_bounds,
289 ordered,
290 chunk_size,
291 PrefetchOptions::new(limit.is_none(), true),
292 )
293 .await?;
294
295 pin_mut!(iter);
296 loop {
297 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
298
299 let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
300
301 if let Some(timer) = timer {
302 timer.observe_duration()
303 }
304
305 if let Some(chunk) = chunk {
306 yield chunk
307 } else {
308 break;
309 }
310 }
311 }
312}