risingwave_stream/executor/row_merge.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::bail;
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::column_index_mapping::ColIndexMapping;

use super::barrier_align::*;
use crate::executor::prelude::*;

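/// `RowMergeExecutor` merges rows from two upstream executors into a single
/// output stream. Data chunks from each side are buffered until a barrier
/// aligns both inputs; the buffered chunks are then merged row by row, with
/// each side's columns projected into the output row through its
/// `ColIndexMapping`.
///
/// Illustrative example (hypothetical schemas): if the lhs produces columns
/// `(a, b)` mapped to output indices `0` and `1`, and the rhs produces `(c)`
/// mapped to output index `2`, the merged output row is `(a, b, c)`.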
pub struct RowMergeExecutor {
    ctx: ActorContextRef,
    pub lhs_input: Executor,
    pub rhs_input: Executor,
    /// Maps input columns from the lhs to the output.
    pub lhs_mapping: ColIndexMapping,
    /// Maps input columns from the rhs to the output.
    pub rhs_mapping: ColIndexMapping,
    /// Output schema.
    pub schema: Schema,
}

impl RowMergeExecutor {
    pub fn new(
        ctx: ActorContextRef,
        lhs_input: Executor,
        rhs_input: Executor,
        lhs_mapping: ColIndexMapping,
        rhs_mapping: ColIndexMapping,
        schema: Schema,
    ) -> Self {
        Self {
            ctx,
            lhs_input,
            rhs_input,
            lhs_mapping,
            rhs_mapping,
            schema,
        }
    }

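    /// Drives the barrier-aligned stream: data chunks from either side are
    /// buffered, and on every barrier the buffered chunks (if any) are merged
    /// and emitted before the barrier itself is forwarded downstream.
    /// Watermarks are not expected from either input and are only logged.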
    #[try_stream(ok = Message, error = StreamExecutorError)]
    pub async fn execute_inner(self) {
        let lhs_mapping = self.lhs_mapping;
        let rhs_mapping = self.rhs_mapping;
        let data_types = self
            .schema
            .fields()
            .iter()
            .map(|f| f.data_type())
            .collect::<Vec<_>>();

        {
            let mut lhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
            let mut rhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
            let aligned_stream = barrier_align(
                self.lhs_input.execute(),
                self.rhs_input.execute(),
                self.ctx.id,
                self.ctx.fragment_id,
                self.ctx.streaming_metrics.clone(),
                "RowMerge",
            );
            pin_mut!(aligned_stream);
            #[for_await]
            for message in aligned_stream {
                match message? {
                    AlignedMessage::Left(chunk) => {
                        lhs_buffer.push(chunk);
                    }
                    AlignedMessage::Right(chunk) => {
                        rhs_buffer.push(chunk);
                    }
                    AlignedMessage::Barrier(barrier) => {
                        if lhs_buffer.is_empty() && rhs_buffer.is_empty() {
                            yield Message::Barrier(barrier);
                            continue;
                        }
                        #[for_await]
                        for output in Self::flush_buffers(
                            &data_types,
                            &lhs_mapping,
                            &rhs_mapping,
                            &mut lhs_buffer,
                            &mut rhs_buffer,
                        ) {
                            yield output?;
                        }
                        yield Message::Barrier(barrier);
                    }
                    AlignedMessage::WatermarkLeft(watermark) => {
                        tracing::warn!("unexpected watermark from left stream: {:?}", watermark);
                    }
                    AlignedMessage::WatermarkRight(watermark) => {
                        tracing::warn!("unexpected watermark from right stream: {:?}", watermark);
                    }
                }
            }
        }
    }

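    /// Drains both buffers and merges the buffered chunks accumulated within
    /// the current barrier epoch via [`Self::build_chunk`]. Both buffers must
    /// be non-empty when this is called.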
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn flush_buffers<'a>(
        data_types: &'a [DataType],
        lhs_mapping: &'a ColIndexMapping,
        rhs_mapping: &'a ColIndexMapping,
        lhs_buffer: &'a mut Vec<StreamChunk>,
        rhs_buffer: &'a mut Vec<StreamChunk>,
    ) {
        if lhs_buffer.is_empty() {
            bail!("lhs buffer should not be empty");
        };
        if rhs_buffer.is_empty() {
            bail!("rhs buffer should not be empty");
        };

        for lhs_chunk in lhs_buffer.drain(..) {
            for rhs_chunk in rhs_buffer.drain(..) {
                yield Self::build_chunk(
                    data_types,
                    lhs_mapping,
                    rhs_mapping,
                    lhs_chunk.clone(),
                    rhs_chunk,
                )?;
            }
        }
    }

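    /// Merges two chunks of equal cardinality (1 or 2 rows) into one output
    /// chunk: for each row position, the lhs and rhs columns are copied into
    /// the output row according to their respective `ColIndexMapping`s, and
    /// the ops are taken from the lhs chunk.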
    fn build_chunk(
        data_types: &[DataType],
        lhs_mapping: &ColIndexMapping,
        rhs_mapping: &ColIndexMapping,
        lhs_chunk: StreamChunk,
        rhs_chunk: StreamChunk,
    ) -> Result<Message, StreamExecutorError> {
        if !(1..=2).contains(&lhs_chunk.cardinality()) {
            bail!("lhs chunk cardinality should be 1 or 2");
        }
        if !(1..=2).contains(&rhs_chunk.cardinality()) {
            bail!("rhs chunk cardinality should be 1 or 2");
        }
        if lhs_chunk.cardinality() != rhs_chunk.cardinality() {
            bail!("lhs and rhs chunk cardinality should be the same");
        }
        let cardinality = lhs_chunk.cardinality();
        let mut ops = Vec::with_capacity(cardinality);
        let mut merged_rows = vec![vec![Datum::None; data_types.len()]; cardinality];
        for (i, (op, lhs_row)) in lhs_chunk.rows().enumerate() {
            ops.push(op);
            for (j, d) in lhs_row.iter().enumerate() {
                // NOTE(kwannoel): Unnecessary columns will not have a mapping,
                // for instance the extra row count column; those can be skipped here.
                if let Some(out_index) = lhs_mapping.try_map(j) {
                    merged_rows[i][out_index] = d.to_owned_datum();
                }
            }
        }

        for (i, (_, rhs_row)) in rhs_chunk.rows().enumerate() {
            for (j, d) in rhs_row.iter().enumerate() {
                // NOTE(kwannoel): Unnecessary columns will not have a mapping,
                // for instance the extra row count column; those can be skipped here.
                if let Some(out_index) = rhs_mapping.try_map(j) {
                    merged_rows[i][out_index] = d.to_owned_datum();
                }
            }
        }
        let mut builder = DataChunkBuilder::new(data_types.to_vec(), cardinality);
        for row in merged_rows {
            if let Some(chunk) = builder.append_one_row(&row[..]) {
                return Ok(Message::Chunk(StreamChunk::from_parts(ops, chunk)));
            }
        }
        bail!("builder should have yielded a chunk")
    }
}

impl Execute for RowMergeExecutor {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}