risingwave_batch/executor/
test_utils.rs1use std::collections::VecDeque;
16
17use futures::StreamExt;
18use futures_async_stream::try_stream;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::Schema;
21
22use super::{
23 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
24 register_executor,
25};
26use crate::error::{BatchError, Result};
27use crate::exchange_source::ExchangeSource;
28use crate::task::TaskId;
29
30#[derive(Debug, Clone)]
31pub struct FakeExchangeSource {
32 chunks: Vec<Option<DataChunk>>,
33}
34
35impl FakeExchangeSource {
36 pub fn new(chunks: Vec<Option<DataChunk>>) -> Self {
37 Self { chunks }
38 }
39}
40
41impl ExchangeSource for FakeExchangeSource {
42 async fn take_data(&mut self) -> Result<Option<DataChunk>> {
43 if let Some(chunk) = self.chunks.pop() {
44 Ok(chunk)
45 } else {
46 Ok(None)
47 }
48 }
49
50 fn get_task_id(&self) -> TaskId {
51 TaskId::default()
52 }
53}
54
55register_executor!(BlockExecutor, BlockExecutor);
57register_executor!(BusyLoopExecutor, BusyLoopExecutor);
58
59pub struct MockExecutor {
63 chunks: VecDeque<DataChunk>,
64 schema: Schema,
65 identity: String,
66}
67
68impl MockExecutor {
69 pub fn new(schema: Schema) -> Self {
70 Self {
71 chunks: VecDeque::new(),
72 schema,
73 identity: "MockExecutor".to_owned(),
74 }
75 }
76
77 pub fn with_chunk(chunk: DataChunk, schema: Schema) -> Self {
78 let mut ret = Self::new(schema);
79 ret.add(chunk);
80 ret
81 }
82
83 pub fn add(&mut self, chunk: DataChunk) {
84 self.chunks.push_back(chunk);
85 }
86}
87
88impl Executor for MockExecutor {
89 fn schema(&self) -> &Schema {
90 &self.schema
91 }
92
93 fn identity(&self) -> &str {
94 &self.identity
95 }
96
97 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
98 self.do_execute()
99 }
100}
101
102impl MockExecutor {
103 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
104 async fn do_execute(self: Box<Self>) {
105 for data_chunk in self.chunks {
106 yield data_chunk;
107 }
108 }
109}
110
111pub struct BlockExecutor;
112
113impl Executor for BlockExecutor {
114 fn schema(&self) -> &Schema {
115 unimplemented!("Not used in test")
116 }
117
118 fn identity(&self) -> &str {
119 "BlockExecutor"
120 }
121
122 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
123 self.do_execute().boxed()
124 }
125}
126
127impl BlockExecutor {
128 #[try_stream(ok = DataChunk, error = BatchError)]
129 async fn do_execute(self) {
130 #[allow(clippy::empty_loop)]
132 loop {}
133 }
134}
135
136impl BoxedExecutorBuilder for BlockExecutor {
137 async fn new_boxed_executor(
138 _source: &ExecutorBuilder<'_>,
139 _inputs: Vec<BoxedExecutor>,
140 ) -> Result<BoxedExecutor> {
141 Ok(Box::new(BlockExecutor))
142 }
143}
144
145pub struct BusyLoopExecutor;
146
147impl Executor for BusyLoopExecutor {
148 fn schema(&self) -> &Schema {
149 unimplemented!("Not used in test")
150 }
151
152 fn identity(&self) -> &str {
153 "BusyLoopExecutor"
154 }
155
156 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
157 self.do_execute().boxed()
158 }
159}
160
161impl BusyLoopExecutor {
162 #[try_stream(ok = DataChunk, error = BatchError)]
163 async fn do_execute(self) {
164 loop {
166 yield DataChunk::new_dummy(1);
167 }
168 }
169}
170
171impl BoxedExecutorBuilder for BusyLoopExecutor {
172 async fn new_boxed_executor(
173 _source: &ExecutorBuilder<'_>,
174 _inputs: Vec<BoxedExecutor>,
175 ) -> Result<BoxedExecutor> {
176 Ok(Box::new(BusyLoopExecutor))
177 }
178}