risingwave_batch_executors/executor/
values.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::vec;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_expr::expr::{BoxedExpression, build_from_prost};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
30/// [`ValuesExecutor`] implements Values executor.
31pub struct ValuesExecutor {
32    rows: vec::IntoIter<Vec<BoxedExpression>>,
33    schema: Schema,
34    identity: String,
35    chunk_size: usize,
36}
37
38impl ValuesExecutor {
39    #[cfg(test)]
40    pub(crate) fn new(
41        rows: Vec<Vec<BoxedExpression>>,
42        schema: Schema,
43        identity: String,
44        chunk_size: usize,
45    ) -> Self {
46        Self {
47            rows: rows.into_iter(),
48            schema,
49            identity,
50            chunk_size,
51        }
52    }
53}
54
55impl Executor for ValuesExecutor {
56    fn schema(&self) -> &Schema {
57        &self.schema
58    }
59
60    fn identity(&self) -> &str {
61        &self.identity
62    }
63
64    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
65        self.do_execute()
66    }
67}
68
69impl ValuesExecutor {
70    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
71    async fn do_execute(mut self: Box<Self>) {
72        if !self.rows.is_empty() {
73            let cardinality = self.rows.len();
74            ensure!(cardinality > 0);
75
76            while !self.rows.is_empty() {
77                // We need a one row chunk rather than an empty chunk because constant
78                // expression's eval result is same size as input chunk
79                // cardinality.
80                let one_row_chunk = DataChunk::new_dummy(1);
81
82                let chunk_size = self.chunk_size.min(self.rows.len());
83                let mut array_builders = self.schema.create_array_builders(chunk_size);
84                for row in self.rows.by_ref().take(chunk_size) {
85                    for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
86                        let out = expr.eval(&one_row_chunk).await?;
87                        builder.append_array(&out);
88                    }
89                }
90
91                let columns: Vec<_> = array_builders
92                    .into_iter()
93                    .map(|b| b.finish().into())
94                    .collect();
95
96                let chunk = DataChunk::new(columns, chunk_size);
97
98                yield chunk
99            }
100        }
101    }
102}
103
104impl BoxedExecutorBuilder for ValuesExecutor {
105    async fn new_boxed_executor(
106        source: &ExecutorBuilder<'_>,
107        inputs: Vec<BoxedExecutor>,
108    ) -> Result<BoxedExecutor> {
109        ensure!(inputs.is_empty(), "ValuesExecutor should have no child!");
110        let value_node = try_match_expand!(
111            source.plan_node().get_node_body().unwrap(),
112            NodeBody::Values
113        )?;
114
115        let mut rows: Vec<Vec<BoxedExpression>> = Vec::with_capacity(value_node.get_tuples().len());
116        for row in value_node.get_tuples() {
117            let expr_row: Vec<_> = row.get_cells().iter().map(build_from_prost).try_collect()?;
118            rows.push(expr_row);
119        }
120
121        let fields = value_node
122            .get_fields()
123            .iter()
124            .map(Field::from)
125            .collect::<Vec<Field>>();
126
127        Ok(Box::new(Self {
128            rows: rows.into_iter(),
129            schema: Schema { fields },
130            identity: source.plan_node().get_identity().clone(),
131            chunk_size: source.context().get_config().developer.chunk_size,
132        }))
133    }
134}
135
#[cfg(test)]
mod tests {

    use futures::stream::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression};

    use crate::executor::{Executor, ValuesExecutor};

    const CHUNK_SIZE: usize = 1024;

    /// A single row of literals (including a struct literal) comes back as one single-row
    /// chunk whose columns hold exactly those literal values.
    #[tokio::test]
    async fn test_values_executor() {
        let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
        let exprs = vec![
            Box::new(LiteralExpression::new(
                DataType::Int16,
                Some(ScalarImpl::Int16(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )),
            Box::new(LiteralExpression::new(
                DataType::Int64,
                Some(ScalarImpl::Int64(3)),
            )),
            Box::new(LiteralExpression::new(
                StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
                Some(ScalarImpl::Struct(value)),
            )) as BoxedExpression,
        ];

        let fields = exprs
            .iter() // for each column
            .map(|col| Field::unnamed(col.return_type()))
            .collect::<Vec<Field>>();

        let values_executor = Box::new(ValuesExecutor {
            rows: vec![exprs].into_iter(),
            schema: Schema { fields },
            identity: "ValuesExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        let fields = &values_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int16);
        assert_eq!(fields[1].data_type, DataType::Int32);
        assert_eq!(fields[2].data_type, DataType::Int64);
        assert_eq!(
            fields[3].data_type,
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into()
        );

        let mut stream = values_executor.execute();
        // Unwrap the result so that an executor error fails the test instead of silently
        // skipping every assertion below.
        let result = stream.next().await.unwrap().unwrap();
        let array: ArrayImpl = StructArray::new(
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        assert_eq!(result.cardinality(), 1);
        assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
        assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
        assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
        assert_eq!(*result.column_at(3), array.into());

        // The only row has been emitted, so the stream must be exhausted.
        assert!(stream.next().await.is_none());
    }

    /// Four rows with a chunk size of three are split into a chunk of three and a chunk of one.
    #[tokio::test]
    async fn test_chunk_split_size() {
        let rows = [
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(3)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(4)),
            )) as BoxedExpression,
        ]
        .into_iter()
        .map(|expr| vec![expr])
        .collect::<Vec<_>>();

        let fields = vec![Field::unnamed(DataType::Int32)];

        let values_executor = Box::new(ValuesExecutor::new(
            rows,
            Schema { fields },
            "ValuesExecutor2".to_owned(),
            3,
        ));
        let mut stream = values_executor.execute();
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 3);
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 1);
        assert!(stream.next().await.is_none());
    }

    // Handle the possible case of ValuesNode([[]])
    #[tokio::test]
    async fn test_no_column_values_executor() {
        let values_executor = Box::new(ValuesExecutor::new(
            vec![vec![]],
            Schema::default(),
            "ValuesExecutor2".to_owned(),
            CHUNK_SIZE,
        ));
        let mut stream = values_executor.execute();

        let result = stream.next().await.unwrap().unwrap();
        assert_eq!(result.cardinality(), 1);
        assert_eq!(result.dimension(), 0);

        assert!(stream.next().await.is_none());
    }
}