// risingwave_batch_executors/executor/row_seq_scan.rs
use std::ops::Deref;
15use std::sync::Arc;
16
17use futures::{StreamExt, pin_mut};
18use futures_async_stream::try_stream;
19use prometheus::Histogram;
20use risingwave_common::array::DataChunk;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnId, Schema};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::row::{OwnedRow, Row};
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_pb::batch_plan::plan_node::NodeBody;
27use risingwave_pb::common::BatchQueryEpoch;
28use risingwave_pb::plan_common::as_of::AsOfType;
29use risingwave_pb::plan_common::{PbAsOf, StorageTableDesc, as_of};
30use risingwave_storage::store::PrefetchOptions;
31use risingwave_storage::table::batch_table::BatchTable;
32use risingwave_storage::{StateStore, dispatch_state_store};
33
34use super::ScanRange;
35use crate::error::{BatchError, Result};
36use crate::executor::{
37 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38 build_scan_ranges_from_pb,
39};
40use crate::monitor::BatchMetrics;
41
/// Executor that scans rows from a storage table, producing [`DataChunk`]s.
///
/// Scan ranges with a full primary-key prefix are served as point gets, the
/// rest as range scans (see `do_execute`). Reads happen at `epoch`, unless
/// `as_of` is set, in which case a time-travel epoch derived from the
/// timestamp takes precedence.
pub struct RowSeqScanExecutor<S: StateStore> {
    // Max number of rows per emitted `DataChunk`.
    chunk_size: usize,
    // Identity string used for logging / tracing.
    identity: String,

    // Batch metrics; `None` when metrics collection is disabled.
    metrics: Option<BatchMetrics>,

    // The underlying storage table to read from (possibly a column subset).
    table: BatchTable<S>,
    // Ranges to scan: point gets and/or range scans.
    scan_ranges: Vec<ScanRange>,
    // Whether the output must preserve storage (pk) order.
    ordered: bool,
    // Snapshot epoch assigned by the scheduler.
    epoch: BatchQueryEpoch,
    // Optional cap on the total number of rows returned.
    limit: Option<u64>,
    // Optional `AS OF` timestamp; overrides `epoch` when present.
    as_of: Option<AsOf>,
}
/// Resolved `AS OF` clause for a batch query.
///
/// Only the timestamp form is representable here; other protobuf variants
/// are rejected during conversion (see the `TryFrom<&PbAsOf>` impl).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsOf {
    // Unix timestamp; converted to an epoch via `unix_timestamp_sec_to_epoch`,
    // which treats it as seconds.
    pub timestamp: i64,
}
62
63impl TryFrom<&PbAsOf> for AsOf {
64 type Error = BatchError;
65
66 fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
67 match pb.as_of_type.as_ref().unwrap() {
68 AsOfType::Timestamp(ts) => Ok(Self {
69 timestamp: ts.timestamp,
70 }),
71 AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
72 anyhow::anyhow!("batch query does not support as of process time or version"),
73 )),
74 }
75 }
76}
77
78impl From<&AsOf> for PbAsOf {
79 fn from(v: &AsOf) -> Self {
80 PbAsOf {
81 as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
82 timestamp: v.timestamp,
83 })),
84 }
85 }
86}
87
88impl<S: StateStore> RowSeqScanExecutor<S> {
89 pub fn new(
90 table: BatchTable<S>,
91 scan_ranges: Vec<ScanRange>,
92 ordered: bool,
93 epoch: BatchQueryEpoch,
94 chunk_size: usize,
95 identity: String,
96 limit: Option<u64>,
97 metrics: Option<BatchMetrics>,
98 as_of: Option<AsOf>,
99 ) -> Self {
100 Self {
101 chunk_size,
102 identity,
103 metrics,
104 table,
105 scan_ranges,
106 ordered,
107 epoch,
108 limit,
109 as_of,
110 }
111 }
112}
113
/// Builder that constructs a [`RowSeqScanExecutor`] from a `RowSeqScan`
/// plan node.
pub struct RowSeqScanExecutorBuilder {}

impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        // A sequential scan is a leaf of the plan: no upstream executors.
        ensure!(
            inputs.is_empty(),
            "Row sequential scan should not have input executor!"
        );
        let seq_scan_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::RowSeqScan
        )?;

        let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
        // Output columns requested by the plan node.
        let column_ids = seq_scan_node
            .column_ids
            .iter()
            .copied()
            .map(ColumnId::from)
            .collect();
        // Vnodes this scan instance is responsible for; when the plan does
        // not restrict them, scan all vnodes of the table.
        let vnodes = match &seq_scan_node.vnode_bitmap {
            Some(vnodes) => Some(Bitmap::from(vnodes).into()),
            None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
        };

        let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;

        let ordered = seq_scan_node.ordered;

        let epoch = source.epoch();
        let limit = seq_scan_node.limit;
        // Reject unsupported `AS OF` variants early, at build time.
        let as_of = seq_scan_node
            .as_of
            .as_ref()
            .map(AsOf::try_from)
            .transpose()?;
        // Cap the chunk size by the limit so small LIMIT queries don't build
        // oversized chunks. NOTE(review): `limit as u32` truncates limits
        // above u32::MAX — harmless here since the result is min'ed with the
        // configured chunk size, but worth confirming.
        let chunk_size = if let Some(limit) = seq_scan_node.limit {
            (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
        } else {
            source.context().get_config().developer.chunk_size as u32
        };
        let metrics = source.context().batch_metrics();

        // Monomorphize over the concrete state-store implementation.
        dispatch_state_store!(source.context().state_store(), state_store, {
            let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
            Ok(Box::new(RowSeqScanExecutor::new(
                table,
                scan_ranges,
                ordered,
                epoch,
                chunk_size as usize,
                source.plan_node().get_identity().clone(),
                limit,
                metrics,
                as_of,
            )))
        })
    }
}
178
impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
    /// Output schema: the (possibly partial) schema of the scanned table.
    fn schema(&self) -> &Schema {
        self.table.schema()
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the chunk stream produced by
    /// `do_execute`.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute().boxed()
    }
}
192
impl<S: StateStore> RowSeqScanExecutor<S> {
    /// Drives the whole scan: serves all point gets first (batched through a
    /// chunk builder), then streams the range scans, stopping as soon as
    /// `limit` rows have been yielded.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        let Self {
            chunk_size,
            identity,
            metrics,
            table,
            scan_ranges,
            ordered,
            epoch,
            limit,
            as_of,
        } = *self;
        let table = Arc::new(table);
        // An `AS OF` timestamp overrides the scheduler-assigned epoch with a
        // time-travel epoch.
        let query_epoch = as_of
            .map(|a| {
                let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
                tracing::debug!(epoch, identity, "time travel");
                risingwave_pb::common::BatchQueryEpoch {
                    epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
                        epoch,
                    )),
                }
            })
            .unwrap_or_else(|| epoch);

        // Per-`next` latency histogram; `None` when metrics are disabled.
        let histogram = metrics
            .as_ref()
            .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);

        if ordered {
            // An ordered scan is only meaningful over exactly one range.
            assert_eq!(scan_ranges.len(), 1);
        }

        // A range whose pk prefix covers the full primary key identifies a
        // single row (point get); shorter prefixes are range scans.
        let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
            .into_iter()
            .partition(|x| x.pk_prefix.len() == table.pk_indices().len());

        // Rows yielded so far, compared against `limit` after every yield.
        let mut returned = 0;
        // Degenerate `LIMIT 0`: bail out before doing any storage reads.
        if let Some(limit) = &limit
            && returned >= *limit
        {
            return Ok(());
        }
        let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
        // Phase 1: point gets, coalesced into chunks by the builder. A chunk
        // is yielded only when the builder fills up.
        for point_get in point_gets {
            let table = table.clone();
            if let Some(row) =
                Self::execute_point_get(table, point_get, query_epoch, histogram).await?
                && let Some(chunk) = data_chunk_builder.append_one_row(row)
            {
                returned += chunk.cardinality() as u64;
                yield chunk;
                if let Some(limit) = &limit
                    && returned >= *limit
                {
                    return Ok(());
                }
            }
        }
        // Flush rows still buffered in the builder before starting range scans.
        if let Some(chunk) = data_chunk_builder.consume_all() {
            returned += chunk.cardinality() as u64;
            yield chunk;
            if let Some(limit) = &limit
                && returned >= *limit
            {
                return Ok(());
            }
        }

        // Phase 2: range scans, streamed chunk by chunk.
        // NOTE(review): `limit` is also passed down only to size prefetching;
        // the authoritative cutoff is the `returned >= limit` check below.
        for range in range_scans {
            let stream = Self::execute_range(
                table.clone(),
                range,
                ordered,
                query_epoch,
                chunk_size,
                limit,
                histogram,
            );
            #[for_await]
            for chunk in stream {
                let chunk = chunk?;
                returned += chunk.cardinality() as u64;
                yield chunk;
                if let Some(limit) = &limit
                    && returned >= *limit
                {
                    return Ok(());
                }
            }
        }
    }

    /// Looks up a single row by its full primary key at `epoch`.
    ///
    /// Returns `Ok(None)` when the key does not exist. The lookup duration is
    /// recorded in `histogram` when metrics are enabled.
    async fn execute_point_get(
        table: Arc<BatchTable<S>>,
        scan_range: ScanRange,
        epoch: BatchQueryEpoch,
        histogram: Option<impl Deref<Target = Histogram>>,
    ) -> Result<Option<OwnedRow>> {
        let pk_prefix = scan_range.pk_prefix;
        // Caller guarantees this via the partition in `do_execute`.
        assert!(pk_prefix.len() == table.pk_indices().len());

        let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

        let row = table.get_row(&pk_prefix, epoch.into()).await?;

        if let Some(timer) = timer {
            timer.observe_duration()
        }

        Ok(row)
    }

    /// Streams chunks for one range scan whose pk prefix is strictly shorter
    /// than the full primary key.
    ///
    /// Each `next` on the storage iterator is timed into `histogram`.
    /// Prefetching is enabled only for unlimited scans, since a LIMIT query
    /// will likely stop early.
    #[try_stream(ok = DataChunk, error = BatchError)]
    async fn execute_range(
        table: Arc<BatchTable<S>>,
        scan_range: ScanRange,
        ordered: bool,
        epoch: BatchQueryEpoch,
        chunk_size: usize,
        limit: Option<u64>,
        histogram: Option<impl Deref<Target = Histogram>>,
    ) {
        let pk_prefix = scan_range.pk_prefix.clone();
        let range_bounds = scan_range.convert_to_range_bounds(&table);
        // Caller guarantees this via the partition in `do_execute`.
        assert!(pk_prefix.len() < table.pk_indices().len());
        let iter = table
            .batch_chunk_iter_with_pk_bounds(
                epoch.into(),
                &pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                PrefetchOptions::new(limit.is_none(), true),
            )
            .await?;

        pin_mut!(iter);
        loop {
            let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;

            if let Some(timer) = timer {
                timer.observe_duration()
            }

            if let Some(chunk) = chunk {
                yield chunk
            } else {
                break;
            }
        }
    }
}
362
363pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
364 let ts = ts.checked_add(1).unwrap();
365 risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
366 u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
367 )
368}