risingwave_batch_executors/executor/
project_set.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use either::Either;
16use futures_async_stream::try_stream;
17use itertools::Itertools;
18use risingwave_common::array::{ArrayRef, DataChunk};
19use risingwave_common::catalog::{Field, PROJECTED_ROW_ID_COLUMN_NAME, Schema};
20use risingwave_common::types::{DataType, DatumRef};
21use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_expr::expr::{self, BoxedExpression};
24use risingwave_expr::table_function::{self, BoxedTableFunction, TableFunctionOutputIter};
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26use risingwave_pb::expr::PbProjectSetSelectItem;
27use risingwave_pb::expr::project_set_select_item::PbSelectItem;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
34pub struct ProjectSetExecutor {
35    select_list: Vec<ProjectSetSelectItem>,
36    child: BoxedExecutor,
37    schema: Schema,
38    identity: String,
39    chunk_size: usize,
40}
41
42impl Executor for ProjectSetExecutor {
43    fn schema(&self) -> &Schema {
44        &self.schema
45    }
46
47    fn identity(&self) -> &str {
48        &self.identity
49    }
50
51    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
52        self.do_execute()
53    }
54}
55
56impl ProjectSetExecutor {
57    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
58    async fn do_execute(self: Box<Self>) {
59        assert!(!self.select_list.is_empty());
60
61        // First column will be `projected_row_id`, which represents the index in the
62        // output table
63        let mut builder = DataChunkBuilder::new(
64            std::iter::once(DataType::Int64)
65                .chain(self.select_list.iter().map(|i| i.return_type()))
66                .collect(),
67            self.chunk_size,
68        );
69        // a temporary row buffer
70        let mut row = vec![None as DatumRef<'_>; builder.num_columns()];
71
72        #[for_await]
73        for input in self.child.execute() {
74            let input = input?;
75
76            let mut results = Vec::with_capacity(self.select_list.len());
77            for select_item in &self.select_list {
78                let result = select_item.eval(&input).await?;
79                results.push(result);
80            }
81
82            // for each input row
83            for row_idx in 0..input.capacity() {
84                // for each output row
85                for projected_row_id in 0i64.. {
86                    // SAFETY:
87                    // We use `row` as a buffer and don't read elements from the previous loop.
88                    // The `transmute` is used for bypassing the borrow checker.
89                    let row: &mut [DatumRef<'_>] =
90                        unsafe { std::mem::transmute(row.as_mut_slice()) };
91                    row[0] = Some(projected_row_id.into());
92                    // if any of the set columns has a value
93                    let mut valid = false;
94                    // for each column
95                    for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
96                        *value = match item {
97                            Either::Left(state) => {
98                                if let Some((i, value)) = state.peek()
99                                    && i == row_idx
100                                {
101                                    valid = true;
102                                    value?
103                                } else {
104                                    None
105                                }
106                            }
107                            Either::Right(array) => array.value_at(row_idx),
108                        };
109                    }
110                    if !valid {
111                        // no more output rows for the input row
112                        break;
113                    }
114                    if let Some(chunk) = builder.append_one_row(&*row) {
115                        yield chunk;
116                    }
117                    // move to the next row
118                    for item in &mut results {
119                        if let Either::Left(state) = item
120                            && matches!(state.peek(), Some((i, _)) if i == row_idx)
121                        {
122                            state.next().await?;
123                        }
124                    }
125                }
126            }
127            if let Some(chunk) = builder.consume_all() {
128                yield chunk;
129            }
130        }
131    }
132}
133
134impl BoxedExecutorBuilder for ProjectSetExecutor {
135    async fn new_boxed_executor(
136        source: &ExecutorBuilder<'_>,
137        inputs: Vec<BoxedExecutor>,
138    ) -> Result<BoxedExecutor> {
139        let [child]: [_; 1] = inputs.try_into().unwrap();
140
141        let project_set_node = try_match_expand!(
142            source.plan_node().get_node_body().unwrap(),
143            NodeBody::ProjectSet
144        )?;
145
146        let select_list: Vec<_> = project_set_node
147            .get_select_list()
148            .iter()
149            .map(|proto| {
150                ProjectSetSelectItem::from_prost(
151                    proto,
152                    source.context().get_config().developer.chunk_size,
153                )
154            })
155            .try_collect()?;
156
157        let mut fields = vec![Field::with_name(
158            DataType::Int64,
159            PROJECTED_ROW_ID_COLUMN_NAME,
160        )];
161        fields.extend(
162            select_list
163                .iter()
164                .map(|expr| Field::unnamed(expr.return_type())),
165        );
166
167        Ok(Box::new(Self {
168            select_list,
169            child,
170            schema: Schema { fields },
171            identity: source.plan_node().get_identity().clone(),
172            chunk_size: source.context().get_config().developer.chunk_size,
173        }))
174    }
175}
176
177/// Either a scalar expression or a set-returning function.
178///
179/// See also [`PbProjectSetSelectItem`]
180#[derive(Debug)]
181pub enum ProjectSetSelectItem {
182    Scalar(BoxedExpression),
183    Set(BoxedTableFunction),
184}
185
186impl From<BoxedTableFunction> for ProjectSetSelectItem {
187    fn from(table_function: BoxedTableFunction) -> Self {
188        ProjectSetSelectItem::Set(table_function)
189    }
190}
191
192impl From<BoxedExpression> for ProjectSetSelectItem {
193    fn from(expr: BoxedExpression) -> Self {
194        ProjectSetSelectItem::Scalar(expr)
195    }
196}
197
198impl ProjectSetSelectItem {
199    pub fn from_prost(prost: &PbProjectSetSelectItem, chunk_size: usize) -> Result<Self> {
200        Ok(match prost.select_item.as_ref().unwrap() {
201            PbSelectItem::Expr(expr) => Self::Scalar(expr::build_from_prost(expr)?),
202            PbSelectItem::TableFunction(tf) => {
203                Self::Set(table_function::build_from_prost(tf, chunk_size)?)
204            }
205        })
206    }
207
208    pub fn return_type(&self) -> DataType {
209        match self {
210            ProjectSetSelectItem::Scalar(expr) => expr.return_type(),
211            ProjectSetSelectItem::Set(tf) => tf.return_type(),
212        }
213    }
214
215    pub async fn eval<'a>(
216        &'a self,
217        input: &'a DataChunk,
218    ) -> Result<Either<TableFunctionOutputIter<'a>, ArrayRef>> {
219        match self {
220            Self::Set(tf) => Ok(Either::Left(
221                TableFunctionOutputIter::new(tf.eval(input).await).await?,
222            )),
223            Self::Scalar(expr) => Ok(Either::Right(expr.eval(input).await?)),
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use futures::stream::StreamExt;
    use futures_async_stream::for_await;
    use risingwave_common::test_prelude::*;
    use risingwave_expr::expr::{ExpressionBoxExt, InputRefExpression, LiteralExpression};
    use risingwave_expr::table_function::repeat;

    use super::*;
    use crate::executor::ValuesExecutor;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    const CHUNK_SIZE: usize = 1024;

    /// One scalar column plus two `repeat` table functions of different lengths: every input
    /// row expands to as many output rows as the longest function, and the shorter function is
    /// padded with NULL.
    #[tokio::test]
    async fn test_project_set_executor() -> Result<()> {
        let chunk = DataChunk::from_pretty(
            "i     i
             1     7
             2     8
             33333 66666
             4     4
             5     3",
        );

        let expr1 = InputRefExpression::new(DataType::Int32, 0);
        let expr2 = repeat(
            LiteralExpression::new(DataType::Int32, Some(1_i32.into())).boxed(),
            2,
        );
        let expr3 = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            3,
        );
        let select_list: Vec<ProjectSetSelectItem> =
            vec![expr1.boxed().into(), expr2.into(), expr3.into()];

        let schema = schema_unnamed! { DataType::Int32, DataType::Int32 };
        let mut mock_executor = MockExecutor::new(schema);
        mock_executor.add(chunk);

        // Mirror the schema built by `new_boxed_executor`: the `projected_row_id` column comes
        // first, followed by one field per select item.
        let fields = std::iter::once(Field::with_name(
            DataType::Int64,
            PROJECTED_ROW_ID_COLUMN_NAME,
        ))
        .chain(
            select_list
                .iter()
                .map(|expr| Field::unnamed(expr.return_type())),
        )
        .collect::<Vec<Field>>();

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list,
            child: Box::new(mock_executor),
            schema: Schema { fields },
            identity: "ProjectSetExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
        });

        let fields = &proj_executor.schema().fields;
        assert_eq!(fields.len(), 4);
        assert_eq!(fields[0].data_type, DataType::Int64);
        assert_eq!(fields[1].data_type, DataType::Int32);

        let expected = [DataChunk::from_pretty(
            "I i     i i
             0 1     1 2
             1 1     1 2
             2 1     . 2
             0 2     1 2
             1 2     1 2
             2 2     . 2
             0 33333 1 2
             1 33333 1 2
             2 33333 . 2
             0 4     1 2
             1 4     1 2
             2 4     . 2
             0 5     1 2
             1 5     1 2
             2 5     . 2",
        )];

        // Count the chunks so the test cannot pass vacuously on an empty stream.
        let mut chunk_count = 0;
        #[for_await]
        for (i, result_chunk) in proj_executor.execute().enumerate() {
            let result_chunk = result_chunk?;
            assert_eq!(result_chunk, expected[i]);
            chunk_count += 1;
        }
        assert_eq!(chunk_count, expected.len());
        Ok(())
    }

    /// A single input row without any column still drives the select list once.
    #[tokio::test]
    async fn test_project_set_dummy_chunk() {
        let literal = LiteralExpression::new(DataType::Int32, Some(1_i32.into()));
        let tf = repeat(
            LiteralExpression::new(DataType::Int32, Some(2_i32.into())).boxed(),
            2,
        );

        let values_executor2: Box<dyn Executor> = Box::new(ValuesExecutor::new(
            vec![vec![]], // One single row with no column.
            Schema::default(),
            "ValuesExecutor".to_owned(),
            CHUNK_SIZE,
        ));

        let proj_executor = Box::new(ProjectSetExecutor {
            select_list: vec![literal.boxed().into(), tf.into()],
            child: values_executor2,
            // `projected_row_id` followed by the two select items, matching the output below.
            schema: schema_unnamed!(DataType::Int64, DataType::Int32, DataType::Int32),
            identity: "ProjectSetExecutor2".to_owned(),
            chunk_size: CHUNK_SIZE,
        });
        let mut stream = proj_executor.execute();
        let chunk = stream.next().await.unwrap().unwrap();
        assert_eq!(
            chunk,
            DataChunk::from_pretty(
                "I i i
                 0 1 2
                 1 1 2",
            ),
        );
    }
}
344        );
345    }
346}