risingwave_stream/executor/
batch_query.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::TryStreamExt;
16use risingwave_common::array::Op;
17use risingwave_hummock_sdk::HummockReadEpoch;
18use risingwave_storage::store::PrefetchOptions;
19use risingwave_storage::table::batch_table::BatchTable;
20use risingwave_storage::table::collect_data_chunk;
21
22use crate::executor::prelude::*;
23
24pub struct BatchQueryExecutor<S: StateStore> {
25    /// The [`BatchTable`] that needs to be queried
26    table: BatchTable<S>,
27
28    /// The number of tuples in one [`StreamChunk`]
29    batch_size: usize,
30
31    schema: Schema,
32}
33
34impl<S> BatchQueryExecutor<S>
35where
36    S: StateStore,
37{
38    pub fn new(table: BatchTable<S>, batch_size: usize, schema: Schema) -> Self {
39        Self {
40            table,
41            batch_size,
42            schema,
43        }
44    }
45
46    #[try_stream(ok = Message, error = StreamExecutorError)]
47    async fn execute_inner(self, epoch: u64) {
48        let iter = self
49            .table
50            .batch_iter(
51                HummockReadEpoch::Committed(epoch),
52                false,
53                PrefetchOptions::prefetch_for_large_range_scan(),
54            )
55            .await?;
56        let iter = iter.map_ok(|keyed_row| keyed_row.into_owned_row());
57        pin_mut!(iter);
58
59        while let Some(data_chunk) =
60            collect_data_chunk(&mut iter, &self.schema, Some(self.batch_size))
61                .instrument_await("batch_query_executor_collect_chunk")
62                .await?
63        {
64            let ops = vec![Op::Insert; data_chunk.capacity()];
65            let stream_chunk = StreamChunk::from_parts(ops, data_chunk);
66            yield Message::Chunk(stream_chunk);
67        }
68    }
69}
70
71impl<S> Execute for BatchQueryExecutor<S>
72where
73    S: StateStore,
74{
75    fn execute(self: Box<Self>) -> super::BoxedMessageStream {
76        unreachable!("should call `execute_with_epoch`")
77    }
78
79    fn execute_with_epoch(self: Box<Self>, epoch: u64) -> BoxedMessageStream {
80        self.execute_inner(epoch).boxed()
81    }
82}
83
#[cfg(test)]
mod test {
    use futures_async_stream::for_await;

    use super::*;
    use crate::executor::mview::test_utils::gen_basic_table;

    /// Runs the executor over a generated table and checks that it emits the
    /// expected number of chunks, each starting at the expected value.
    #[tokio::test]
    async fn test_basic() {
        const ROWS_PER_CHUNK: usize = 50;
        const EXPECTED_CHUNKS: usize = 5;

        let table = gen_basic_table(EXPECTED_CHUNKS * ROWS_PER_CHUNK).await;
        let schema = table.schema().clone();

        let executor = BatchQueryExecutor::new(table, ROWS_PER_CHUNK, schema);
        let stream = executor.boxed().execute_with_epoch(u64::MAX);

        let mut chunks_seen = 0;

        #[for_await]
        for msg in stream {
            let msg: Message = msg.unwrap();
            let chunk = msg.as_chunk().unwrap();
            // The first datum of column 0 must equal the index of the chunk's first row.
            let first_value = *chunk.column_at(0).datum_at(0).unwrap().as_int32();
            assert_eq!(first_value, (chunks_seen * ROWS_PER_CHUNK) as i32);
            chunks_seen += 1;
        }

        assert_eq!(chunks_seen, EXPECTED_CHUNKS);
    }
}