risingwave_stream/executor/
expand.rs1use risingwave_common::array::{Array, I64Array};
16
17use crate::executor::prelude::*;
18
19pub struct ExpandExecutor {
20 input: Executor,
21 column_subsets: Vec<Vec<usize>>,
22}
23
24impl ExpandExecutor {
25 pub fn new(input: Executor, column_subsets: Vec<Vec<usize>>) -> Self {
26 Self {
27 input,
28 column_subsets,
29 }
30 }
31
32 #[try_stream(ok = Message, error = StreamExecutorError)]
33 async fn execute_inner(self) {
34 #[for_await]
35 for msg in self.input.execute() {
36 let input = match msg? {
37 Message::Chunk(c) => c,
38 m => {
39 yield m;
40 continue;
41 }
42 };
43 for (i, subsets) in self.column_subsets.iter().enumerate() {
44 let flags =
45 I64Array::from_iter(std::iter::repeat_n(i as i64, input.capacity())).into_ref();
46 let (mut columns, vis) = input.data_chunk().keep_columns(subsets).into_parts();
47 columns.extend(input.columns().iter().cloned());
48 columns.push(flags);
49 let chunk = StreamChunk::with_visibility(input.ops(), columns, vis);
50 yield Message::Chunk(chunk);
51 }
52 }
53 }
54}
55
56impl Debug for ExpandExecutor {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("ExpandExecutor")
59 .field("column_subsets", &self.column_subsets)
60 .finish()
61 }
62}
63
64impl Execute for ExpandExecutor {
65 fn execute(self: Box<Self>) -> BoxedMessageStream {
66 self.execute_inner().boxed()
67 }
68}
69
70#[cfg(test)]
71mod tests {
72 use futures::StreamExt;
73 use risingwave_common::array::{StreamChunk, StreamChunkTestExt};
74 use risingwave_common::catalog::{Field, Schema};
75 use risingwave_common::types::DataType;
76
77 use super::ExpandExecutor;
78 use crate::executor::test_utils::MockSource;
79 use crate::executor::{Execute, PkIndices};
80
81 #[tokio::test]
82 async fn test_expand() {
83 let chunk1 = StreamChunk::from_pretty(
84 " I I I
85 + 1 4 1
86 + 5 2 2 D
87 + 6 6 3
88 - 7 5 4",
89 );
90 let source = MockSource::with_chunks(vec![chunk1]).into_executor(
91 Schema::new(vec![
92 Field::unnamed(DataType::Int64),
93 Field::unnamed(DataType::Int64),
94 Field::unnamed(DataType::Int64),
95 ]),
96 PkIndices::new(),
97 );
98
99 let column_subsets = vec![vec![0, 1], vec![1, 2]];
100 let mut expand = ExpandExecutor::new(source, column_subsets)
101 .boxed()
102 .execute();
103
104 let chunk = expand.next().await.unwrap().unwrap().into_chunk().unwrap();
105 assert_eq!(
106 chunk,
107 StreamChunk::from_pretty(
108 " I I I I I I I
109 + 1 4 . 1 4 1 0
110 + 5 2 . 5 2 2 0 D
111 + 6 6 . 6 6 3 0
112 - 7 5 . 7 5 4 0"
113 )
114 );
115
116 let chunk = expand.next().await.unwrap().unwrap().into_chunk().unwrap();
117 assert_eq!(
118 chunk,
119 StreamChunk::from_pretty(
120 " I I I I I I I
121 + . 4 1 1 4 1 1
122 + . 2 2 5 2 2 1 D
123 + . 6 3 6 6 3 1
124 - . 5 4 7 5 4 1"
125 )
126 );
127 }
128}