risingwave_stream/executor/join/builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::stream_chunk::StreamChunkMut;
16use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
17use risingwave_common::array::{Op, RowRef, StreamChunk};
18use risingwave_common::row::{OwnedRow, Row};
19use risingwave_common::types::{DataType, DatumRef};
20
21use self::row::JoinRow;
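// Glob-import the parent `join` module (this is not a re-export); presumably it supplies `row`,
// the join type/side primitives and predicates such as `is_anti`/`is_semi` used below.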
23use super::*;
24
25type IndexMappings = Vec<(usize, usize)>;
26
27/// Build stream chunks with fixed chunk size from joined two sides of rows.
28pub struct JoinStreamChunkBuilder {
29    builder: StreamChunkBuilder,
30
31    /// The column index mapping from update side to output.
32    update_to_output: IndexMappings,
33
34    /// The column index mapping from matched side to output.
35    matched_to_output: IndexMappings,
36}
37
38impl JoinStreamChunkBuilder {
39    pub fn new(
40        chunk_size: usize,
41        data_types: Vec<DataType>,
42        update_to_output: IndexMappings,
43        matched_to_output: IndexMappings,
44    ) -> Self {
45        Self {
46            builder: StreamChunkBuilder::new(chunk_size, data_types),
47            update_to_output,
48            matched_to_output,
49        }
50    }
51
52    /// Get the mappings from left/right input indices to the output indices. The mappings can be
53    /// used to create [`JoinStreamChunkBuilder`] later.
54    ///
55    /// Please note the semantics of `update` and `matched` when creating the builder: either left
56    /// or right side can be `update` side or `matched` side, the key is to call the corresponding
57    /// append method once you passed `left_to_output`/`right_to_output` to
58    /// `update_to_output`/`matched_to_output`.
59    pub fn get_i2o_mapping(
60        output_indices: &[usize],
61        left_len: usize,
62        right_len: usize,
63    ) -> (IndexMappings, IndexMappings) {
64        let mut left_to_output = vec![];
65        let mut right_to_output = vec![];
66
67        for (output_idx, &idx) in output_indices.iter().enumerate() {
68            if idx < left_len {
69                left_to_output.push((idx, output_idx))
70            } else if idx >= left_len && idx < left_len + right_len {
71                right_to_output.push((idx - left_len, output_idx));
72            } else {
73                unreachable!("output_indices out of bound")
74            }
75        }
76        (left_to_output, right_to_output)
77    }
78
79    /// Append a row with coming update value and matched value.
80    ///
81    /// A [`StreamChunk`] will be returned when `size == capacity`.
82    #[must_use]
83    pub fn append_row(
84        &mut self,
85        op: Op,
86        row_update: impl Row,
87        row_matched: impl Row,
88    ) -> Option<StreamChunk> {
89        self.builder.append_iter(
90            op,
91            self.update_to_output
92                .iter()
93                .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
94                .chain(
95                    self.matched_to_output
96                        .iter()
97                        .map(|&(matched_idx, output_idx)| {
98                            (output_idx, row_matched.datum_at(matched_idx))
99                        }),
100                ),
101        )
102    }
103
104    /// Append a row with coming update value and fill the other side with null.
105    ///
106    /// A [`StreamChunk`] will be returned when `size == capacity`.
107    #[must_use]
108    pub fn append_row_update(&mut self, op: Op, row_update: impl Row) -> Option<StreamChunk> {
109        self.builder.append_iter(
110            op,
111            self.update_to_output
112                .iter()
113                .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
114                .chain(
115                    self.matched_to_output
116                        .iter()
117                        .map(|&(_, output_idx)| (output_idx, DatumRef::None)),
118                ),
119        )
120    }
121
122    /// Append a row with matched value and fill the coming side with null.
123    ///
124    /// A [`StreamChunk`] will be returned when `size == capacity`.
125    #[must_use]
126    pub fn append_row_matched(&mut self, op: Op, row_matched: impl Row) -> Option<StreamChunk> {
127        self.builder.append_iter(
128            op,
129            self.update_to_output
130                .iter()
131                .map(|&(_, output_idx)| (output_idx, DatumRef::None))
132                .chain(
133                    self.matched_to_output
134                        .iter()
135                        .map(|&(matched_idx, output_idx)| {
136                            (output_idx, row_matched.datum_at(matched_idx))
137                        }),
138                ),
139        )
140    }
141
142    /// Take out the remaining rows as a chunk. Return `None` if the builder is empty.
143    #[must_use]
144    pub fn take(&mut self) -> Option<StreamChunk> {
145        self.builder.take()
146    }
147}
148
149pub struct JoinChunkBuilder<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> {
150    stream_chunk_builder: JoinStreamChunkBuilder,
151}
152
153impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder<T, SIDE> {
154    pub fn new(stream_chunk_builder: JoinStreamChunkBuilder) -> Self {
155        Self {
156            stream_chunk_builder,
157        }
158    }
159
160    pub fn post_process(c: StreamChunk) -> StreamChunk {
161        let mut c = StreamChunkMut::from(c);
162
163        // NOTE(st1page): remove the pattern `UpdateDel(k, old), UpdateIns(k, NULL), UpdateDel(k, NULL),  UpdateIns(k, new)`
164        // to avoid this issue <https://github.com/risingwavelabs/risingwave/issues/17450>
165        let mut i = 2;
166        while i < c.capacity() {
167            if c.op(i - 1) == Op::UpdateInsert
168                && c.op(i) == Op::UpdateDelete
169                && c.row_ref(i) == c.row_ref(i - 1)
170            {
171                if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert {
172                    c.set_op(i - 2, Op::Delete);
173                    c.set_vis(i - 1, false);
174                    c.set_vis(i, false);
175                    c.set_op(i + 1, Op::Insert);
176                    i += 3;
177                } else {
178                    debug_assert!(
179                        false,
180                        "unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
181                        c.op(i - 2),
182                        c.op(i - 1),
183                        c.op(i),
184                        c.op(i + 1)
185                    );
186                    warn!(
187                        "unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
188                        c.op(i - 2),
189                        c.op(i - 1),
190                        c.op(i),
191                        c.op(i + 1)
192                    );
193                    i += 1;
194                }
195            } else {
196                i += 1;
197            }
198        }
199        c.into()
200    }
201
202    /// TODO(kwannoel): We can actually reuse a lot of the logic between `with_match_on_insert`
203    /// and `with_match_on_delete`. We should refactor this to avoid code duplication.
204    /// We just introduce this wrapper function to avoid large code diffs.
205    pub fn with_match<const OP: JoinOpPrimitive>(
206        &mut self,
207        row: &RowRef<'_>,
208        matched_row: &JoinRow<OwnedRow>,
209    ) -> Option<StreamChunk> {
210        match OP {
211            JoinOp::Insert => self.with_match_on_insert(row, matched_row),
212            JoinOp::Delete => self.with_match_on_delete(row, matched_row),
213        }
214    }
215
216    pub fn with_match_on_insert(
217        &mut self,
218        row: &RowRef<'_>,
219        matched_row: &JoinRow<OwnedRow>,
220    ) -> Option<StreamChunk> {
221        // Left/Right Anti sides
222        if is_anti(T) {
223            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
224                self.stream_chunk_builder
225                    .append_row_matched(Op::Delete, &matched_row.row)
226                    .map(Self::post_process)
227            } else {
228                None
229            }
230        // Left/Right Semi sides
231        } else if is_semi(T) {
232            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
233                self.stream_chunk_builder
234                    .append_row_matched(Op::Insert, &matched_row.row)
235                    .map(Self::post_process)
236            } else {
237                None
238            }
239        // Outer sides
240        } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
241            // if the matched_row does not have any current matches
242            // `StreamChunkBuilder` guarantees that `UpdateDelete` will never
243            // issue an output chunk.
244            if self
245                .stream_chunk_builder
246                .append_row_matched(Op::UpdateDelete, &matched_row.row)
247                .is_some()
248            {
249                unreachable!("`Op::UpdateDelete` should not yield chunk");
250            }
251            self.stream_chunk_builder
252                .append_row(Op::UpdateInsert, row, &matched_row.row)
253                .map(Self::post_process)
254        // Inner sides
255        } else {
256            self.stream_chunk_builder
257                .append_row(Op::Insert, row, &matched_row.row)
258                .map(Self::post_process)
259        }
260    }
261
262    pub fn with_match_on_delete(
263        &mut self,
264        row: &RowRef<'_>,
265        matched_row: &JoinRow<OwnedRow>,
266    ) -> Option<StreamChunk> {
267        // Left/Right Anti sides
268        if is_anti(T) {
269            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
270                self.stream_chunk_builder
271                    .append_row_matched(Op::Insert, &matched_row.row)
272                    .map(Self::post_process)
273            } else {
274                None
275            }
276        // Left/Right Semi sides
277        } else if is_semi(T) {
278            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
279                self.stream_chunk_builder
280                    .append_row_matched(Op::Delete, &matched_row.row)
281                    .map(Self::post_process)
282            } else {
283                None
284            }
285        // Outer sides
286        } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
287            // if the matched_row does not have any current
288            // matches
289            if self
290                .stream_chunk_builder
291                .append_row(Op::UpdateDelete, row, &matched_row.row)
292                .is_some()
293            {
294                unreachable!("`Op::UpdateDelete` should not yield chunk");
295            }
296            self.stream_chunk_builder
297                .append_row_matched(Op::UpdateInsert, &matched_row.row)
298                .map(|c: StreamChunk| Self::post_process(c))
299
300        // Inner sides
301        } else {
302            // concat with the matched_row and append the new
303            // row
304            // FIXME: we always use `Op::Delete` here to avoid
305            // violating
306            // the assumption for U+ after U-.
307            self.stream_chunk_builder
308                .append_row(Op::Delete, row, &matched_row.row)
309                .map(Self::post_process)
310        }
311    }
312
313    #[inline]
314    pub fn forward_exactly_once_if_matched(
315        &mut self,
316        op: Op,
317        row: RowRef<'_>,
318    ) -> Option<StreamChunk> {
319        // if it's a semi join and the side needs to be maintained.
320        if is_semi(T) && forward_exactly_once(T, SIDE) {
321            self.stream_chunk_builder
322                .append_row_update(op, row)
323                .map(Self::post_process)
324        } else {
325            None
326        }
327    }
328
329    #[inline]
330    pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
331        // if it's outer join or anti join and the side needs to be maintained.
332        if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
333            self.stream_chunk_builder
334                .append_row_update(op, row)
335                .map(Self::post_process)
336        } else {
337            None
338        }
339    }
340
341    #[inline]
342    pub fn take(&mut self) -> Option<StreamChunk> {
343        self.stream_chunk_builder.take().map(Self::post_process)
344    }
345}