risingwave_batch_executors/executor/
project.rs1use std::sync::Arc;
16
17use futures::{Stream, StreamExt};
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_expr::expr::{BoxedExpression, Expression, build_batch_expr_from_prost};
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{
26 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
27};
28
29pub struct ProjectExecutor {
30 expr: Vec<BoxedExpression>,
31 child: BoxedExecutor,
32 schema: Schema,
33 identity: String,
34}
35
36impl Executor for ProjectExecutor {
37 fn schema(&self) -> &Schema {
38 &self.schema
39 }
40
41 fn identity(&self) -> &str {
42 &self.identity
43 }
44
45 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
46 (*self).do_execute().boxed()
47 }
48}
49
50impl ProjectExecutor {
51 fn do_execute(self) -> impl Stream<Item = Result<DataChunk>> + 'static {
52 let Self { expr, child, .. } = self;
53 let expr: Arc<[Box<dyn Expression>]> = expr.into();
54 child
55 .execute()
56 .map(move |data_chunk| {
57 let expr = expr.clone();
58 async move {
59 let data_chunk = data_chunk?;
60 let arrays = {
61 let expr_futs = expr.iter().map(|expr| expr.eval(&data_chunk));
62 futures::future::join_all(expr_futs)
63 .await
64 .into_iter()
65 .try_collect()?
66 };
67 let (_, vis) = data_chunk.into_parts();
68 Ok::<_, BatchError>(DataChunk::new(arrays, vis))
69 }
70 })
71 .buffered(16)
72 }
73}
74
75impl BoxedExecutorBuilder for ProjectExecutor {
76 async fn new_boxed_executor(
77 source: &ExecutorBuilder<'_>,
78 inputs: Vec<BoxedExecutor>,
79 ) -> Result<BoxedExecutor> {
80 let [child]: [_; 1] = inputs.try_into().unwrap();
81
82 let project_node = try_match_expand!(
83 source.plan_node().get_node_body().unwrap(),
84 NodeBody::Project
85 )?;
86
87 let project_exprs: Vec<_> = project_node
88 .get_select_list()
89 .iter()
90 .map(build_batch_expr_from_prost)
91 .try_collect()?;
92
93 let fields = project_exprs
94 .iter()
95 .map(|expr| Field::unnamed(expr.return_type()))
96 .collect::<Vec<Field>>();
97
98 Ok(Box::new(Self {
99 expr: project_exprs,
100 child,
101 schema: Schema { fields },
102 identity: source.plan_node().get_identity().clone(),
103 }))
104 }
105}
106
#[cfg(test)]
mod tests {
    use risingwave_common::array::{Array, I32Array};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::DataType;
    use risingwave_expr::expr::{InputRefExpression, LiteralExpression};

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    #[tokio::test]
    async fn test_project_executor() -> Result<()> {
        let chunk = DataChunk::from_pretty(
            "
            i     i
            1     7
            2     8
            33333 66666
            4     4
            5     3
            ",
        );

        let expr1 = InputRefExpression::new(DataType::Int32, 0);
        let expr_vec = vec![Box::new(expr1) as BoxedExpression];

        let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(chunk);

        let fields = expr_vec
            .iter()
            .map(|expr| Field::unnamed(expr.return_type()))
            .collect::<Vec<Field>>();

        let proj_executor = Box::new(ProjectExecutor {
            expr: expr_vec,
            child: Box::new(mock_executor),
            schema: Schema { fields },
            identity: "ProjectExecutor".to_owned(),
        });

        let fields = &proj_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int32);

        let mut stream = proj_executor.execute();
        let result_chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(result_chunk.dimension(), 1);
        assert_eq!(
            result_chunk
                .column_at(0)
                .as_int32()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(33333), Some(4), Some(5)]
        );
        // A single input chunk must yield exactly one output chunk.
        assert!(stream.next().await.is_none());
        Ok(())
    }

    /// The executor evaluates several chunks concurrently, so make sure the
    /// output chunks still come back in input order.
    #[tokio::test]
    async fn test_project_preserves_chunk_order() -> Result<()> {
        let mut mock_executor = MockExecutor::new(schema_unnamed! { DataType::Int32 });
        mock_executor.add(DataChunk::from_pretty(
            "
            i
            1
            2
            ",
        ));
        mock_executor.add(DataChunk::from_pretty(
            "
            i
            3
            ",
        ));
        mock_executor.add(DataChunk::from_pretty(
            "
            i
            4
            5
            6
            ",
        ));

        let proj_executor = Box::new(ProjectExecutor {
            expr: vec![Box::new(InputRefExpression::new(DataType::Int32, 0))],
            child: Box::new(mock_executor),
            schema: schema_unnamed!(DataType::Int32),
            identity: "ProjectExecutorOrder".to_owned(),
        });

        let mut stream = proj_executor.execute();
        let mut values = vec![];
        while let Some(chunk) = stream.next().await {
            let chunk = chunk?;
            values.extend(chunk.column_at(0).as_int32().iter());
        }
        assert_eq!(
            values,
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_project_dummy_chunk() {
        let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));

        let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]],
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let proj_executor = Box::new(ProjectExecutor {
            expr: vec![Box::new(literal)],
            child: values_executor2,
            schema: schema_unnamed!(DataType::Int32),
            identity: "ProjectExecutor2".to_owned(),
        });
        let mut stream = proj_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(*chunk.column_at(0), I32Array::from_iter([1]).into_ref());
        // The one-row values input must yield exactly one output chunk.
        assert!(stream.next().await.is_none());
    }
}