risingwave_stream/executor/join/
builder.rs1use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
16use risingwave_common::array::{Op, RowRef, StreamChunk};
17use risingwave_common::row::Row;
18use risingwave_common::types::{DataType, DatumRef};
19
20use self::row::JoinRow;
21use super::*;
23
/// A list of `(input column index, output column index)` pairs: each entry names a column of
/// one join input together with the position it occupies in the output row.
type IndexMappings = Vec<(usize, usize)>;
25
26pub struct JoinStreamChunkBuilder {
28 builder: StreamChunkBuilder,
29
30 update_to_output: IndexMappings,
32
33 matched_to_output: IndexMappings,
35}
36
37impl JoinStreamChunkBuilder {
38 pub fn new(
39 chunk_size: usize,
40 data_types: Vec<DataType>,
41 update_to_output: IndexMappings,
42 matched_to_output: IndexMappings,
43 ) -> Self {
44 let chunk_size = chunk_size.max(2);
48
49 Self {
50 builder: StreamChunkBuilder::new(chunk_size, data_types),
51 update_to_output,
52 matched_to_output,
53 }
54 }
55
56 pub fn get_i2o_mapping(
64 output_indices: &[usize],
65 left_len: usize,
66 right_len: usize,
67 ) -> (IndexMappings, IndexMappings) {
68 let mut left_to_output = vec![];
69 let mut right_to_output = vec![];
70
71 for (output_idx, &idx) in output_indices.iter().enumerate() {
72 if idx < left_len {
73 left_to_output.push((idx, output_idx))
74 } else if idx >= left_len && idx < left_len + right_len {
75 right_to_output.push((idx - left_len, output_idx));
76 } else {
77 unreachable!("output_indices out of bound")
78 }
79 }
80 (left_to_output, right_to_output)
81 }
82
83 #[must_use]
87 pub fn append_row(
88 &mut self,
89 op: Op,
90 row_update: impl Row,
91 row_matched: impl Row,
92 ) -> Option<StreamChunk> {
93 self.builder.append_iter(
94 op,
95 self.update_to_output
96 .iter()
97 .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
98 .chain(
99 self.matched_to_output
100 .iter()
101 .map(|&(matched_idx, output_idx)| {
102 (output_idx, row_matched.datum_at(matched_idx))
103 }),
104 ),
105 )
106 }
107
108 #[must_use]
112 pub fn append_row_update(&mut self, op: Op, row_update: impl Row) -> Option<StreamChunk> {
113 self.builder.append_iter(
114 op,
115 self.update_to_output
116 .iter()
117 .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
118 .chain(
119 self.matched_to_output
120 .iter()
121 .map(|&(_, output_idx)| (output_idx, DatumRef::None)),
122 ),
123 )
124 }
125
126 #[must_use]
130 pub fn append_row_matched(&mut self, op: Op, row_matched: impl Row) -> Option<StreamChunk> {
131 self.builder.append_iter(
132 op,
133 self.update_to_output
134 .iter()
135 .map(|&(_, output_idx)| (output_idx, DatumRef::None))
136 .chain(
137 self.matched_to_output
138 .iter()
139 .map(|&(matched_idx, output_idx)| {
140 (output_idx, row_matched.datum_at(matched_idx))
141 }),
142 ),
143 )
144 }
145
146 #[must_use]
148 pub fn take(&mut self) -> Option<StreamChunk> {
149 self.builder.take()
150 }
151}
152
153pub struct JoinChunkBuilder<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> {
154 stream_chunk_builder: JoinStreamChunkBuilder,
155}
156
157impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder<T, SIDE> {
158 pub fn new(stream_chunk_builder: JoinStreamChunkBuilder) -> Self {
159 Self {
160 stream_chunk_builder,
161 }
162 }
163
164 pub fn post_process(c: StreamChunk) -> StreamChunk {
167 c.eliminate_adjacent_noop_update()
168 }
169
170 pub fn with_match<const OP: JoinOpPrimitive>(
174 &mut self,
175 row: &RowRef<'_>,
176 matched_row: &JoinRow<impl Row>,
177 ) -> Option<StreamChunk> {
178 match OP {
179 JoinOp::Insert => self.with_match_on_insert(row, matched_row),
180 JoinOp::Delete => self.with_match_on_delete(row, matched_row),
181 }
182 }
183
184 pub fn with_match_on_insert(
185 &mut self,
186 row: &RowRef<'_>,
187 matched_row: &JoinRow<impl Row>,
188 ) -> Option<StreamChunk> {
189 if is_anti(T) {
191 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
192 self.stream_chunk_builder
193 .append_row_matched(Op::Delete, &matched_row.row)
194 .map(Self::post_process)
195 } else {
196 None
197 }
198 } else if is_semi(T) {
200 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
201 self.stream_chunk_builder
202 .append_row_matched(Op::Insert, &matched_row.row)
203 .map(Self::post_process)
204 } else {
205 None
206 }
207 } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
209 let chunk1 = self
215 .stream_chunk_builder
216 .append_row_matched(Op::Delete, &matched_row.row);
217 let chunk2 = self
218 .stream_chunk_builder
219 .append_row(Op::Insert, row, &matched_row.row);
220
221 assert!(chunk1.is_none() || chunk2.is_none());
224 chunk1.or(chunk2).map(Self::post_process)
225 } else {
227 self.stream_chunk_builder
228 .append_row(Op::Insert, row, &matched_row.row)
229 .map(Self::post_process)
230 }
231 }
232
233 pub fn with_match_on_delete(
234 &mut self,
235 row: &RowRef<'_>,
236 matched_row: &JoinRow<impl Row>,
237 ) -> Option<StreamChunk> {
238 if is_anti(T) {
240 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
241 self.stream_chunk_builder
242 .append_row_matched(Op::Insert, &matched_row.row)
243 .map(Self::post_process)
244 } else {
245 None
246 }
247 } else if is_semi(T) {
249 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
250 self.stream_chunk_builder
251 .append_row_matched(Op::Delete, &matched_row.row)
252 .map(Self::post_process)
253 } else {
254 None
255 }
256 } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
258 let chunk1 = self
264 .stream_chunk_builder
265 .append_row(Op::Delete, row, &matched_row.row);
266 let chunk2 = self
267 .stream_chunk_builder
268 .append_row_matched(Op::Insert, &matched_row.row);
269
270 assert!(chunk1.is_none() || chunk2.is_none());
273 chunk1.or(chunk2).map(Self::post_process)
274
275 } else {
277 self.stream_chunk_builder
281 .append_row(Op::Delete, row, &matched_row.row)
282 .map(Self::post_process)
283 }
284 }
285
286 #[inline]
287 pub fn forward_exactly_once_if_matched(
288 &mut self,
289 op: Op,
290 row: RowRef<'_>,
291 ) -> Option<StreamChunk> {
292 if is_semi(T) && forward_exactly_once(T, SIDE) {
294 self.stream_chunk_builder
295 .append_row_update(op, row)
296 .map(Self::post_process)
297 } else {
298 None
299 }
300 }
301
302 #[inline]
303 pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
304 if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
306 self.stream_chunk_builder
307 .append_row_update(op, row)
308 .map(Self::post_process)
309 } else {
310 None
311 }
312 }
313
314 #[inline]
315 pub fn take(&mut self) -> Option<StreamChunk> {
316 self.stream_chunk_builder.take().map(Self::post_process)
317 }
318}