risingwave_batch_executors/executor/
expand.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::{Array, DataChunk, I64Array};
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
24use crate::error::{BatchError, Result};
25
/// Batch executor for the `Expand` operator: every input row is emitted once
/// per column subset, with columns outside the subset blanked out (NULL, per
/// `keep_columns`) and an extra `flag` column recording the subset index.
pub struct ExpandExecutor {
    /// For each duplicate of the input, the indices of the input columns to
    /// keep; the remaining columns are rendered as NULL.
    column_subsets: Vec<Vec<usize>>,
    /// Upstream executor supplying input chunks.
    child: BoxedExecutor,
    /// Output schema: the input fields twice (subset-projected part followed
    /// by the untouched originals) plus a trailing `Int64` field named "flag"
    /// (see `ExpandExecutor::new`).
    schema: Schema,
    /// Identity string reported via `Executor::identity`.
    identity: String,
    /// Target row count per output chunk, enforced by `DataChunkBuilder`.
    chunk_size: usize,
}
33
34impl Executor for ExpandExecutor {
35    fn schema(&self) -> &Schema {
36        &self.schema
37    }
38
39    fn identity(&self) -> &str {
40        &self.identity
41    }
42
43    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
44        self.do_execute()
45    }
46}
47
48impl ExpandExecutor {
49    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
50    async fn do_execute(self: Box<Self>) {
51        let mut data_chunk_builder =
52            DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
53
54        #[for_await]
55        for input in self.child.execute() {
56            let input = input?;
57            for (i, subsets) in self.column_subsets.iter().enumerate() {
58                let flags =
59                    I64Array::from_iter(std::iter::repeat_n(i as i64, input.capacity())).into_ref();
60                let (mut columns, vis) = input.keep_columns(subsets).into_parts();
61                columns.extend(input.columns().iter().cloned());
62                columns.push(flags);
63                let chunk = DataChunk::new(columns, vis);
64
65                for data_chunk in data_chunk_builder.append_chunk(chunk) {
66                    yield data_chunk;
67                }
68            }
69        }
70        if let Some(chunk) = data_chunk_builder.consume_all() {
71            yield chunk;
72        }
73    }
74
75    pub fn new(input: BoxedExecutor, column_subsets: Vec<Vec<usize>>, chunk_size: usize) -> Self {
76        let schema = {
77            let mut fields = input.schema().clone().into_fields();
78            fields.extend(fields.clone());
79            fields.push(Field::with_name(DataType::Int64, "flag"));
80            Schema::new(fields)
81        };
82        Self {
83            column_subsets,
84            child: input,
85            schema,
86            identity: "ExpandExecutor".into(),
87            chunk_size,
88        }
89    }
90}
91
92impl BoxedExecutorBuilder for ExpandExecutor {
93    async fn new_boxed_executor(
94        source: &ExecutorBuilder<'_>,
95        inputs: Vec<BoxedExecutor>,
96    ) -> Result<BoxedExecutor> {
97        let expand_node = try_match_expand!(
98            source.plan_node().get_node_body().unwrap(),
99            NodeBody::Expand
100        )?;
101
102        let column_subsets = expand_node
103            .column_subsets
104            .iter()
105            .map(|subset| {
106                subset
107                    .column_indices
108                    .iter()
109                    .map(|idx| *idx as usize)
110                    .collect_vec()
111            })
112            .collect_vec();
113
114        let [input]: [_; 1] = inputs.try_into().unwrap();
115        Ok(Box::new(Self::new(
116            input,
117            column_subsets,
118            source.context().get_config().developer.chunk_size,
119        )))
120    }
121}
122
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::ExpandExecutor;
    use crate::executor::Executor;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    #[tokio::test]
    async fn test_expand_executor() {
        // Input schema: three Int32 columns.
        let mock_schema = Schema {
            fields: vec![Field::unnamed(DataType::Int32); 3],
        };
        // Output schema: the input columns twice, plus the Int64 flag column.
        let expand_schema = {
            let mut fields = vec![Field::unnamed(DataType::Int32); 6];
            fields.push(Field::unnamed(DataType::Int64));
            Schema { fields }
        };
        let mut source = MockExecutor::new(mock_schema);
        source.add(DataChunk::from_pretty(
            "i i i
             1 2 3
             2 3 4",
        ));
        let executor = Box::new(ExpandExecutor {
            column_subsets: vec![vec![0, 1], vec![1, 2]],
            child: Box::new(source),
            schema: expand_schema,
            identity: "ExpandExecutor".to_owned(),
            chunk_size: CHUNK_SIZE,
        });
        let mut stream = executor.execute();
        let actual = stream.next().await.unwrap().unwrap();
        // Columns outside a subset show up as NULL (`.`); the last column
        // records the subset index that produced each row.
        let expected = DataChunk::from_pretty(
            "i i i i i i I
             1 2 . 1 2 3 0
             2 3 . 2 3 4 0
             . 2 3 1 2 3 1
             . 3 4 2 3 4 1",
        );
        assert_eq!(actual, expected);
    }
}
181}