risingwave_batch_executors/executor/
values.rs1use std::vec;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_expr::expr::{BoxedExpression, build_from_prost};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
30pub struct ValuesExecutor {
32 rows: vec::IntoIter<Vec<BoxedExpression>>,
33 schema: Schema,
34 identity: String,
35 chunk_size: usize,
36}
37
38impl ValuesExecutor {
39 #[cfg(test)]
40 pub(crate) fn new(
41 rows: Vec<Vec<BoxedExpression>>,
42 schema: Schema,
43 identity: String,
44 chunk_size: usize,
45 ) -> Self {
46 Self {
47 rows: rows.into_iter(),
48 schema,
49 identity,
50 chunk_size,
51 }
52 }
53}
54
55impl Executor for ValuesExecutor {
56 fn schema(&self) -> &Schema {
57 &self.schema
58 }
59
60 fn identity(&self) -> &str {
61 &self.identity
62 }
63
64 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
65 self.do_execute()
66 }
67}
68
69impl ValuesExecutor {
70 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
71 async fn do_execute(mut self: Box<Self>) {
72 if !self.rows.is_empty() {
73 let cardinality = self.rows.len();
74 ensure!(cardinality > 0);
75
76 while !self.rows.is_empty() {
77 let one_row_chunk = DataChunk::new_dummy(1);
81
82 let chunk_size = self.chunk_size.min(self.rows.len());
83 let mut array_builders = self.schema.create_array_builders(chunk_size);
84 for row in self.rows.by_ref().take(chunk_size) {
85 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
86 let out = expr.eval(&one_row_chunk).await?;
87 builder.append_array(&out);
88 }
89 }
90
91 let columns: Vec<_> = array_builders
92 .into_iter()
93 .map(|b| b.finish().into())
94 .collect();
95
96 let chunk = DataChunk::new(columns, chunk_size);
97
98 yield chunk
99 }
100 }
101 }
102}
103
104impl BoxedExecutorBuilder for ValuesExecutor {
105 async fn new_boxed_executor(
106 source: &ExecutorBuilder<'_>,
107 inputs: Vec<BoxedExecutor>,
108 ) -> Result<BoxedExecutor> {
109 ensure!(inputs.is_empty(), "ValuesExecutor should have no child!");
110 let value_node = try_match_expand!(
111 source.plan_node().get_node_body().unwrap(),
112 NodeBody::Values
113 )?;
114
115 let mut rows: Vec<Vec<BoxedExpression>> = Vec::with_capacity(value_node.get_tuples().len());
116 for row in value_node.get_tuples() {
117 let expr_row: Vec<_> = row.get_cells().iter().map(build_from_prost).try_collect()?;
118 rows.push(expr_row);
119 }
120
121 let fields = value_node
122 .get_fields()
123 .iter()
124 .map(Field::from)
125 .collect::<Vec<Field>>();
126
127 Ok(Box::new(Self {
128 rows: rows.into_iter(),
129 schema: Schema { fields },
130 identity: source.plan_node().get_identity().clone(),
131 chunk_size: source.context().get_config().developer.chunk_size,
132 }))
133 }
134}
135
#[cfg(test)]
mod tests {

    use futures::stream::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression};

    use crate::executor::{Executor, ValuesExecutor};

    /// Chunk size large enough that small inputs fit into a single chunk.
    const CHUNK_SIZE: usize = 1024;

    /// A single row of literals of different types is emitted as one chunk
    /// whose columns hold exactly those literals.
    #[tokio::test]
    async fn test_values_executor() {
        let struct_type =
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]);
        let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
        let exprs = vec![
            Box::new(LiteralExpression::new(
                DataType::Int16,
                Some(ScalarImpl::Int16(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )),
            Box::new(LiteralExpression::new(
                DataType::Int64,
                Some(ScalarImpl::Int64(3)),
            )),
            Box::new(LiteralExpression::new(
                struct_type.clone().into(),
                Some(ScalarImpl::Struct(value)),
            )) as BoxedExpression,
        ];

        // One unnamed field per column, typed after the column's expression.
        let fields = exprs
            .iter()
            .map(|col| Field::unnamed(col.return_type()))
            .collect::<Vec<Field>>();

        let values_executor = Box::new(ValuesExecutor::new(
            vec![exprs],
            Schema { fields },
            "ValuesExecutor2".to_owned(),
            CHUNK_SIZE,
        ));

        let fields = &values_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int16);
        assert_eq!(fields[1].data_type, DataType::Int32);
        assert_eq!(fields[2].data_type, DataType::Int64);
        assert_eq!(fields[3].data_type, struct_type.clone().into());

        let mut stream = values_executor.execute();
        // Unwrap the chunk itself: an `Err` from the executor must fail the test
        // instead of silently skipping the assertions below.
        let result = stream.next().await.unwrap().unwrap();
        let array: ArrayImpl = StructArray::new(
            struct_type,
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
        assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
        assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
        assert_eq!(*result.column_at(3), array.into());
    }

    /// Four single-column rows with a chunk size of three are split into a
    /// chunk of three rows followed by a chunk of one row.
    #[tokio::test]
    async fn test_chunk_split_size() {
        let rows = (1..=4)
            .map(|v| {
                vec![Box::new(LiteralExpression::new(
                    DataType::Int32,
                    Some(ScalarImpl::Int32(v)),
                )) as BoxedExpression]
            })
            .collect::<Vec<_>>();

        let fields = vec![Field::unnamed(DataType::Int32)];

        let values_executor = Box::new(ValuesExecutor::new(
            rows,
            Schema { fields },
            "ValuesExecutor2".to_owned(),
            3,
        ));
        let mut stream = values_executor.execute();
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 3);
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 1);
        assert!(stream.next().await.is_none());
    }

    /// A single row with no columns still produces one chunk with one row and
    /// zero columns.
    #[tokio::test]
    async fn test_no_column_values_executor() {
        let values_executor = Box::new(ValuesExecutor::new(
            vec![vec![]],
            Schema::default(),
            "ValuesExecutor2".to_owned(),
            CHUNK_SIZE,
        ));
        let mut stream = values_executor.execute();

        let result = stream.next().await.unwrap().unwrap();
        assert_eq!(result.cardinality(), 1);
        assert_eq!(result.dimension(), 0);

        assert!(stream.next().await.is_none());
    }
}