risingwave_stream/executor/
batch_query.rs1use futures::TryStreamExt;
16use risingwave_common::array::Op;
17use risingwave_hummock_sdk::HummockReadEpoch;
18use risingwave_storage::store::PrefetchOptions;
19use risingwave_storage::table::batch_table::BatchTable;
20use risingwave_storage::table::collect_data_chunk;
21
22use crate::executor::prelude::*;
23
24pub struct BatchQueryExecutor<S: StateStore> {
25 table: BatchTable<S>,
27
28 batch_size: usize,
30
31 schema: Schema,
32}
33
34impl<S> BatchQueryExecutor<S>
35where
36 S: StateStore,
37{
38 pub fn new(table: BatchTable<S>, batch_size: usize, schema: Schema) -> Self {
39 Self {
40 table,
41 batch_size,
42 schema,
43 }
44 }
45
46 #[try_stream(ok = Message, error = StreamExecutorError)]
47 async fn execute_inner(self, epoch: u64) {
48 let iter = self
49 .table
50 .batch_iter(
51 HummockReadEpoch::Committed(epoch),
52 false,
53 PrefetchOptions::prefetch_for_large_range_scan(),
54 )
55 .await?;
56 let iter = iter.map_ok(|keyed_row| keyed_row.into_owned_row());
57 pin_mut!(iter);
58
59 while let Some(data_chunk) =
60 collect_data_chunk(&mut iter, &self.schema, Some(self.batch_size))
61 .instrument_await("batch_query_executor_collect_chunk")
62 .await?
63 {
64 let ops = vec![Op::Insert; data_chunk.capacity()];
65 let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
66 yield Message::Chunk(stream_chunk);
67 }
68 }
69}
70
71impl<S> Execute for BatchQueryExecutor<S>
72where
73 S: StateStore,
74{
75 fn execute(self: Box<Self>) -> super::BoxedMessageStream {
76 unreachable!("should call `execute_with_epoch`")
77 }
78
79 fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
80 self.execute_inner(epoch).boxed()
81 }
82}
83
#[cfg(test)]
mod test {
    use futures_async_stream::for_await;

    use super::*;
    use crate::executor::mview::test_utils::gen_basic_table;

    /// Scans a generated table and checks that the first value of column 0 in
    /// every emitted chunk equals `chunk index * batch size`, and that the
    /// number of chunks matches the expected count.
    #[tokio::test]
    async fn test_basic() {
        let rows_per_chunk = 50;
        let expected_chunks = 5;

        let table = gen_basic_table(expected_chunks * rows_per_chunk).await;
        let schema = table.schema().clone();

        let executor = BatchQueryExecutor::new(table, rows_per_chunk, schema);
        let stream = executor.boxed().execute_with_epoch(u64::MAX);

        let mut seen_chunks = 0;

        #[for_await]
        for msg in stream {
            let msg: Message = msg.unwrap();
            let chunk = msg.as_chunk().unwrap();

            let first_value = *chunk.column_at(0).datum_at(0).unwrap().as_int32();
            let expected_first = (seen_chunks * rows_per_chunk) as i32;
            assert_eq!(first_value, expected_first);

            seen_chunks += 1;
        }

        assert_eq!(seen_chunks, expected_chunks);
    }
}
114}