risingwave_batch_executors/executor/
row_seq_scan.rs1use std::ops::Deref;
15use std::sync::Arc;
16
17use futures::{StreamExt, pin_mut};
18use futures_async_stream::try_stream;
19use prometheus::Histogram;
20use risingwave_common::array::DataChunk;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::catalog::{ColumnId, Schema};
23use risingwave_common::hash::VnodeCountCompat;
24use risingwave_common::row::{OwnedRow, Row};
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_pb::batch_plan::plan_node::NodeBody;
27use risingwave_pb::common::BatchQueryEpoch;
28use risingwave_pb::plan_common::as_of::AsOfType;
29use risingwave_pb::plan_common::{PbAsOf, StorageTableDesc, as_of};
30use risingwave_storage::store::PrefetchOptions;
31use risingwave_storage::table::batch_table::BatchTable;
32use risingwave_storage::{StateStore, dispatch_state_store};
33
34use super::ScanRange;
35use crate::error::{BatchError, Result};
36use crate::executor::{
37 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
38 build_scan_ranges_from_pb,
39};
40use crate::monitor::BatchMetrics;
41
42pub struct RowSeqScanExecutor<S: StateStore> {
44 chunk_size: usize,
45 identity: String,
46
47 metrics: Option<BatchMetrics>,
50
51 table: BatchTable<S>,
52 scan_ranges: Vec<ScanRange>,
53 ordered: bool,
54 epoch: BatchQueryEpoch,
55 limit: Option<u64>,
56 as_of: Option<AsOf>,
57}
/// Point in time targeted by a time-travel (`AS OF`) batch read.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct AsOf {
    /// Unix timestamp, in seconds, to read as of.
    pub timestamp: i64,
}
62
63impl TryFrom<&PbAsOf> for AsOf {
64 type Error = BatchError;
65
66 fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
67 match pb.as_of_type.as_ref().unwrap() {
68 AsOfType::Timestamp(ts) => Ok(Self {
69 timestamp: ts.timestamp,
70 }),
71 AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
72 anyhow::anyhow!("batch query does not support as of process time or version"),
73 )),
74 }
75 }
76}
77
78impl From<&AsOf> for PbAsOf {
79 fn from(v: &AsOf) -> Self {
80 PbAsOf {
81 as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
82 timestamp: v.timestamp,
83 })),
84 }
85 }
86}
87
88impl<S: StateStore> RowSeqScanExecutor<S> {
89 pub fn new(
90 table: BatchTable<S>,
91 scan_ranges: Vec<ScanRange>,
92 ordered: bool,
93 epoch: BatchQueryEpoch,
94 chunk_size: usize,
95 identity: String,
96 limit: Option<u64>,
97 metrics: Option<BatchMetrics>,
98 as_of: Option<AsOf>,
99 ) -> Self {
100 Self {
101 chunk_size,
102 identity,
103 metrics,
104 table,
105 scan_ranges,
106 ordered,
107 epoch,
108 limit,
109 as_of,
110 }
111 }
112}
113
/// Stateless factory that turns `RowSeqScan` plan nodes into [`RowSeqScanExecutor`]s.
pub struct RowSeqScanExecutorBuilder;
115
116impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
117 async fn new_boxed_executor(
118 source: &ExecutorBuilder<'_>,
119 inputs: Vec<BoxedExecutor>,
120 ) -> Result<BoxedExecutor> {
121 ensure!(
122 inputs.is_empty(),
123 "Row sequential scan should not have input executor!"
124 );
125 let seq_scan_node = try_match_expand!(
126 source.plan_node().get_node_body().unwrap(),
127 NodeBody::RowSeqScan
128 )?;
129
130 let table_desc: &StorageTableDesc = seq_scan_node.get_table_desc()?;
131 let column_ids = seq_scan_node
132 .column_ids
133 .iter()
134 .copied()
135 .map(ColumnId::from)
136 .collect();
137 let vnodes = match &seq_scan_node.vnode_bitmap {
138 Some(vnodes) => Some(Bitmap::from(vnodes).into()),
139 None => Some(Bitmap::ones(table_desc.vnode_count()).into()),
142 };
143
144 let scan_ranges = build_scan_ranges_from_pb(&seq_scan_node.scan_ranges, table_desc)?;
145
146 let ordered = seq_scan_node.ordered;
147
148 let epoch = source.epoch();
149 let limit = seq_scan_node.limit;
150 let as_of = seq_scan_node
151 .as_of
152 .as_ref()
153 .map(AsOf::try_from)
154 .transpose()?;
155 let chunk_size = if let Some(limit) = seq_scan_node.limit {
156 (limit as u32).min(source.context().get_config().developer.chunk_size as u32)
157 } else {
158 source.context().get_config().developer.chunk_size as u32
159 };
160 let metrics = source.context().batch_metrics();
161
162 dispatch_state_store!(source.context().state_store(), state_store, {
163 let table = BatchTable::new_partial(state_store, column_ids, vnodes, table_desc);
164 Ok(Box::new(RowSeqScanExecutor::new(
165 table,
166 scan_ranges,
167 ordered,
168 epoch,
169 chunk_size as usize,
170 source.plan_node().get_identity().clone(),
171 limit,
172 metrics,
173 as_of,
174 )))
175 })
176 }
177}
178
179impl<S: StateStore> Executor for RowSeqScanExecutor<S> {
180 fn schema(&self) -> &Schema {
181 self.table.schema()
182 }
183
184 fn identity(&self) -> &str {
185 &self.identity
186 }
187
188 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
189 self.do_execute().boxed()
190 }
191}
192
193impl<S: StateStore> RowSeqScanExecutor<S> {
194 #[try_stream(ok = DataChunk, error = BatchError)]
195 async fn do_execute(self: Box<Self>) {
196 let Self {
197 chunk_size,
198 identity,
199 metrics,
200 table,
201 scan_ranges,
202 ordered,
203 epoch,
204 limit,
205 as_of,
206 } = *self;
207 let table = Arc::new(table);
208 let query_epoch = as_of
210 .map(|a| {
211 let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
212 tracing::debug!(epoch, identity, "time travel");
213 risingwave_pb::common::BatchQueryEpoch {
214 epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
215 epoch,
216 )),
217 }
218 })
219 .unwrap_or_else(|| epoch);
220
221 let histogram = metrics
223 .as_ref()
224 .map(|metrics| &metrics.executor_metrics().row_seq_scan_next_duration);
225
226 if ordered {
227 assert_eq!(scan_ranges.len(), 1);
231 }
232
233 let (point_gets, range_scans): (Vec<ScanRange>, Vec<ScanRange>) = scan_ranges
234 .into_iter()
235 .partition(|x| x.pk_prefix.len() == table.pk_indices().len());
236
237 let mut returned = 0;
239 if let Some(limit) = &limit
240 && returned >= *limit
241 {
242 return Ok(());
243 }
244 let mut data_chunk_builder = DataChunkBuilder::new(table.schema().data_types(), chunk_size);
245 for point_get in point_gets {
247 let table = table.clone();
248 if let Some(row) =
249 Self::execute_point_get(table, point_get, query_epoch, histogram).await?
250 {
251 if let Some(chunk) = data_chunk_builder.append_one_row(row) {
252 returned += chunk.cardinality() as u64;
253 yield chunk;
254 if let Some(limit) = &limit
255 && returned >= *limit
256 {
257 return Ok(());
258 }
259 }
260 }
261 }
262 if let Some(chunk) = data_chunk_builder.consume_all() {
263 returned += chunk.cardinality() as u64;
264 yield chunk;
265 if let Some(limit) = &limit
266 && returned >= *limit
267 {
268 return Ok(());
269 }
270 }
271
272 for range in range_scans {
276 let stream = Self::execute_range(
277 table.clone(),
278 range,
279 ordered,
280 query_epoch,
281 chunk_size,
282 limit,
283 histogram,
284 );
285 #[for_await]
286 for chunk in stream {
287 let chunk = chunk?;
288 returned += chunk.cardinality() as u64;
289 yield chunk;
290 if let Some(limit) = &limit
291 && returned >= *limit
292 {
293 return Ok(());
294 }
295 }
296 }
297 }
298
299 async fn execute_point_get(
300 table: Arc<BatchTable<S>>,
301 scan_range: ScanRange,
302 epoch: BatchQueryEpoch,
303 histogram: Option<impl Deref<Target = Histogram>>,
304 ) -> Result<Option<OwnedRow>> {
305 let pk_prefix = scan_range.pk_prefix;
306 assert!(pk_prefix.len() == table.pk_indices().len());
307
308 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
309
310 let row = table.get_row(&pk_prefix, epoch.into()).await?;
312
313 if let Some(timer) = timer {
314 timer.observe_duration()
315 }
316
317 Ok(row)
318 }
319
320 #[try_stream(ok = DataChunk, error = BatchError)]
321 async fn execute_range(
322 table: Arc<BatchTable<S>>,
323 scan_range: ScanRange,
324 ordered: bool,
325 epoch: BatchQueryEpoch,
326 chunk_size: usize,
327 limit: Option<u64>,
328 histogram: Option<impl Deref<Target = Histogram>>,
329 ) {
330 let pk_prefix = scan_range.pk_prefix.clone();
331 let range_bounds = scan_range.convert_to_range_bounds(&table);
332 assert!(pk_prefix.len() < table.pk_indices().len());
334 let iter = table
335 .batch_chunk_iter_with_pk_bounds(
336 epoch.into(),
337 &pk_prefix,
338 range_bounds,
339 ordered,
340 chunk_size,
341 PrefetchOptions::new(limit.is_none(), true),
342 )
343 .await?;
344
345 pin_mut!(iter);
346 loop {
347 let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
348
349 let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
350
351 if let Some(timer) = timer {
352 timer.observe_duration()
353 }
354
355 if let Some(chunk) = chunk {
356 yield chunk
357 } else {
358 break;
359 }
360 }
361 }
362}
363
364pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
365 let ts = ts.checked_add(1).unwrap();
366 risingwave_common::util::epoch::Epoch::from_unix_millis_or_earliest(
367 u64::try_from(ts).unwrap_or(0).checked_mul(1000).unwrap(),
368 )
369}