risingwave_batch_executors/executor/
project_set.rs1use either::Either;
16use futures_async_stream::try_stream;
17use itertools::Itertools;
18use risingwave_common::array::{ArrayRef, DataChunk};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, DatumRef};
21use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_expr::expr::{self, BoxedExpression};
24use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::expr::PbProjectSetSelectItem;
27use risingwave_pb::expr::project_set_select_item::PbSelectItem;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
/// Batch executor for a `ProjectSet` plan node.
///
/// Evaluates a select list that may mix scalar expressions and table
/// (set-returning) functions, expanding each input row into zero or more
/// output rows.
pub struct ProjectSetExecutor {
    /// Items evaluated against every input chunk, in output-column order
    /// (the executor prepends its own `projected_row_id` column when
    /// assembling rows).
    select_list: Vec<ProjectSetSelectItem>,
    /// The single upstream executor whose chunks are expanded.
    child: BoxedExecutor,
    /// Schema reported through `Executor::schema`.
    schema: Schema,
    /// Identity string reported through `Executor::identity`.
    identity: String,
    /// Capacity handed to the `DataChunkBuilder` that assembles output chunks.
    chunk_size: usize,
}
41
42impl Executor for ProjectSetExecutor {
43 fn schema(&self) -> &Schema {
44 &self.schema
45 }
46
47 fn identity(&self) -> &str {
48 &self.identity
49 }
50
51 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
52 self.do_execute()
53 }
54}
55
impl ProjectSetExecutor {
    /// Pulls chunks from the child executor and expands every input row.
    ///
    /// Output rows have `projected_row_id` (an `Int64` counter that restarts
    /// at 0 for each input row) in column 0, followed by one column per select
    /// item. For each input row, output rows are produced for as long as at
    /// least one table function still has output for that row:
    /// - scalar items repeat the input row's value on every such output row;
    /// - table functions that have no more output for the row contribute NULL.
    ///
    /// The builder is flushed at the end of every input chunk, so an output
    /// chunk never mixes rows from two input chunks.
    ///
    /// # Panics
    /// Panics if the select list is empty.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert!(!self.select_list.is_empty());

        // The first column is `projected_row_id`: the position of an output row
        // within the expansion of its input row.
        let mut builder = DataChunkBuilder::new(
            std::iter::once(DataType::Int64)
                .chain(self.select_list.iter().map(|i| i.return_type()))
                .collect(),
            self.chunk_size,
        );
        // A row buffer allocated once and reused for every output row.
        let mut row = vec![None as DatumRef<'_>; builder.num_columns()];

        #[for_await]
        for input in self.child.execute() {
            let input = input?;

            // Evaluate every select item against the whole input chunk.
            // Table functions give a peekable output iterator (`Left`);
            // scalar expressions give a full column (`Right`).
            let mut results = Vec::with_capacity(self.select_list.len());
            for select_item in &self.select_list {
                let result = select_item.eval(&input).await?;
                results.push(result);
            }

            // for each input row
            for row_idx in 0..input.capacity() {
                // for each output row generated from `row_idx`
                for projected_row_id in 0i64.. {
                    // SAFETY: `row` is only used as a scratch buffer. The transmute
                    // re-types it so it can hold datums borrowed from the current
                    // `input` / `results`, bypassing the borrow checker. Every slot is
                    // overwritten before the buffer is read: `row[0]` just below and
                    // `row[1..]` by the per-column loop. On `break` nothing is read. So
                    // datums left over from a previous input chunk are never
                    // dereferenced. `append_one_row` hands back an owned `DataChunk`,
                    // and `builder` has no lifetime parameter, so it keeps no borrow
                    // of `row`.
                    let row: &mut [DatumRef<'_>] =
                        unsafe { std::mem::transmute(row.as_mut_slice()) };
                    row[0] = Some(projected_row_id.into());
                    // Becomes true if any table function still has output for
                    // `row_idx`. Otherwise this input row is fully expanded.
                    let mut valid = false;
                    // for each column
                    for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
                        *value = match item {
                            Either::Left(state) => {
                                // Use the table function's pending output only when it
                                // belongs to the current input row; otherwise pad with NULL.
                                if let Some((i, value)) = state.peek()
                                    && i == row_idx
                                {
                                    valid = true;
                                    value?
                                } else {
                                    None
                                }
                            }
                            Either::Right(array) => array.value_at(row_idx),
                        };
                    }
                    if !valid {
                        // No table function has more output for this input row.
                        break;
                    }
                    if let Some(chunk) = builder.append_one_row(&*row) {
                        yield chunk;
                    }
                    // Advance every table function whose pending output was
                    // consumed for this input row.
                    for item in &mut results {
                        if let Either::Left(state) = item
                            && matches!(state.peek(), Some((i, _)) if i == row_idx)
                        {
                            state.next().await?;
                        }
                    }
                }
            }
            // Flush the remaining rows at the end of each input chunk.
            if let Some(chunk) = builder.consume_all() {
                yield chunk;
            }
        }
    }
}
133
134impl BoxedExecutorBuilder for ProjectSetExecutor {
135 async fn new_boxed_executor(
136 source: &ExecutorBuilder<'_>,
137 inputs: Vec<BoxedExecutor>,
138 ) -> Result<BoxedExecutor> {
139 let [child]: [_; 1] = inputs.try_into().unwrap();
140
141 let project_set_node = try_match_expand!(
142 source.plan_node().get_node_body().unwrap(),
143 NodeBody::ProjectSet
144 )?;
145
146 let select_list: Vec<_> = project_set_node
147 .get_select_list()
148 .iter()
149 .map(|proto| {
150 ProjectSetSelectItem::from_prost(
151 proto,
152 source.context().get_config().developer.chunk_size,
153 )
154 })
155 .try_collect()?;
156
157 let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
158 fields.extend(
159 select_list
160 .iter()
161 .map(|expr| Field::unnamed(expr.return_type())),
162 );
163
164 Ok(Box::new(Self {
165 select_list,
166 child,
167 schema: Schema { fields },
168 identity: source.plan_node().get_identity().clone(),
169 chunk_size: source.context().get_config().developer.chunk_size,
170 }))
171 }
172}
173
/// One entry of a `ProjectSet` select list.
///
/// An entry is either a scalar expression, which yields one value per input
/// row, or a table (set-returning) function, which may yield several values
/// per input row.
#[derive(Debug)]
pub enum ProjectSetSelectItem {
    /// A scalar expression. Its value for an input row is repeated on every
    /// output row expanded from that input row.
    Scalar(BoxedExpression),
    /// A table function. Each of its outputs for an input row lands on a
    /// separate output row.
    Set(BoxedTableFunction),
}
182
183impl From<BoxedTableFunction> for ProjectSetSelectItem {
184 fn from(table_function: BoxedTableFunction) -> Self {
185 ProjectSetSelectItem::Set(table_function)
186 }
187}
188
189impl From<BoxedExpression> for ProjectSetSelectItem {
190 fn from(expr: BoxedExpression) -> Self {
191 ProjectSetSelectItem::Scalar(expr)
192 }
193}
194
195impl ProjectSetSelectItem {
196 pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
197 Ok(match prost.select_item.as_ref().unwrap() {
198 PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?),
199 PbSelectItem::TableFunction(tf) => {
200 Self::Set(table_function::build_from_prost(tf, chunk_size)?)
201 }
202 })
203 }
204
205 pub fn return_type(&self) -> DataType {
206 match self {
207 ProjectSetSelectItem::Scalar(expr) => expr.return_type(),
208 ProjectSetSelectItem::Set(tf) => tf.return_type(),
209 }
210 }
211
212 pub async fn eval<'a>(
213 &'a self,
214 input: &'a DataChunk,
215 ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
216 match self {
217 Self::Set(tf) => Ok(Either::Left(
218 TableFunctionOutputIter::new(tf.eval(input).await).await?,
219 )),
220 Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)),
221 }
222 }
223}
224
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use futures_async_stream::for_await;
    use risingwave_common::test_prelude::*;
    use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
    use risingwave_expr::table_function::repeat;

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    /// Expands 5 input rows through one scalar item and two `repeat` table
    /// functions of lengths 2 and 3. The shorter function must be padded with
    /// NULL, and `projected_row_id` must restart at 0 for every input row.
    #[tokio::test]
    async fn test_project_set_executor() -> Result<()> {
        let chunk = DataChunk::from_pretty(
            "i     i
             1     7
             2     8
             33333 66666
             4     4
             5     3",
        );

        let expr1 = InputRefExpression::new(DataType::Int32, 0);
        let expr2 = repeat(
            LiteralExpression::new(DataType::Int32, Some(1_i32.into())).boxed(),
            2,
        );
        let expr3 = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            3,
        );
        let select_list: Vec<ProjectSetSelectItem> =
            vec![expr1.boxed().into(), expr2.into(), expr3.into()];

        let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(chunk);

        // Mirror the schema that `new_boxed_executor` builds: the leading
        // `projected_row_id` column, then one column per select item.
        let fields = std::iter::once(Field::with_name(DataType::Int64, "projected_row_id"))
            .chain(
                select_list
                    .iter()
                    .map(|expr| Field::unnamed(expr.return_type())),
            )
            .collect::<Vec<Field>>();

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list,
            child: Box::new(mock_executor),
            schema: Schema { fields },
            identity: "ProjectSetExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        let fields = &proj_executor.schema().fields;
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0].data_type, DataType::Int64);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let expected = [DataChunk::from_pretty(
            "I i     i i
             0 1     1 2
             1 1     1 2
             2 1     . 2
             0 2     1 2
             1 2     1 2
             2 2     . 2
             0 33333 1 2
             1 33333 1 2
             2 33333 . 2
             0 4     1 2
             1 4     1 2
             2 4     . 2
             0 5     1 2
             1 5     1 2
             2 5     . 2",
        )];

        // Count the produced chunks. Without this, an executor that yields
        // nothing would make the loop below pass vacuously.
        let mut num_chunks = 0;
        #[for_await]
        for (i, result_chunk) in proj_executor.execute().enumerate() {
            let result_chunk = result_chunk?;
            assert_eq!(result_chunk, expected[i]);
            num_chunks += 1;
        }
        assert_eq!(num_chunks, expected.len());
        Ok(())
    }

    /// A child that yields a single row with no columns must still drive the
    /// expansion. The scalar literal appears next to each table-function output.
    #[tokio::test]
    async fn test_project_set_dummy_chunk() {
        let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));
        let tf = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            2,
        );

        let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]], // One single row with no column.
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list: vec![literal.boxed().into(), tf.into()],
            child: values_executor2,
            // `projected_row_id` followed by the two select items.
            schema: schema_unnamed!(DataType::Int64, DataType::Int32, DataType::Int32),
            identity: "ProjectSetExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });
        let mut stream = proj_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                "I i i
                 0 1 2
                 1 1 2",
            ),
        );
        // The single input row is fully expanded into that one chunk.
        assert!(stream.next().await.is_none());
    }
}