// risingwave_batch_executors/executor/test_utils.rs
#![cfg_attr(not(test), allow(dead_code))]

17use assert_matches::assert_matches;
18use futures_async_stream::for_await;
19use itertools::Itertools;
20use risingwave_common::array::{DataChunk, DataChunkTestExt};
21use risingwave_common::catalog::Schema;
22use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
23use risingwave_common::row::Row;
24use risingwave_common::types::{DataType, Datum, ToOwnedDatum};
25use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
26use risingwave_expr::expr::BoxedExpression;
27use risingwave_pb::batch_plan::PbExchangeSource;
28
29use crate::error::Result;
30use crate::exchange_source::ExchangeSourceImpl;
31use crate::executor::{BoxedExecutor, CreateSource, LookupExecutorBuilder};
32use crate::task::BatchTaskContext;
33
/// Fixed seed handed to `FieldGeneratorImpl::with_number_random` by `gen_projected_data`.
const SEED: u64 = 0xFF67_FEAB_BAEF_76FF;
35
36pub use risingwave_batch::executor::test_utils::*;
37
38pub fn gen_data(batch_size: usize, batch_num: usize, data_types: &[DataType]) -> Vec<DataChunk> {
41 DataChunk::gen_data_chunks(
42 batch_num,
43 batch_size,
44 data_types,
45 &VarcharProperty::RandomFixedLength(None),
46 1.0,
47 )
48}
49
50pub fn gen_sorted_data(
53 batch_size: usize,
54 batch_num: usize,
55 start: String,
56 step: u64,
57 offset: u64,
58) -> Vec<DataChunk> {
59 let mut data_gen = FieldGeneratorImpl::with_number_sequence(
60 DataType::Int64,
61 Some(start),
62 Some(i64::MAX.to_string()),
63 0,
64 step,
65 offset,
66 )
67 .unwrap();
68 let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
69
70 for _ in 0..batch_num {
71 let mut array_builder = DataType::Int64.create_array_builder(batch_size);
72
73 for _ in 0..batch_size {
74 array_builder.append(data_gen.generate_datum(0));
75 }
76
77 let array = array_builder.finish();
78 ret.push(DataChunk::new(vec![array.into()], batch_size));
79 }
80
81 ret
82}
83
84pub fn gen_projected_data(
89 batch_size: usize,
90 batch_num: usize,
91 expr: BoxedExpression,
92) -> Vec<DataChunk> {
93 let mut data_gen =
94 FieldGeneratorImpl::with_number_random(DataType::Int64, None, None, SEED).unwrap();
95 let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
96
97 for i in 0..batch_num {
98 let mut array_builder = DataType::Int64.create_array_builder(batch_size);
99
100 for j in 0..batch_size {
101 array_builder.append(data_gen.generate_datum(((i + 1) * (j + 1)) as u64));
102 }
103
104 let chunk = DataChunk::new(vec![array_builder.finish().into()], batch_size);
105
106 let array = futures::executor::block_on(expr.eval(&chunk)).unwrap();
107 let chunk = DataChunk::new(vec![array], batch_size);
108 ret.push(chunk);
109 }
110
111 ret
112}
113
114pub async fn diff_executor_output(actual: BoxedExecutor, expect: BoxedExecutor) {
121 let mut expect_cardinality = 0;
122 let mut actual_cardinality = 0;
123 let mut expects = vec![];
124 let mut actuals = vec![];
125
126 #[for_await]
127 for chunk in expect.execute() {
128 assert_matches!(chunk, Ok(_));
129 let chunk = chunk.unwrap().compact();
130 expect_cardinality += chunk.cardinality();
131 expects.push(chunk);
132 }
133
134 #[for_await]
135 for chunk in actual.execute() {
136 assert_matches!(chunk, Ok(_));
137 let chunk = chunk.unwrap().compact();
138 actual_cardinality += chunk.cardinality();
139 actuals.push(chunk);
140 }
141
142 assert_eq!(actual_cardinality, expect_cardinality);
143 if actual_cardinality == 0 {
144 return;
145 }
146 let expect = DataChunk::rechunk(expects.as_slice(), actual_cardinality)
147 .unwrap()
148 .into_iter()
149 .next()
150 .unwrap();
151 let actual = DataChunk::rechunk(actuals.as_slice(), actual_cardinality)
152 .unwrap()
153 .into_iter()
154 .next()
155 .unwrap();
156 let col_num = expect.columns().len();
157 assert_eq!(col_num, actual.columns().len());
158 expect
159 .columns()
160 .iter()
161 .zip_eq_fast(actual.columns().iter())
162 .for_each(|(c1, c2)| assert_eq!(c1, c2));
163
164 is_data_chunk_eq(&expect, &actual)
165}
166
167fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) {
168 assert!(left.is_compacted());
169 assert!(right.is_compacted());
170
171 assert_eq!(
172 left.cardinality(),
173 right.cardinality(),
174 "two chunks cardinality is different"
175 );
176
177 left.rows()
178 .zip_eq_debug(right.rows())
179 .for_each(|(row1, row2)| assert_eq!(row1, row2));
180}
181
182pub struct FakeInnerSideExecutorBuilder {
183 schema: Schema,
184 datums: Vec<Vec<Datum>>,
185}
186
187impl FakeInnerSideExecutorBuilder {
188 pub fn new(schema: Schema) -> Self {
189 Self {
190 schema,
191 datums: vec![],
192 }
193 }
194}
195
196#[async_trait::async_trait]
197impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder {
198 async fn build_executor(&mut self) -> Result<BoxedExecutor> {
199 let mut mock_executor = MockExecutor::new(self.schema.clone());
200
201 let base_data_chunk = DataChunk::from_pretty(
202 "i f
203 1 9.2
204 2 4.4
205 2 5.5
206 4 6.8
207 5 3.7
208 5 2.3
209 . .",
210 );
211
212 for idx in 0..base_data_chunk.capacity() {
213 let probe_row = base_data_chunk.row_at_unchecked_vis(idx);
214 for datum in &self.datums {
215 if datum[0] == probe_row.datum_at(0).to_owned_datum() {
216 let chunk =
217 DataChunk::from_rows(&[probe_row], &[DataType::Int32, DataType::Float32]);
218 mock_executor.add(chunk);
219 break;
220 }
221 }
222 }
223
224 Ok(Box::new(mock_executor))
225 }
226
227 async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
228 self.datums.push(key_datums.iter().cloned().collect_vec());
229 Ok(())
230 }
231
232 fn reset(&mut self) {
233 self.datums = vec![];
234 }
235}
236
237#[derive(Debug, Clone)]
238pub(super) struct FakeCreateSource {
239 fake_exchange_source: FakeExchangeSource,
240}
241
242impl FakeCreateSource {
243 pub fn new(fake_exchange_source: FakeExchangeSource) -> Self {
244 Self {
245 fake_exchange_source,
246 }
247 }
248}
249
250#[async_trait::async_trait]
251impl CreateSource for FakeCreateSource {
252 async fn create_source(
253 &self,
254 _: &dyn BatchTaskContext,
255 _: &PbExchangeSource,
256 ) -> Result<ExchangeSourceImpl> {
257 Ok(ExchangeSourceImpl::Fake(self.fake_exchange_source.clone()))
258 }
259}