risingwave_batch_executors/executor/
test_utils.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg_attr(not(test), allow(dead_code))]
16
17use assert_matches::assert_matches;
18use futures_async_stream::for_await;
19use itertools::Itertools;
20use risingwave_common::array::{DataChunk, DataChunkTestExt};
21use risingwave_common::catalog::Schema;
22use risingwave_common::field_generator::{FieldGeneratorImpl, VarcharProperty};
23use risingwave_common::row::Row;
24use risingwave_common::types::{DataType, Datum, ToOwnedDatum};
25use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
26use risingwave_expr::expr::BoxedExpression;
27use risingwave_pb::batch_plan::PbExchangeSource;
28
29use crate::error::Result;
30use crate::exchange_source::ExchangeSourceImpl;
31use crate::executor::{BoxedExecutor, CreateSource, LookupExecutorBuilder};
32use crate::task::BatchTaskContext;
33
34const SEED: u64 = 0xFF67FEABBAEF76FF;
35
36pub use risingwave_batch::executor::test_utils::*;
37
38/// Generate `batch_num` data chunks with type `data_types`, each data chunk has cardinality of
39/// `batch_size`.
40pub fn gen_data(batch_size: usize, batch_num: usize, data_types: &[DataType]) -> Vec<DataChunk> {
41    DataChunk::gen_data_chunks(
42        batch_num,
43        batch_size,
44        data_types,
45        &VarcharProperty::RandomFixedLength(None),
46        1.0,
47    )
48}
49
50/// Generate `batch_num` sorted data chunks with type `Int64`, each data chunk has cardinality of
51/// `batch_size`.
52pub fn gen_sorted_data(
53    batch_size: usize,
54    batch_num: usize,
55    start: String,
56    step: u64,
57    offset: u64,
58) -> Vec<DataChunk> {
59    let mut data_gen = FieldGeneratorImpl::with_number_sequence(
60        DataType::Int64,
61        Some(start),
62        Some(i64::MAX.to_string()),
63        0,
64        step,
65        offset,
66    )
67    .unwrap();
68    let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
69
70    for _ in 0..batch_num {
71        let mut array_builder = DataType::Int64.create_array_builder(batch_size);
72
73        for _ in 0..batch_size {
74            array_builder.append(data_gen.generate_datum(0));
75        }
76
77        let array = array_builder.finish();
78        ret.push(DataChunk::new(vec![array.into()], batch_size));
79    }
80
81    ret
82}
83
84/// Generate `batch_num` data chunks with type `Int64`, each data chunk has cardinality of
85/// `batch_size`. Then project each data chunk with `expr`.
86///
87/// NOTE: For convenience, here we only use data type `Int64`.
88pub fn gen_projected_data(
89    batch_size: usize,
90    batch_num: usize,
91    expr: BoxedExpression,
92) -> Vec<DataChunk> {
93    let mut data_gen =
94        FieldGeneratorImpl::with_number_random(DataType::Int64, None, None, SEED).unwrap();
95    let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
96
97    for i in 0..batch_num {
98        let mut array_builder = DataType::Int64.create_array_builder(batch_size);
99
100        for j in 0..batch_size {
101            array_builder.append(data_gen.generate_datum(((i + 1) * (j + 1)) as u64));
102        }
103
104        let chunk = DataChunk::new(vec![array_builder.finish().into()], batch_size);
105
106        let array = futures::executor::block_on(expr.eval(&chunk)).unwrap();
107        let chunk = DataChunk::new(vec![array], batch_size);
108        ret.push(chunk);
109    }
110
111    ret
112}
113
114/// if the input from two child executor is same(considering order),
115/// it will also check the columns structure of chunks from child executor
116/// use for executor unit test.
117///
118/// if want diff ignoring order, add a `order_by` executor in manual currently, when the `schema`
119/// method of `executor` is ready, an order-ignored version will be added.
120pub async fn diff_executor_output(actual: BoxedExecutor, expect: BoxedExecutor) {
121    let mut expect_cardinality = 0;
122    let mut actual_cardinality = 0;
123    let mut expects = vec![];
124    let mut actuals = vec![];
125
126    #[for_await]
127    for chunk in expect.execute() {
128        assert_matches!(chunk, Ok(_));
129        let chunk = chunk.unwrap().compact();
130        expect_cardinality += chunk.cardinality();
131        expects.push(chunk);
132    }
133
134    #[for_await]
135    for chunk in actual.execute() {
136        assert_matches!(chunk, Ok(_));
137        let chunk = chunk.unwrap().compact();
138        actual_cardinality += chunk.cardinality();
139        actuals.push(chunk);
140    }
141
142    assert_eq!(actual_cardinality, expect_cardinality);
143    if actual_cardinality == 0 {
144        return;
145    }
146    let expect = DataChunk::rechunk(expects.as_slice(), actual_cardinality)
147        .unwrap()
148        .into_iter()
149        .next()
150        .unwrap();
151    let actual = DataChunk::rechunk(actuals.as_slice(), actual_cardinality)
152        .unwrap()
153        .into_iter()
154        .next()
155        .unwrap();
156    let col_num = expect.columns().len();
157    assert_eq!(col_num, actual.columns().len());
158    expect
159        .columns()
160        .iter()
161        .zip_eq_fast(actual.columns().iter())
162        .for_each(|(c1, c2)| assert_eq!(c1, c2));
163
164    is_data_chunk_eq(&expect, &actual)
165}
166
167fn is_data_chunk_eq(left: &DataChunk, right: &DataChunk) {
168    assert!(left.is_compacted());
169    assert!(right.is_compacted());
170
171    assert_eq!(
172        left.cardinality(),
173        right.cardinality(),
174        "two chunks cardinality is different"
175    );
176
177    left.rows()
178        .zip_eq_debug(right.rows())
179        .for_each(|(row1, row2)| assert_eq!(row1, row2));
180}
181
182pub struct FakeInnerSideExecutorBuilder {
183    schema: Schema,
184    datums: Vec<Vec<Datum>>,
185}
186
187impl FakeInnerSideExecutorBuilder {
188    pub fn new(schema: Schema) -> Self {
189        Self {
190            schema,
191            datums: vec![],
192        }
193    }
194}
195
196#[async_trait::async_trait]
197impl LookupExecutorBuilder for FakeInnerSideExecutorBuilder {
198    async fn build_executor(&mut self) -> Result<BoxedExecutor> {
199        let mut mock_executor = MockExecutor::new(self.schema.clone());
200
201        let base_data_chunk = DataChunk::from_pretty(
202            "i f
203             1 9.2
204             2 4.4
205             2 5.5
206             4 6.8
207             5 3.7
208             5 2.3
209             . .",
210        );
211
212        for idx in 0..base_data_chunk.capacity() {
213            let probe_row = base_data_chunk.row_at_unchecked_vis(idx);
214            for datum in &self.datums {
215                if datum[0] == probe_row.datum_at(0).to_owned_datum() {
216                    let chunk =
217                        DataChunk::from_rows(&[probe_row], &[DataType::Int32, DataType::Float32]);
218                    mock_executor.add(chunk);
219                    break;
220                }
221            }
222        }
223
224        Ok(Box::new(mock_executor))
225    }
226
227    async fn add_scan_range(&mut self, key_datums: Vec<Datum>) -> Result<()> {
228        self.datums.push(key_datums.iter().cloned().collect_vec());
229        Ok(())
230    }
231
232    fn reset(&mut self) {
233        self.datums = vec![];
234    }
235}
236
237#[derive(Debug, Clone)]
238pub(super) struct FakeCreateSource {
239    fake_exchange_source: FakeExchangeSource,
240}
241
242impl FakeCreateSource {
243    pub fn new(fake_exchange_source: FakeExchangeSource) -> Self {
244        Self {
245            fake_exchange_source,
246        }
247    }
248}
249
250#[async_trait::async_trait]
251impl CreateSource for FakeCreateSource {
252    async fn create_source(
253        &self,
254        _: &dyn BatchTaskContext,
255        _: &PbExchangeSource,
256    ) -> Result<ExchangeSourceImpl> {
257        Ok(ExchangeSourceImpl::Fake(self.fake_exchange_source.clone()))
258    }
259}