risingwave_batch_executors/executor/
project_set.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use futures_async_stream::try_stream;
17use itertools::Itertools;
18use risingwave_common::array::{ArrayRef, DataChunk};
19use risingwave_common::catalog::{Field, Schema};
20use risingwave_common::types::{DataType, DatumRef};
21use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_expr::expr::{self, BoxedExpression};
24use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::expr::PbProjectSetSelectItem;
27use risingwave_pb::expr::project_set_select_item::PbSelectItem;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
/// Batch executor that evaluates a list of scalar and set-returning select items over the
/// chunks produced by `child`.
pub struct ProjectSetExecutor {
    /// Items evaluated against every input chunk; execution asserts that this is non-empty.
    select_list: Vec<ProjectSetSelectItem>,
    /// Upstream executor supplying the input chunks.
    child: BoxedExecutor,
    /// Schema returned by `Executor::schema`.
    schema: Schema,
    /// Name returned by `Executor::identity`.
    identity: String,
    /// Passed to the `DataChunkBuilder` that assembles the output chunks.
    chunk_size: usize,
}
41
42impl Executor for ProjectSetExecutor {
43    fn schema(&self) -> &Schema {
44        &self.schema
45    }
46
47    fn identity(&self) -> &str {
48        &self.identity
49    }
50
51    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
52        self.do_execute()
53    }
54}
55
impl ProjectSetExecutor {
    /// Drives the child executor and expands each input row into zero or more output rows.
    ///
    /// For every input chunk, each item in `select_list` is evaluated once against the whole
    /// chunk. Then, for every input row index, output rows are emitted with an increasing
    /// `projected_row_id` (starting from 0) for as long as at least one set-returning item
    /// still has a pending value for that row. Within an output row, a set-returning item
    /// without a pending value for the row contributes `None`, while a scalar item contributes
    /// its value at the input row index.
    ///
    /// Output rows are appended to a [`DataChunkBuilder`]; a chunk is yielded whenever
    /// `append_one_row` hands one back, and whatever `consume_all` returns is yielded at the end
    /// of each input chunk.
    ///
    /// # Errors
    ///
    /// Any error from the child stream, from evaluating a select item, from a peeked value, or
    /// from advancing a set-returning iterator is propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics if `select_list` is empty.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        assert!(!self.select_list.is_empty());

        // First column will be `projected_row_id`, which represents the index in the
        // output table
        let mut builder = DataChunkBuilder::new(
            std::iter::once(DataType::Int64)
                .chain(self.select_list.iter().map(|i| i.return_type()))
                .collect(),
            self.chunk_size,
        );
        // a temporary row buffer
        let mut row = vec![None as DatumRef<'_>; builder.num_columns()];

        #[for_await]
        for input in self.child.execute() {
            let input = input?;

            // Evaluate every select item once for the whole chunk: set-returning items become
            // `Either::Left` iterators, scalar items become `Either::Right` arrays.
            let mut results = Vec::with_capacity(self.select_list.len());
            for select_item in &self.select_list {
                let result = select_item.eval(&input).await?;
                results.push(result);
            }

            // for each input row
            for row_idx in 0..input.capacity() {
                // for each output row
                // (unbounded counter; the loop is left through the `break` below)
                for projected_row_id in 0i64.. {
                    // SAFETY:
                    // We use `row` as a buffer and don't read elements from the previous loop.
                    // The `transmute` is used for bypassing the borrow checker.
                    let row: &mut [DatumRef<'_>] =
                        unsafe { std::mem::transmute(row.as_mut_slice()) };
                    row[0] = Some(projected_row_id.into());
                    // if any of the set columns has a value
                    let mut valid = false;
                    // for each column
                    for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
                        *value = match item {
                            Either::Left(state) => {
                                // Only take the head of the iterator when it belongs to the
                                // current input row; otherwise this column is `None`.
                                if let Some((i, value)) = state.peek()
                                    && i == row_idx
                                {
                                    valid = true;
                                    // The peeked value is fallible; `?` propagates its error.
                                    value?
                                } else {
                                    None
                                }
                            }
                            Either::Right(array) => array.value_at(row_idx),
                        };
                    }
                    if !valid {
                        // no more output rows for the input row
                        break;
                    }
                    if let Some(chunk) = builder.append_one_row(&*row) {
                        yield chunk;
                    }
                    // move to the next row
                    // (advance only the iterators whose head belongs to the current input row)
                    for item in &mut results {
                        if let Either::Left(state) = item
                            && matches!(state.peek(), Some((i, _)) if i == row_idx)
                        {
                            state.next().await?;
                        }
                    }
                }
            }
            // Yield whatever is still buffered before moving on to the next input chunk.
            if let Some(chunk) = builder.consume_all() {
                yield chunk;
            }
        }
    }
}
133
134impl BoxedExecutorBuilder for ProjectSetExecutor {
135    async fn new_boxed_executor(
136        source: &ExecutorBuilder<'_>,
137        inputs: Vec<BoxedExecutor>,
138    ) -> Result<BoxedExecutor> {
139        let [child]: [_; 1] = inputs.try_into().unwrap();
140
141        let project_set_node = try_match_expand!(
142            source.plan_node().get_node_body().unwrap(),
143            NodeBody::ProjectSet
144        )?;
145
146        let select_list: Vec<_> = project_set_node
147            .get_select_list()
148            .iter()
149            .map(|proto| {
150                ProjectSetSelectItem::from_prost(
151                    proto,
152                    source.context().get_config().developer.chunk_size,
153                )
154            })
155            .try_collect()?;
156
157        let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
158        fields.extend(
159            select_list
160                .iter()
161                .map(|expr| Field::unnamed(expr.return_type())),
162        );
163
164        Ok(Box::new(Self {
165            select_list,
166            child,
167            schema: Schema { fields },
168            identity: source.plan_node().get_identity().clone(),
169            chunk_size: source.context().get_config().developer.chunk_size,
170        }))
171    }
172}
173
/// Either a scalar expression or a set-returning function.
///
/// See also [`PbProjectSetSelectItem`]
#[derive(Debug)]
pub enum ProjectSetSelectItem {
    /// A scalar expression; evaluating it against a chunk yields a single array.
    Scalar(BoxedExpression),
    /// A set-returning (table) function; evaluating it against a chunk yields an output
    /// iterator.
    Set(BoxedTableFunction),
}
182
183impl From<BoxedTableFunction> for ProjectSetSelectItem {
184    fn from(table_function: BoxedTableFunction) -> Self {
185        ProjectSetSelectItem::Set(table_function)
186    }
187}
188
189impl From<BoxedExpression> for ProjectSetSelectItem {
190    fn from(expr: BoxedExpression) -> Self {
191        ProjectSetSelectItem::Scalar(expr)
192    }
193}
194
195impl ProjectSetSelectItem {
196    pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
197        Ok(match prost.select_item.as_ref().unwrap() {
198            PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?),
199            PbSelectItem::TableFunction(tf) => {
200                Self::Set(table_function::build_from_prost(tf, chunk_size)?)
201            }
202        })
203    }
204
205    pub fn return_type(&self) -> DataType {
206        match self {
207            ProjectSetSelectItem::Scalar(expr) => expr.return_type(),
208            ProjectSetSelectItem::Set(tf) => tf.return_type(),
209        }
210    }
211
212    pub async fn eval<'a>(
213        &'a self,
214        input: &'a DataChunk,
215    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
216        match self {
217            Self::Set(tf) => Ok(Either::Left(
218                TableFunctionOutputIter::new(tf.eval(input).await).await?,
219            )),
220            Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)),
221        }
222    }
223}
224
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use futures_async_stream::for_await;
    use risingwave_common::test_prelude::*;
    use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
    use risingwave_expr::table_function::repeat;

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    /// One scalar column plus two `repeat` table functions of different lengths: every input
    /// row expands to as many output rows as the longest table function produces, and the
    /// shorter one is padded with NULLs.
    #[tokio::test]
    async fn test_project_set_executor() -> Result<()> {
        let chunk = DataChunk::from_pretty(
            "i     i
             1     7
             2     8
             33333 66666
             4     4
             5     3",
        );

        let expr1 = InputRefExpression::new(DataType::Int32, 0);
        let expr2 = repeat(
            LiteralExpression::new(DataType::Int32, Some(1_i32.into())).boxed(),
            2,
        );
        let expr3 = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            3,
        );
        let select_list: Vec<ProjectSetSelectItem> =
            vec![expr1.boxed().into(), expr2.into(), expr3.into()];

        let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(chunk);

        // Mirror the schema layout of the emitted chunks: the leading `projected_row_id`
        // column followed by one column per select item.
        let mut fields = vec![Field::with_name(DataType::Int64, "projected_row_id")];
        fields.extend(
            select_list
                .iter()
                .map(|expr| Field::unnamed(expr.return_type())),
        );

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list,
            child: Box::new(mock_executor),
            schema: Schema { fields },
            identity: "ProjectSetExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        let fields = &proj_executor.schema().fields;
        assert_eq!(fields[0].data_type, DataType::Int64);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let expected = [DataChunk::from_pretty(
            "I i     i i
             0 1     1 2
             1 1     1 2
             2 1     . 2
             0 2     1 2
             1 2     1 2
             2 2     . 2
             0 33333 1 2
             1 33333 1 2
             2 33333 . 2
             0 4     1 2
             1 4     1 2
             2 4     . 2
             0 5     1 2
             1 5     1 2
             2 5     . 2",
        )];

        // Count the chunks so that an empty output stream fails the test instead of passing
        // vacuously.
        let mut num_chunks = 0;
        #[for_await]
        for (i, result_chunk) in proj_executor.execute().enumerate() {
            let result_chunk = result_chunk?;
            assert_eq!(result_chunk, expected[i]);
            num_chunks += 1;
        }
        assert_eq!(num_chunks, expected.len());
        Ok(())
    }

    /// A single input row without any column must still drive the select list once.
    #[tokio::test]
    async fn test_project_set_dummy_chunk() {
        let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));
        let tf = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            2,
        );

        let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]], // One single row with no column.
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list: vec![literal.boxed().into(), tf.into()],
            child: values_executor2,
            // `projected_row_id` followed by the two select items.
            schema: schema_unnamed!(DataType::Int64, DataType::Int32, DataType::Int32),
            identity: "ProjectSetExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });
        let mut stream = proj_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                "I i i
                 0 1 2
                 1 1 2",
            ),
        );
        // The single input row must not produce any further chunk.
        assert!(stream.next().await.is_none());
    }
}