risingwave_batch_executors/executor/
expand.rs1use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::{Array, DataChunk, I64Array};
18use risingwave_common::catalog::{Field, Schema};
19use risingwave_common::types::DataType;
20use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
21use risingwave_pb::batch_plan::plan_node::NodeBody;
22
23use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
24use crate::error::{BatchError, Result};
25
26pub struct ExpandExecutor {
27 column_subsets: Vec<Vec<usize>>,
28 child: BoxedExecutor,
29 schema: Schema,
30 identity: String,
31 chunk_size: usize,
32}
33
34impl Executor for ExpandExecutor {
35 fn schema(&self) -> &Schema {
36 &self.schema
37 }
38
39 fn identity(&self) -> &str {
40 &self.identity
41 }
42
43 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
44 self.do_execute()
45 }
46}
47
48impl ExpandExecutor {
49 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
50 async fn do_execute(self: Box<Self>) {
51 let mut data_chunk_builder =
52 DataChunkBuilder::new(self.schema.data_types(), self.chunk_size);
53
54 #[for_await]
55 for input in self.child.execute() {
56 let input = input?;
57 for (i, subsets) in self.column_subsets.iter().enumerate() {
58 let flags =
59 I64Array::from_iter(std::iter::repeat_n(i as i64, input.capacity())).into_ref();
60 let (mut columns, vis) = input.keep_columns(subsets).into_parts();
61 columns.extend(input.columns().iter().cloned());
62 columns.push(flags);
63 let chunk = DataChunk::new(columns, vis);
64
65 for data_chunk in data_chunk_builder.append_chunk(chunk) {
66 yield data_chunk;
67 }
68 }
69 }
70 if let Some(chunk) = data_chunk_builder.consume_all() {
71 yield chunk;
72 }
73 }
74
75 pub fn new(input: BoxedExecutor, column_subsets: Vec<Vec<usize>>, chunk_size: usize) -> Self {
76 let schema = {
77 let mut fields = input.schema().clone().into_fields();
78 fields.extend(fields.clone());
79 fields.push(Field::with_name(DataType::Int64, "flag"));
80 Schema::new(fields)
81 };
82 Self {
83 column_subsets,
84 child: input,
85 schema,
86 identity: "ExpandExecutor".into(),
87 chunk_size,
88 }
89 }
90}
91
92impl BoxedExecutorBuilder for ExpandExecutor {
93 async fn new_boxed_executor(
94 source: &ExecutorBuilder<'_>,
95 inputs: Vec<BoxedExecutor>,
96 ) -> Result<BoxedExecutor> {
97 let expand_node = try_match_expand!(
98 source.plan_node().get_node_body().unwrap(),
99 NodeBody::Expand
100 )?;
101
102 let column_subsets = expand_node
103 .column_subsets
104 .iter()
105 .map(|subset| {
106 subset
107 .column_indices
108 .iter()
109 .map(|idx| *idx as usize)
110 .collect_vec()
111 })
112 .collect_vec();
113
114 let [input]: [_; 1] = inputs.try_into().unwrap();
115 Ok(Box::new(Self::new(
116 input,
117 column_subsets,
118 source.context().get_config().developer.chunk_size,
119 )))
120 }
121}
122
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use risingwave_common::array::{DataChunk, DataChunkTestExt};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::ExpandExecutor;
    use crate::executor::Executor;
    use crate::executor::test_utils::MockExecutor;

    const CHUNK_SIZE: usize = 1024;

    /// Builds a mock child that yields a single three-column `Int32` chunk with two rows.
    fn mock_input() -> MockExecutor {
        let mock_schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut mock_executor = MockExecutor::new(mock_schema);
        mock_executor.add(DataChunk::from_pretty(
            "i i i
             1 2 3
             2 3 4",
        ));
        mock_executor
    }

    #[tokio::test]
    async fn test_expand_executor() {
        let column_subsets = vec![vec![0, 1], vec![1, 2]];
        // Go through `new` so the derived output schema is exercised, not hand-written.
        let expand_executor = Box::new(ExpandExecutor::new(
            Box::new(mock_input()),
            column_subsets,
            CHUNK_SIZE,
        ));
        assert_eq!(
            expand_executor.schema().data_types(),
            vec![
                DataType::Int32,
                DataType::Int32,
                DataType::Int32,
                DataType::Int32,
                DataType::Int32,
                DataType::Int32,
                DataType::Int64,
            ]
        );

        let mut stream = expand_executor.execute();
        let res = stream.next().await.unwrap().unwrap();
        let expected_chunk = DataChunk::from_pretty(
            "i i i i i i I
             1 2 . 1 2 3 0
             2 3 . 2 3 4 0
             . 2 3 1 2 3 1
             . 3 4 2 3 4 1",
        );
        assert_eq!(res, expected_chunk);
        // All four rows fit in one chunk, so nothing else may be emitted.
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_expand_executor_rebatches_output() {
        // A chunk size of 3 forces the 4 expanded rows to be split across two chunks.
        let expand_executor = Box::new(ExpandExecutor::new(
            Box::new(mock_input()),
            vec![vec![0, 1], vec![1, 2]],
            3,
        ));
        let mut stream = expand_executor.execute();

        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(
            first,
            DataChunk::from_pretty(
                "i i i i i i I
                 1 2 . 1 2 3 0
                 2 3 . 2 3 4 0
                 . 2 3 1 2 3 1",
            )
        );

        let second = stream.next().await.unwrap().unwrap();
        assert_eq!(
            second,
            DataChunk::from_pretty(
                "i i i i i i I
                 . 3 4 2 3 4 1",
            )
        );

        assert!(stream.next().await.is_none());
    }
}