risingwave_batch_executors/executor/
values.rs1use std::vec;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_common::util::iter_util::ZipEqFast;
22use risingwave_expr::expr::{BoxedExpression, build_from_prost};
23use risingwave_pb::batch_plan::plan_node::NodeBody;
24
25use crate::error::{BatchError, Result};
26use crate::executor::{
27 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
28};
29
30pub struct ValuesExecutor {
32 rows: vec::IntoIter<Vec<BoxedExpression>>,
33 schema: Schema,
34 identity: String,
35 chunk_size: usize,
36}
37
38impl ValuesExecutor {
39 pub(crate) fn new(
40 rows: Vec<Vec<BoxedExpression>>,
41 schema: Schema,
42 identity: String,
43 chunk_size: usize,
44 ) -> Self {
45 Self {
46 rows: rows.into_iter(),
47 schema,
48 identity,
49 chunk_size,
50 }
51 }
52}
53
54impl Executor for ValuesExecutor {
55 fn schema(&self) -> &Schema {
56 &self.schema
57 }
58
59 fn identity(&self) -> &str {
60 &self.identity
61 }
62
63 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
64 self.do_execute()
65 }
66}
67
68impl ValuesExecutor {
69 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
70 async fn do_execute(mut self: Box<Self>) {
71 if !self.rows.is_empty() {
72 let cardinality = self.rows.len();
73 ensure!(cardinality > 0);
74
75 while !self.rows.is_empty() {
76 let one_row_chunk = DataChunk::new_dummy(1);
80
81 let chunk_size = self.chunk_size.min(self.rows.len());
82 let mut array_builders = self.schema.create_array_builders(chunk_size);
83 for row in self.rows.by_ref().take(chunk_size) {
84 for (expr, builder) in row.into_iter().zip_eq_fast(&mut array_builders) {
85 let out = expr.eval(&one_row_chunk).await?;
86 builder.append_array(&out);
87 }
88 }
89
90 let columns: Vec<_> = array_builders
91 .into_iter()
92 .map(|b| b.finish().into())
93 .collect();
94
95 let chunk = DataChunk::new(columns, chunk_size);
96
97 yield chunk
98 }
99 }
100 }
101}
102
103impl BoxedExecutorBuilder for ValuesExecutor {
104 async fn new_boxed_executor(
105 source: &ExecutorBuilder<'_>,
106 inputs: Vec<BoxedExecutor>,
107 ) -> Result<BoxedExecutor> {
108 ensure!(inputs.is_empty(), "ValuesExecutor should have no child!");
109 let value_node = try_match_expand!(
110 source.plan_node().get_node_body().unwrap(),
111 NodeBody::Values
112 )?;
113
114 let mut rows: Vec<Vec<BoxedExpression>> = Vec::with_capacity(value_node.get_tuples().len());
115 for row in value_node.get_tuples() {
116 let expr_row: Vec<_> = row.get_cells().iter().map(build_from_prost).try_collect()?;
117 rows.push(expr_row);
118 }
119
120 let fields = value_node
121 .get_fields()
122 .iter()
123 .map(Field::from)
124 .collect::<Vec<Field>>();
125
126 Ok(Box::new(Self {
127 rows: rows.into_iter(),
128 schema: Schema { fields },
129 identity: source.plan_node().get_identity().clone(),
130 chunk_size: source.context().get_config().developer.chunk_size,
131 }))
132 }
133}
134
#[cfg(test)]
mod tests {

    use futures::stream::StreamExt;
    use risingwave_common::array::{
        Array, ArrayImpl, I16Array, I32Array, I64Array, StructArray, StructValue,
    };
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, ScalarImpl, StructType};
    use risingwave_expr::expr::{BoxedExpression, LiteralExpression};

    use crate::executor::{Executor, ValuesExecutor};

    const CHUNK_SIZE: usize = 1024;

    /// A single row mixing integer widths and a struct literal yields exactly
    /// one chunk whose schema and column values match the literals.
    #[tokio::test]
    async fn test_values_executor() {
        let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
        let exprs = vec![
            Box::new(LiteralExpression::new(
                DataType::Int16,
                Some(ScalarImpl::Int16(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )),
            Box::new(LiteralExpression::new(
                DataType::Int64,
                Some(ScalarImpl::Int64(3)),
            )),
            Box::new(LiteralExpression::new(
                StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into(),
                Some(ScalarImpl::Struct(value)),
            )) as BoxedExpression,
        ];

        let fields = exprs
            .iter()
            .map(|col| Field::unnamed(col.return_type()))
            .collect::<Vec<Field>>();

        let values_executor = Box::new(ValuesExecutor {
            rows: vec![exprs].into_iter(),
            schema: Schema { fields },
            identity: "ValuesExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        let fields = &values_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int16);
        assert_eq!(fields[1].data_type, DataType::Int32);
        assert_eq!(fields[2].data_type, DataType::Int64);
        assert_eq!(
            fields[3].data_type,
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]).into()
        );

        let mut stream = values_executor.execute();
        // Unwrap rather than matching only `Ok`: an evaluation error must fail
        // the test instead of silently skipping every assertion below.
        let result = stream.next().await.unwrap().unwrap();
        let array: ArrayImpl = StructArray::new(
            StructType::unnamed(vec![DataType::Int32, DataType::Int32, DataType::Int32]),
            vec![
                I32Array::from_iter([1]).into_ref(),
                I32Array::from_iter([2]).into_ref(),
                I32Array::from_iter([3]).into_ref(),
            ],
            [true].into_iter().collect(),
        )
        .into();

        assert_eq!(result.cardinality(), 1);
        assert_eq!(*result.column_at(0), I16Array::from_iter([1]).into_ref());
        assert_eq!(*result.column_at(1), I32Array::from_iter([2]).into_ref());
        assert_eq!(*result.column_at(2), I64Array::from_iter([3]).into_ref());
        assert_eq!(*result.column_at(3), array.into());

        // The single input row fits in one chunk, so nothing else is emitted.
        assert!(stream.next().await.is_none());
    }

    /// Four rows with a chunk size of 3 are split into chunks of 3 and 1.
    #[tokio::test]
    async fn test_chunk_split_size() {
        let rows = [
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(1)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(2)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(3)),
            )) as BoxedExpression,
            Box::new(LiteralExpression::new(
                DataType::Int32,
                Some(ScalarImpl::Int32(4)),
            )) as BoxedExpression,
        ]
        .into_iter()
        .map(|expr| vec![expr])
        .collect::<Vec<_>>();

        let fields = vec![Field::unnamed(DataType::Int32)];

        let values_executor = Box::new(ValuesExecutor::new(
            rows,
            Schema { fields },
            "ValuesExecutor2".to_owned(),
            3,
        ));
        let mut stream = values_executor.execute();
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 3);
        assert_eq!(stream.next().await.unwrap().unwrap().cardinality(), 1);
        assert!(stream.next().await.is_none());
    }

    /// A row with zero columns still produces a chunk of cardinality 1.
    #[tokio::test]
    async fn test_no_column_values_executor() {
        let values_executor = Box::new(ValuesExecutor::new(
            vec![vec![]],
            Schema::default(),
            "ValuesExecutor2".to_owned(),
            CHUNK_SIZE,
        ));
        let mut stream = values_executor.execute();

        let result = stream.next().await.unwrap().unwrap();
        assert_eq!(result.cardinality(), 1);
        assert_eq!(result.dimension(), 0);

        assert!(stream.next().await.is_none());
    }
}