risingwave_batch_executors/executor/
test_utils.rs1#![cfg_attr(not(test), allow(dead_code))]
16
17use assert_matches::assert_matches;
18use futures_async_stream::for_await;
19use itertools::Itertools;
20use risingwave_common::array::{DataChunk, DataChunkTestExt};
21use risingwave_common::catalog::Schema;
22use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
23use risingwave_common::row::Row;
24use risingwave_common::types::{DataType, Datum, ToOwnedDatum};
25use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
26use risingwave_expr::expr::BoxedExpression;
27use risingwave_pb::batch_plan::PbExchangeSource;
28
29use crate::error::Result;
30use crate::exchange_source::ExchangeSourceImpl;
31use crate::executor::{BoxedExecutor, CreateSource, LookupExecutorBuilder};
32use crate::task::BatchTaskContext;
33
/// Fixed seed for the random `Int64` generator used by [`gen_projected_data`].
const SEED: u64 = 0xFF67_FEAB_BAEF_76FF;
35
36pub use risingwave_batch::executor::test_utils::*;
37
38pub fn gen_data(batch_size: usize, batch_num: usize, data_types: &[DataType]) -> Vec<DataChunk> {
41 DataChunk::gen_data_chunks(
42 batch_num,
43 batch_size,
44 data_types,
45 &VarcharProperty::RandomFixedLength(None),
46 1.0,
47 )
48}
49
50pub fn gen_sorted_data(
53 batch_size: usize,
54 batch_num: usize,
55 start: String,
56 step: u64,
57 offset: u64,
58) -> Vec<DataChunk> {
59 let mut data_gen = FieldGeneratorImpl::with_number_sequence(
60 DataType::Int64,
61 Some(start),
62 Some(i64::MAX.to_string()),
63 0,
64 step,
65 offset,
66 )
67 .unwrap();
68 let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
69
70 for _ in 0..batch_num {
71 let mut array_builder = DataType::Int64.create_array_builder(batch_size);
72
73 for _ in 0..batch_size {
74 array_builder.append(data_gen.generate_datum(0));
75 }
76
77 let array = array_builder.finish();
78 ret.push(DataChunk::new(vec![array.into()], batch_size));
79 }
80
81 ret
82}
83
84pub fn gen_projected_data(
89 batch_size: usize,
90 batch_num: usize,
91 expr: BoxedExpression,
92) -> Vec<DataChunk> {
93 let mut data_gen =
94 FieldGeneratorImpl::with_number_random(DataType::Int64, None, None, SEED).unwrap();
95 let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
96
97 for i in 0..batch_num {
98 let mut array_builder = DataType::Int64.create_array_builder(batch_size);
99
100 for j in 0..batch_size {
101 array_builder.append(data_gen.generate_datum(((i + 1) * (j + 1)) as u64));
102 }
103
104 let chunk = DataChunk::new(vec![array_builder.finish().into()], batch_size);
105
106 let array = futures::executor::block_on(expr.eval(&chunk)).unwrap();
107 let chunk = DataChunk::new(vec![array], batch_size);
108 ret.push(chunk);
109 }
110
111 ret
112}
113
114pub async fn diff_executor_output(actual: BoxedExecutor, expect: BoxedExecutor) {
121 let mut expect_cardinality = 0;
122 let mut actual_cardinality = 0;
123 let mut expects = vec![];
124 let mut actuals = vec![];
125
126 #[for_await]
127 for chunk in expect.execute() {
128 assert_matches!(chunk, Ok(_));
129 let chunk = chunk.unwrap().compact();
130 expect_cardinality += chunk.cardinality();
131 expects.push(chunk);
132 }
133
134 #[for_await]
135 for chunk in actual.execute() {
136 assert_matches!(chunk, Ok(_));
137 let chunk = chunk.unwrap().compact();
138 actual_cardinality += chunk.cardinality();
139 actuals.push(chunk);
140 }
141
142 assert_eq!(actual_cardinality, expect_cardinality);
143 if actual_cardinality == 0 {
144 return;
145 }
146 let expect = DataChunk::rechunk(expects.as_slice(), actual_cardinality)
147 .unwrap()
148 .into_iter()
149 .next()
150 .unwrap();
151 let actual = DataChunk::rechunk(actuals.as_slice(), actual_cardinality)
152 .unwrap()
153 .into_iter()
154 .next()
155 .unwrap();
156 let col_num = expect.columns().len();
157 assert_eq!(col_num, actual.columns().len());
158 expect
159 .columns()
160 .iter()
161 .zip_eq_fast(actual.columns().iter())
162 .for_each(|(c1, c2)| assert_eq!(c1, c2));
163
164 is_data_chunk_eq(&expect, &actual)
165}
166
167fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) {
168 assert!(left.is_compacted());
169 assert!(right.is_compacted());
170
171 assert_eq!(
172 left.cardinality(),
173 right.cardinality(),
174 "two chunks cardinality is different"
175 );
176
177 left.rows()
178 .zip_eq_debug(right.rows())
179 .for_each(|(row1, row2)| assert_eq!(row1, row2));
180}
181
182pub struct FakeInnerSideExecutorBuilder {
183 schema: Schema,
184 datums: Vec<Vec<Datum>>,
185}
186
187impl FakeInnerSideExecutorBuilder {
188 pub fn new(schema: Schema) -> Self {
189 Self {
190 schema,
191 datums: vec![],
192 }
193 }
194}
195
196impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder {
197 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
198 let mut mock_executor = MockExecutor::new(self.schema.clone());
199
200 let base_data_chunk = DataChunk::from_pretty(
201 "i f
202 1 9.2
203 2 4.4
204 2 5.5
205 4 6.8
206 5 3.7
207 5 2.3
208 . .",
209 );
210
211 for idx in 0..base_data_chunk.capacity() {
212 let probe_row = base_data_chunk.row_at_unchecked_vis(idx);
213 for datum in &self.datums {
214 if datum[0] == probe_row.datum_at(0).to_owned_datum() {
215 let chunk =
216 DataChunk::from_rows(&[probe_row], &[DataType::Int32, DataType::Float32]);
217 mock_executor.add(chunk);
218 break;
219 }
220 }
221 }
222
223 Ok(Box::new(mock_executor))
224 }
225
226 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
227 self.datums.push(key_datums.iter().cloned().collect_vec());
228 Ok(())
229 }
230
231 fn reset(&mut self) {
232 self.datums = vec![];
233 }
234}
235
236#[derive(Debug, Clone)]
237pub(super) struct FakeCreateSource {
238 fake_exchange_source: FakeExchangeSource,
239}
240
241impl FakeCreateSource {
242 pub fn new(fake_exchange_source: FakeExchangeSource) -> Self {
243 Self {
244 fake_exchange_source,
245 }
246 }
247}
248
249#[async_trait::async_trait]
250impl CreateSource for FakeCreateSource {
251 async fn create_source(
252 &self,
253 _: &dyn BatchTaskContext,
254 _: &PbExchangeSource,
255 ) -> Result<ExchangeSourceImpl> {
256 Ok(ExchangeSourceImpl::Fake(self.fake_exchange_source.clone()))
257 }
258}