risingwave_stream/executor/row_merge.rs

use risingwave_common::bail;
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::column_index_mapping::ColIndexMapping;

use super::barrier_align::*;
use crate::executor::prelude::*;

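/// `RowMergeExecutor` merges two upstream streams row-by-row: it aligns the inputs
/// on barriers, buffers the chunks received from each side, and on every barrier
/// emits chunks whose columns come from both sides, placed according to the two
/// column index mappings.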
pub struct RowMergeExecutor {
    ctx: ActorContextRef,
    pub lhs_input: Executor,
    pub rhs_input: Executor,
    /// Maps columns of the lhs input to their positions in the output schema.
    pub lhs_mapping: ColIndexMapping,
    /// Maps columns of the rhs input to their positions in the output schema.
    pub rhs_mapping: ColIndexMapping,
    /// Schema of the merged output rows.
    pub schema: Schema,
}

impl RowMergeExecutor {
    pub fn new(
        ctx: ActorContextRef,
        lhs_input: Executor,
        rhs_input: Executor,
        lhs_mapping: ColIndexMapping,
        rhs_mapping: ColIndexMapping,
        schema: Schema,
    ) -> Self {
        Self {
            ctx,
            lhs_input,
            rhs_input,
            lhs_mapping,
            rhs_mapping,
            schema,
        }
    }

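    /// Main loop: consumes the barrier-aligned stream of both inputs, buffering data
    /// chunks per side and flushing the buffers as merged chunks before forwarding
    /// each barrier.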
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_inner(self) {
        let lhs_mapping = self.lhs_mapping;
        let rhs_mapping = self.rhs_mapping;
        let data_types = self
            .schema
            .fields()
            .iter()
            .map(|f| f.data_type())
            .collect::<Vec<_>>();

        {
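            // Buffer the data chunks received from each side between barriers.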
            let mut lhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
            let mut rhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
            let aligned_stream = barrier_align(
                self.lhs_input.execute(),
                self.rhs_input.execute(),
                self.ctx.id,
                self.ctx.fragment_id,
                self.ctx.streaming_metrics.clone(),
                "RowMerge",
            );
            pin_mut!(aligned_stream);
            #[for_await]
            for message in aligned_stream {
                match message? {
                    AlignedMessage::Left(chunk) => {
                        lhs_buffer.push(chunk);
                    }
                    AlignedMessage::Right(chunk) => {
                        rhs_buffer.push(chunk);
                    }
                    AlignedMessage::Barrier(barrier) => {
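                        // On a barrier: flush any buffered chunks as merged output,
                        // then forward the barrier downstream.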
                        if lhs_buffer.is_empty() && rhs_buffer.is_empty() {
                            yield Message::Barrier(barrier);
                            continue;
                        }
                        #[for_await]
                        for output in Self::flush_buffers(
                            &data_types,
                            &lhs_mapping,
                            &rhs_mapping,
                            &mut lhs_buffer,
                            &mut rhs_buffer,
                        ) {
                            yield output?;
                        }
                        yield Message::Barrier(barrier);
                    }
                    AlignedMessage::WatermarkLeft(watermark) => {
                        tracing::warn!("unexpected watermark from left stream: {:?}", watermark);
                    }
                    AlignedMessage::WatermarkRight(watermark) => {
                        tracing::warn!("unexpected watermark from right stream: {:?}", watermark);
                    }
                }
            }
        }
    }

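    /// Drains both buffers and merges the buffered chunks pairwise into output
    /// messages. Both buffers must be non-empty when this is called.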
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn flush_buffers<'a>(
        data_types: &'a [DataType],
        lhs_mapping: &'a ColIndexMapping,
        rhs_mapping: &'a ColIndexMapping,
        lhs_buffer: &'a mut Vec<StreamChunk>,
        rhs_buffer: &'a mut Vec<StreamChunk>,
    ) {
        if lhs_buffer.is_empty() {
            bail!("lhs buffer should not be empty");
        };
        if rhs_buffer.is_empty() {
            bail!("rhs buffer should not be empty");
        };

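        // Pair up the buffered chunks; note the rhs buffer is fully drained by the
        // first lhs chunk, so each side is expected to hold a single chunk per barrier.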
        for lhs_chunk in lhs_buffer.drain(..) {
            for rhs_chunk in rhs_buffer.drain(..) {
                yield Self::build_chunk(
                    data_types,
                    lhs_mapping,
                    rhs_mapping,
                    lhs_chunk.clone(),
                    rhs_chunk,
                )?;
            }
        }
    }

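    /// Merges one lhs chunk and one rhs chunk of equal cardinality (1 or 2 rows) into
    /// a single output chunk: ops are taken from the lhs, and each side's columns are
    /// written to the output positions given by its mapping.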
    fn build_chunk(
        data_types: &[DataType],
        lhs_mapping: &ColIndexMapping,
        rhs_mapping: &ColIndexMapping,
        lhs_chunk: StreamChunk,
        rhs_chunk: StreamChunk,
    ) -> Result<Message, StreamExecutorError> {
        if !(1..=2).contains(&lhs_chunk.cardinality()) {
            bail!("lhs chunk cardinality should be 1 or 2");
        }
        if !(1..=2).contains(&rhs_chunk.cardinality()) {
            bail!("rhs chunk cardinality should be 1 or 2");
        }
        if lhs_chunk.cardinality() != rhs_chunk.cardinality() {
            bail!("lhs and rhs chunk cardinality should be the same");
        }
        let cardinality = lhs_chunk.cardinality();
        let mut ops = Vec::with_capacity(cardinality);
        let mut merged_rows = vec![vec![Datum::None; data_types.len()]; cardinality];
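        // Take ops from the lhs and copy each mapped lhs column into the output row;
        // columns without a mapping are skipped.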
        for (i, (op, lhs_row)) in lhs_chunk.rows().enumerate() {
            ops.push(op);
            for (j, d) in lhs_row.iter().enumerate() {
                if let Some(out_index) = lhs_mapping.try_map(j) {
                    merged_rows[i][out_index] = d.to_owned_datum();
                }
            }
        }

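        // Copy each mapped rhs column into the output row; rhs ops are ignored since
        // the lhs ops are used for the merged chunk.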
        for (i, (_, rhs_row)) in rhs_chunk.rows().enumerate() {
            for (j, d) in rhs_row.iter().enumerate() {
                if let Some(out_index) = rhs_mapping.try_map(j) {
                    merged_rows[i][out_index] = d.to_owned_datum();
                }
            }
        }
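        // The builder's capacity equals the row count, so appending all merged rows
        // yields exactly one full chunk.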
        let mut builder = DataChunkBuilder::new(data_types.to_vec(), cardinality);
        for row in merged_rows {
            if let Some(chunk) = builder.append_one_row(&row[..]) {
                return Ok(Message::Chunk(StreamChunk::from_parts(ops, chunk)));
            }
        }
        bail!("builder should have yielded a chunk")
    }
}

impl Execute for RowMergeExecutor {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}