risingwave_stream/executor/
expand.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::{Array, I64Array};
16
17use crate::executor::prelude::*;
18
19pub struct ExpandExecutor {
20    input: Executor,
21    column_subsets: Vec<Vec<usize>>,
22}
23
24impl ExpandExecutor {
25    pub fn new(input: Executor, column_subsets: Vec<Vec<usize>>) -> Self {
26        Self {
27            input,
28            column_subsets,
29        }
30    }
31
32    #[try_stream(ok = Message, error = StreamExecutorError)]
33    async fn execute_inner(self) {
34        #[for_await]
35        for msg in self.input.execute() {
36            let input = match msg? {
37                Message::Chunk(c) => c,
38                m => {
39                    yield m;
40                    continue;
41                }
42            };
43            for (i, subsets) in self.column_subsets.iter().enumerate() {
44                let flags =
45                    I64Array::from_iter(std::iter::repeat_n(i as i64, input.capacity())).into_ref();
46                let (mut columns, vis) = input.data_chunk().keep_columns(subsets).into_parts();
47                columns.extend(input.columns().iter().cloned());
48                columns.push(flags);
49                let chunk = StreamChunk::with_visibility(input.ops(), columns, vis);
50                yield Message::Chunk(chunk);
51            }
52        }
53    }
54}
55
56impl Debug for ExpandExecutor {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("ExpandExecutor")
59            .field("column_subsets", &self.column_subsets)
60            .finish()
61    }
62}
63
64impl Execute for ExpandExecutor {
65    fn execute(self: Box<Self>) -> BoxedMessageStream {
66        self.execute_inner().boxed()
67    }
68}
69
70#[cfg(test)]
71mod tests {
72    use futures::StreamExt;
73    use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
74    use risingwave_common::catalog::{Field, Schema};
75    use risingwave_common::types::DataType;
76
77    use super::ExpandExecutor;
78    use crate::executor::test_utils::MockSource;
79    use crate::executor::{Execute, PkIndices};
80
81    #[tokio::test]
82    async fn test_expand() {
83        let chunk1 = StreamChunk::from_pretty(
84            " I I I
85            + 1 4 1
86            + 5 2 2 D
87            + 6 6 3
88            - 7 5 4",
89        );
90        let source = MockSource::with_chunks(vec![chunk1]).into_executor(
91            Schema::new(vec![
92                Field::unnamed(DataType::Int64),
93                Field::unnamed(DataType::Int64),
94                Field::unnamed(DataType::Int64),
95            ]),
96            PkIndices::new(),
97        );
98
99        let column_subsets = vec![vec![0, 1], vec![1, 2]];
100        let mut expand = ExpandExecutor::new(source, column_subsets)
101            .boxed()
102            .execute();
103
104        let chunk = expand.next().await.unwrap().unwrap().into_chunk().unwrap();
105        assert_eq!(
106            chunk,
107            StreamChunk::from_pretty(
108                " I I I I I I I
109                + 1 4 . 1 4 1 0
110                + 5 2 . 5 2 2 0 D
111                + 6 6 . 6 6 3 0
112                - 7 5 . 7 5 4 0"
113            )
114        );
115
116        let chunk = expand.next().await.unwrap().unwrap().into_chunk().unwrap();
117        assert_eq!(
118            chunk,
119            StreamChunk::from_pretty(
120                " I I I I I I I
121                + . 4 1 1 4 1 1
122                + . 2 2 5 2 2 1 D
123                + . 6 3 6 6 3 1
124                - . 5 4 7 5 4 1"
125            )
126        );
127    }
128}