risingwave_batch_executors/executor/
project.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::{Stream, StreamExt};
18use itertools::Itertools;
19use risingwave_common::array::DataChunk;
20use risingwave_common::catalog::{Field, Schema};
21use risingwave_expr::expr::{BoxedExpression, Expression, build_batch_expr_from_prost};
22use risingwave_pb::batch_plan::plan_node::NodeBody;
23
24use crate::error::{BatchError, Result};
25use crate::executor::{
26    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
27};
28
29pub struct ProjectExecutor {
30    expr: Vec<BoxedExpression>,
31    child: BoxedExecutor,
32    schema: Schema,
33    identity: String,
34}
35
36impl Executor for ProjectExecutor {
37    fn schema(&self) -> &Schema {
38        &self.schema
39    }
40
41    fn identity(&self) -> &str {
42        &self.identity
43    }
44
45    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
46        (*self).do_execute().boxed()
47    }
48}
49
50impl ProjectExecutor {
51    fn do_execute(self) -> impl Stream<Item = Result<DataChunk>> + 'static {
52        let Self { expr, child, .. } = self;
53        let expr: Arc<[Box<dyn Expression>]> = expr.into();
54        child
55            .execute()
56            .map(move |data_chunk| {
57                let expr = expr.clone();
58                async move {
59                    let data_chunk = data_chunk?;
60                    let arrays = {
61                        let expr_futs = expr.iter().map(|expr| expr.eval(&data_chunk));
62                        futures::future::join_all(expr_futs)
63                            .await
64                            .into_iter()
65                            .try_collect()?
66                    };
67                    let (_, vis) = data_chunk.into_parts();
68                    Ok::<_, BatchError>(DataChunk::new(arrays, vis))
69                }
70            })
71            .buffered(16)
72    }
73}
74
75impl BoxedExecutorBuilder for ProjectExecutor {
76    async fn new_boxed_executor(
77        source: &ExecutorBuilder<'_>,
78        inputs: Vec<BoxedExecutor>,
79    ) -> Result<BoxedExecutor> {
80        let [child]: [_; 1] = inputs.try_into().unwrap();
81
82        let project_node = try_match_expand!(
83            source.plan_node().get_node_body().unwrap(),
84            NodeBody::Project
85        )?;
86
87        let project_exprs: Vec<_> = project_node
88            .get_select_list()
89            .iter()
90            .map(build_batch_expr_from_prost)
91            .try_collect()?;
92
93        let fields = project_exprs
94            .iter()
95            .map(|expr| Field::unnamed(expr.return_type()))
96            .collect::<Vec<Field>>();
97
98        Ok(Box::new(Self {
99            expr: project_exprs,
100            child,
101            schema: Schema { fields },
102            identity: source.plan_node().get_identity().clone(),
103        }))
104    }
105}
106
#[cfg(test)]
mod tests {
    use risingwave_common::array::{Array, I32Array};
    use risingwave_common::test_prelude::*;
    use risingwave_common::types::DataType;
    use risingwave_expr::expr::{InputRefExpression, LiteralExpression};

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    /// Projects column 0 of a two-column `Int32` chunk and checks both the
    /// reported schema and the values of the single output column.
    #[tokio::test]
    async fn test_project_executor() -> Result<()> {
        let input = DataChunk::from_pretty(
            "
            i     i
            1     7
            2     8
            33333 66666
            4     4
            5     3
        ",
        );

        let input_schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut child = MockExecutor::new(input_schema);
        child.add(input);

        // A single projection: a reference to input column 0.
        let first_column = InputRefExpression::new(DataType::Int32, 0);
        let projections = vec![Box::new(first_column) as BoxedExpression];

        let output_fields: Vec<Field> = projections
            .iter()
            .map(|projection| Field::unnamed(projection.return_type()))
            .collect();

        let executor = Box::new(ProjectExecutor {
            expr: projections,
            child: Box::new(child),
            schema: Schema {
                fields: output_fields,
            },
            identity: "ProjectExecutor".to_owned(),
        });

        assert_eq!(executor.schema().fields[0].data_type, DataType::Int32);

        let mut output = executor.execute();
        let projected = output.next().await.unwrap().unwrap();
        assert_eq!(projected.dimension(), 1);

        let values: Vec<_> = projected.column_at(0).as_int32().iter().collect();
        assert_eq!(
            values,
            vec![Some(1), Some(2), Some(33333), Some(4), Some(5)]
        );
        Ok(())
    }

    /// Projects a literal over a child that emits one row with no columns and
    /// checks that the output column holds the literal once.
    #[tokio::test]
    async fn test_project_dummy_chunk() {
        let one = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));

        let child: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]], // One single row with no column.
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let executor = Box::new(ProjectExecutor {
            expr: vec![Box::new(one)],
            child,
            schema: schema_unnamed!(DataType::Int32),
            identity: "ProjectExecutor2".to_owned(),
        });

        let mut output = executor.execute();
        let projected = output.next().await.unwrap().unwrap();
        assert_eq!(
            *projected.column_at(0),
            I32Array::from_iter([1]).into_ref()
        );
    }
}