risingwave_stream/executor/join/
builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
16use risingwave_common::array::{Op, RowRef, StreamChunk};
17use risingwave_common::row::Row;
18use risingwave_common::types::{DataType, DatumRef};
19
20use self::row::JoinRow;
21// Re-export `StreamChunkBuilder`.
22use super::*;
23
/// Column index pairs of the form `(input_idx, output_idx)`: the column at `input_idx` of one
/// join side is written to column `output_idx` of the output chunk.
type IndexMappings = Vec<(usize, usize)>;
25
26/// Build stream chunks with fixed chunk size from joined two sides of rows.
27pub struct JoinStreamChunkBuilder {
28    builder: StreamChunkBuilder,
29
30    /// The column index mapping from update side to output.
31    update_to_output: IndexMappings,
32
33    /// The column index mapping from matched side to output.
34    matched_to_output: IndexMappings,
35}
36
37impl JoinStreamChunkBuilder {
38    pub fn new(
39        chunk_size: usize,
40        data_types: Vec<DataType>,
41        update_to_output: IndexMappings,
42        matched_to_output: IndexMappings,
43    ) -> Self {
44        // Enforce that the chunk size is at least 2, so that appending two rows to the chunk
45        // builder will at most yield one chunk. `with_match` depends on and gets simplified
46        // by such property.
47        let chunk_size = chunk_size.max(2);
48
49        Self {
50            builder: StreamChunkBuilder::new(chunk_size, data_types),
51            update_to_output,
52            matched_to_output,
53        }
54    }
55
56    /// Get the mappings from left/right input indices to the output indices. The mappings can be
57    /// used to create [`JoinStreamChunkBuilder`] later.
58    ///
59    /// Please note the semantics of `update` and `matched` when creating the builder: either left
60    /// or right side can be `update` side or `matched` side, the key is to call the corresponding
61    /// append method once you passed `left_to_output`/`right_to_output` to
62    /// `update_to_output`/`matched_to_output`.
63    pub fn get_i2o_mapping(
64        output_indices: &[usize],
65        left_len: usize,
66        right_len: usize,
67    ) -> (IndexMappings, IndexMappings) {
68        let mut left_to_output = vec![];
69        let mut right_to_output = vec![];
70
71        for (output_idx, &idx) in output_indices.iter().enumerate() {
72            if idx < left_len {
73                left_to_output.push((idx, output_idx))
74            } else if idx >= left_len && idx < left_len + right_len {
75                right_to_output.push((idx - left_len, output_idx));
76            } else {
77                unreachable!("output_indices out of bound")
78            }
79        }
80        (left_to_output, right_to_output)
81    }
82
83    /// Append a row with coming update value and matched value.
84    ///
85    /// A [`StreamChunk`] will be returned when `size == capacity`.
86    #[must_use]
87    pub fn append_row(
88        &mut self,
89        op: Op,
90        row_update: impl Row,
91        row_matched: impl Row,
92    ) -> Option<StreamChunk> {
93        self.builder.append_iter(
94            op,
95            self.update_to_output
96                .iter()
97                .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
98                .chain(
99                    self.matched_to_output
100                        .iter()
101                        .map(|&(matched_idx, output_idx)| {
102                            (output_idx, row_matched.datum_at(matched_idx))
103                        }),
104                ),
105        )
106    }
107
108    /// Append a row with coming update value and fill the other side with null.
109    ///
110    /// A [`StreamChunk`] will be returned when `size == capacity`.
111    #[must_use]
112    pub fn append_row_update(&mut self, op: Op, row_update: impl Row) -> Option<StreamChunk> {
113        self.builder.append_iter(
114            op,
115            self.update_to_output
116                .iter()
117                .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
118                .chain(
119                    self.matched_to_output
120                        .iter()
121                        .map(|&(_, output_idx)| (output_idx, DatumRef::None)),
122                ),
123        )
124    }
125
126    /// Append a row with matched value and fill the coming side with null.
127    ///
128    /// A [`StreamChunk`] will be returned when `size == capacity`.
129    #[must_use]
130    pub fn append_row_matched(&mut self, op: Op, row_matched: impl Row) -> Option<StreamChunk> {
131        self.builder.append_iter(
132            op,
133            self.update_to_output
134                .iter()
135                .map(|&(_, output_idx)| (output_idx, DatumRef::None))
136                .chain(
137                    self.matched_to_output
138                        .iter()
139                        .map(|&(matched_idx, output_idx)| {
140                            (output_idx, row_matched.datum_at(matched_idx))
141                        }),
142                ),
143        )
144    }
145
146    /// Take out the remaining rows as a chunk. Return `None` if the builder is empty.
147    #[must_use]
148    pub fn take(&mut self) -> Option<StreamChunk> {
149        self.builder.take()
150    }
151}
152
153pub struct JoinChunkBuilder<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> {
154    stream_chunk_builder: JoinStreamChunkBuilder,
155}
156
157impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder<T, SIDE> {
158    pub fn new(stream_chunk_builder: JoinStreamChunkBuilder) -> Self {
159        Self {
160            stream_chunk_builder,
161        }
162    }
163
164    /// Remove unnecessary updates in the pattern `-(k, old), +(k, NULL), -(k, NULL), +(k, new)`
165    /// to avoid this issue: <https://github.com/risingwavelabs/risingwave/issues/17450>
166    pub fn post_process(c: StreamChunk) -> StreamChunk {
167        c.eliminate_adjacent_noop_update()
168    }
169
170    /// TODO(kwannoel): We can actually reuse a lot of the logic between `with_match_on_insert`
171    /// and `with_match_on_delete`. We should refactor this to avoid code duplication.
172    /// We just introduce this wrapper function to avoid large code diffs.
173    pub fn with_match<const OP: JoinOpPrimitive>(
174        &mut self,
175        row: &RowRef<'_>,
176        matched_row: &JoinRow<impl Row>,
177    ) -> Option<StreamChunk> {
178        match OP {
179            JoinOp::Insert => self.with_match_on_insert(row, matched_row),
180            JoinOp::Delete => self.with_match_on_delete(row, matched_row),
181        }
182    }
183
184    pub fn with_match_on_insert(
185        &mut self,
186        row: &RowRef<'_>,
187        matched_row: &JoinRow<impl Row>,
188    ) -> Option<StreamChunk> {
189        // Left/Right Anti sides
190        if is_anti(T) {
191            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
192                self.stream_chunk_builder
193                    .append_row_matched(Op::Delete, &matched_row.row)
194                    .map(Self::post_process)
195            } else {
196                None
197            }
198        // Left/Right Semi sides
199        } else if is_semi(T) {
200            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
201                self.stream_chunk_builder
202                    .append_row_matched(Op::Insert, &matched_row.row)
203                    .map(Self::post_process)
204            } else {
205                None
206            }
207        // Outer sides
208        } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
209            // if the matched_row does not have any current matches
210
211            // The current side part of the stream key changes from NULL to non-NULL.
212            // Thus we cannot use `UpdateDelete` and `UpdateInsert`, as it requires the
213            // stream key to remain the same.
214            let chunk1 = self
215                .stream_chunk_builder
216                .append_row_matched(Op::Delete, &matched_row.row);
217            let chunk2 = self
218                .stream_chunk_builder
219                .append_row(Op::Insert, row, &matched_row.row);
220
221            // We've enforced chunk size to be at least 2, so it's impossible to have 2 chunks yield.
222            // TODO: better to ensure they are in the same chunk to make `post_process` more effective.
223            assert!(chunk1.is_none() || chunk2.is_none());
224            chunk1.or(chunk2).map(Self::post_process)
225        // Inner sides
226        } else {
227            self.stream_chunk_builder
228                .append_row(Op::Insert, row, &matched_row.row)
229                .map(Self::post_process)
230        }
231    }
232
233    pub fn with_match_on_delete(
234        &mut self,
235        row: &RowRef<'_>,
236        matched_row: &JoinRow<impl Row>,
237    ) -> Option<StreamChunk> {
238        // Left/Right Anti sides
239        if is_anti(T) {
240            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
241                self.stream_chunk_builder
242                    .append_row_matched(Op::Insert, &matched_row.row)
243                    .map(Self::post_process)
244            } else {
245                None
246            }
247        // Left/Right Semi sides
248        } else if is_semi(T) {
249            if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
250                self.stream_chunk_builder
251                    .append_row_matched(Op::Delete, &matched_row.row)
252                    .map(Self::post_process)
253            } else {
254                None
255            }
256        // Outer sides
257        } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
258            // if the matched_row does not have any current matches
259
260            // The current side part of the stream key changes from non-NULL to NULL.
261            // Thus we cannot use `UpdateDelete` and `UpdateInsert`, as it requires the
262            // stream key to remain the same.
263            let chunk1 = self
264                .stream_chunk_builder
265                .append_row(Op::Delete, row, &matched_row.row);
266            let chunk2 = self
267                .stream_chunk_builder
268                .append_row_matched(Op::Insert, &matched_row.row);
269
270            // We've enforced chunk size to be at least 2, so it's impossible to have 2 chunks yield.
271            // TODO: better to ensure they are in the same chunk to make `post_process` more effective.
272            assert!(chunk1.is_none() || chunk2.is_none());
273            chunk1.or(chunk2).map(Self::post_process)
274
275        // Inner sides
276        } else {
277            // concat with the matched_row and append the new row
278            // FIXME: we always use `Op::Delete` here to avoid
279            // violating the assumption for U+ after U-, we can actually do better.
280            self.stream_chunk_builder
281                .append_row(Op::Delete, row, &matched_row.row)
282                .map(Self::post_process)
283        }
284    }
285
286    #[inline]
287    pub fn forward_exactly_once_if_matched(
288        &mut self,
289        op: Op,
290        row: RowRef<'_>,
291    ) -> Option<StreamChunk> {
292        // if it's a semi join and the side needs to be maintained.
293        if is_semi(T) && forward_exactly_once(T, SIDE) {
294            self.stream_chunk_builder
295                .append_row_update(op, row)
296                .map(Self::post_process)
297        } else {
298            None
299        }
300    }
301
302    #[inline]
303    pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
304        // if it's outer join or anti join and the side needs to be maintained.
305        if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
306            self.stream_chunk_builder
307                .append_row_update(op, row)
308                .map(Self::post_process)
309        } else {
310            None
311        }
312    }
313
314    #[inline]
315    pub fn take(&mut self) -> Option<StreamChunk> {
316        self.stream_chunk_builder.take().map(Self::post_process)
317    }
318}