risingwave_batch_executors/executor/
values.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::vec;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_expr::expr::{BoxedExpression, build_from_prost};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
30/// [`ValuesExecutor`] implements Values executor.
31pub struct ValuesExecutor {
32    rows: vec::IntoIter<Vec<BoxedExpression>>,
33    schema: Schema,
34    identity: String,
35    chunk_size: usize,
36}
37
38impl ValuesExecutor {
39    pub(crate) fn new(
40        rows: Vec<Vec<BoxedExpression>>,
41        schema: Schema,
42        identity: String,
43        chunk_size: usize,
44    ) -> Self {
45        Self {
46            rows: rows.into_iter(),
47            schema,
48            identity,
49            chunk_size,
50        }
51    }
52}
53
54impl Executor for ValuesExecutor {
55    fn schema(&self) -> &Schema {
56        &self.schema
57    }
58
59    fn identity(&self) -> &str {
60        &self.identity
61    }
62
63    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
64        self.do_execute()
65    }
66}
67
68impl ValuesExecutor {
69    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
70    async fn do_execute(mut self: Box<Self>) {
71        if !self.rows.is_empty() {
72            let cardinality = self.rows.len();
73            ensure!(cardinality > 0);
74
75            while !self.rows.is_empty() {
76                // We need a one row chunk rather than an empty chunk because constant
77                // expression's eval result is same size as input chunk
78                // cardinality.
79                let one_row_chunk = DataChunk::new_dummy(1);
80
81                let chunk_size = self.chunk_size.min(self.rows.len());
82                let mut array_builders = self.schema.create_array_builders(chunk_size);
83                for row in self.rows.by_ref().take(chunk_size) {
84                    for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
85                        let out = expr.eval(&one_row_chunk).await?;
86                        builder.append_array(&out);
87                    }
88                }
89
90                let columns: Vec<_> = array_builders
91                    .into_iter()
92                    .map(|b| b.finish().into())
93                    .collect();
94
95                let chunk = DataChunk::new(columns, chunk_size);
96
97                yield chunk
98            }
99        }
100    }
101}
102
103impl BoxedExecutorBuilder for ValuesExecutor {
104    async fn new_boxed_executor(
105        source: &ExecutorBuilder<'_>,
106        inputs: Vec<BoxedExecutor>,
107    ) -> Result<BoxedExecutor> {
108        ensure!(inputs.is_empty(), "ValuesExecutor should have no child!");
109        let value_node = try_match_expand!(
110            source.plan_node().get_node_body().unwrap(),
111            NodeBody::Values
112        )?;
113
114        let mut rows: Vec<Vec<BoxedExpression>> = Vec::with_capacity(value_node.get_tuples().len());
115        for row in value_node.get_tuples() {
116            let expr_row: Vec<_> = row.get_cells().iter().map(build_from_prost).try_collect()?;
117            rows.push(expr_row);
118        }
119
120        let fields = value_node
121            .get_fields()
122            .iter()
123            .map(Field::from)
124            .collect::<Vec<Field>>();
125
126        Ok(Box::new(Self {
127            rows: rows.into_iter(),
128            schema: Schema { fields },
129            identity: source.plan_node().get_identity().clone(),
130            chunk_size: source.context().get_config().developer.chunk_size,
131        }))
132    }
133}
134
135#[cfg(test)]
136mod tests {
137
138    use futures::stream::StreamExt;
139    use risingwave_common::array::{
140        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
141    };
142    use risingwave_common::catalog::{Field, Schema};
143    use risingwave_common::types::{DataType, ScalarImpl, StructType};
144    use risingwave_expr::expr::{BoxedExpression, LiteralExpression};
145
146    use crate::executor::{Executor, ValuesExecutor};
147
148    const CHUNK_SIZE: usize = 1024;
149
150    #[tokio::test]
151    async fn test_values_executor() {
152        let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
153        let exprs = vec![
154            Box::new(LiteralExpression::new(
155                DataType::Int16,
156                Some(ScalarImpl::Int16(1)),
157            )) as BoxedExpression,
158            Box::new(LiteralExpression::new(
159                DataType::Int32,
160                Some(ScalarImpl::Int32(2)),
161            )),
162            Box::new(LiteralExpression::new(
163                DataType::Int64,
164                Some(ScalarImpl::Int64(3)),
165            )),
166            Box::new(LiteralExpression::new(
167                StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
168                Some(ScalarImpl::Struct(value)),
169            )) as BoxedExpression,
170        ];
171
172        let fields = exprs
173            .iter() // for each column
174            .map(|col| Field::unnamed(col.return_type()))
175            .collect::<Vec<Field>>();
176
177        let values_executor = Box::new(ValuesExecutor {
178            rows: vec![exprs].into_iter(),
179            schema: Schema { fields },
180            identity: "ValuesExecutor2".to_owned(),
181            chunk_size: CHUNK_SIZE,
182        });
183
184        let fields = &values_executor.schema().fields;
185        assert_eq!(fields[0].data_type, DataType::Int16);
186        assert_eq!(fields[1].data_type, DataType::Int32);
187        assert_eq!(fields[2].data_type, DataType::Int64);
188        assert_eq!(
189            fields[3].data_type,
190            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into()
191        );
192
193        let mut stream = values_executor.execute();
194        let result = stream.next().await.unwrap();
195        let array: ArrayImpl = StructArray::new(
196            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
197            vec![
198                I32Array::from_iter([1]).into_ref(),
199                I32Array::from_iter([2]).into_ref(),
200                I32Array::from_iter([3]).into_ref(),
201            ],
202            [true].into_iter().collect(),
203        )
204        .into();
205
206        if let Ok(result) = result {
207            assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
208            assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
209            assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
210            assert_eq!(*result.column_at(3), array.into());
211        }
212    }
213
214    #[tokio::test]
215    async fn test_chunk_split_size() {
216        let rows = [
217            Box::new(LiteralExpression::new(
218                DataType::Int32,
219                Some(ScalarImpl::Int32(1)),
220            )) as BoxedExpression,
221            Box::new(LiteralExpression::new(
222                DataType::Int32,
223                Some(ScalarImpl::Int32(2)),
224            )) as BoxedExpression,
225            Box::new(LiteralExpression::new(
226                DataType::Int32,
227                Some(ScalarImpl::Int32(3)),
228            )) as BoxedExpression,
229            Box::new(LiteralExpression::new(
230                DataType::Int32,
231                Some(ScalarImpl::Int32(4)),
232            )) as BoxedExpression,
233        ]
234        .into_iter()
235        .map(|expr| vec![expr])
236        .collect::<Vec<_>>();
237
238        let fields = vec![Field::unnamed(DataType::Int32)];
239
240        let values_executor = Box::new(ValuesExecutor::new(
241            rows,
242            Schema { fields },
243            "ValuesExecutor2".to_owned(),
244            3,
245        ));
246        let mut stream = values_executor.execute();
247        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 3);
248        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 1);
249        assert!(stream.next().await.is_none());
250    }
251
252    // Handle the possible case of ValuesNode([[]])
253    #[tokio::test]
254    async fn test_no_column_values_executor() {
255        let values_executor = Box::new(ValuesExecutor::new(
256            vec![vec![]],
257            Schema::default(),
258            "ValuesExecutor2".to_owned(),
259            CHUNK_SIZE,
260        ));
261        let mut stream = values_executor.execute();
262
263        let result = stream.next().await.unwrap().unwrap();
264        assert_eq!(result.cardinality(), 1);
265        assert_eq!(result.dimension(), 0);
266
267        assert!(stream.next().await.is_none());
268    }
269}