risingwave_batch/executor/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16
17use futures::StreamExt;
18use futures_async_stream::try_stream;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::Schema;
21
22use super::{
23    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24    register_executor,
25};
26use crate::error::{BatchError, Result};
27use crate::exchange_source::ExchangeSource;
28use crate::task::TaskId;
29
30#[derive(Debug, Clone)]
31pub struct FakeExchangeSource {
32    chunks: Vec<Option<DataChunk>>,
33}
34
35impl FakeExchangeSource {
36    pub fn new(chunks: Vec<Option<DataChunk>>) -> Self {
37        Self { chunks }
38    }
39}
40
41impl ExchangeSource for FakeExchangeSource {
42    async fn take_data(&mut self) -> Result<Option<DataChunk>> {
43        if let Some(chunk) = self.chunks.pop() {
44            Ok(chunk)
45        } else {
46            Ok(None)
47        }
48    }
49
50    fn get_task_id(&self) -> TaskId {
51        TaskId::default()
52    }
53}
54
55// Following executors are only for testing.
56register_executor!(BlockExecutor, BlockExecutor);
57register_executor!(BusyLoopExecutor, BusyLoopExecutor);
58
59/// Mock the input of executor.
60/// You can bind one or more `MockExecutor` as the children of the executor to test,
61/// (`HashAgg`, e.g), so that allow testing without instantiating real `SeqScan`s and real storage.
62pub struct MockExecutor {
63    chunks: VecDeque<DataChunk>,
64    schema: Schema,
65    identity: String,
66}
67
68impl MockExecutor {
69    pub fn new(schema: Schema) -> Self {
70        Self {
71            chunks: VecDeque::new(),
72            schema,
73            identity: "MockExecutor".to_owned(),
74        }
75    }
76
77    pub fn with_chunk(chunk: DataChunk, schema: Schema) -> Self {
78        let mut ret = Self::new(schema);
79        ret.add(chunk);
80        ret
81    }
82
83    pub fn add(&mut self, chunk: DataChunk) {
84        self.chunks.push_back(chunk);
85    }
86}
87
88impl Executor for MockExecutor {
89    fn schema(&self) -> &Schema {
90        &self.schema
91    }
92
93    fn identity(&self) -> &str {
94        &self.identity
95    }
96
97    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
98        self.do_execute()
99    }
100}
101
102impl MockExecutor {
103    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
104    async fn do_execute(self: Box<Self>) {
105        for data_chunk in self.chunks {
106            yield data_chunk;
107        }
108    }
109}
110
111pub struct BlockExecutor;
112
113impl Executor for BlockExecutor {
114    fn schema(&self) -> &Schema {
115        unimplemented!("Not used in test")
116    }
117
118    fn identity(&self) -> &str {
119        "BlockExecutor"
120    }
121
122    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
123        self.do_execute().boxed()
124    }
125}
126
127impl BlockExecutor {
128    #[try_stream(ok = DataChunk, error = BatchError)]
129    async fn do_execute(self) {
130        // infinite loop to block
131        #[allow(clippy::empty_loop)]
132        loop {}
133    }
134}
135
136impl BoxedExecutorBuilder for BlockExecutor {
137    async fn new_boxed_executor(
138        _source: &ExecutorBuilder<'_>,
139        _inputs: Vec<BoxedExecutor>,
140    ) -> Result<BoxedExecutor> {
141        Ok(Box::new(BlockExecutor))
142    }
143}
144
145pub struct BusyLoopExecutor;
146
147impl Executor for BusyLoopExecutor {
148    fn schema(&self) -> &Schema {
149        unimplemented!("Not used in test")
150    }
151
152    fn identity(&self) -> &str {
153        "BusyLoopExecutor"
154    }
155
156    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
157        self.do_execute().boxed()
158    }
159}
160
161impl BusyLoopExecutor {
162    #[try_stream(ok = DataChunk, error = BatchError)]
163    async fn do_execute(self) {
164        // infinite loop to generate data
165        loop {
166            yield DataChunk::new_dummy(1);
167        }
168    }
169}
170
171impl BoxedExecutorBuilder for BusyLoopExecutor {
172    async fn new_boxed_executor(
173        _source: &ExecutorBuilder<'_>,
174        _inputs: Vec<BoxedExecutor>,
175    ) -> Result<BoxedExecutor> {
176        Ok(Box::new(BusyLoopExecutor))
177    }
178}