risingwave_stream/executor/join/
builder.rs1use risingwave_common::array::stream_chunk::StreamChunkMut;
16use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
17use risingwave_common::array::{Op, RowRef, StreamChunk};
18use risingwave_common::row::{OwnedRow, Row};
19use risingwave_common::types::{DataType, DatumRef};
20
21use self::row::JoinRow;
22use super::*;
24
/// A list of `(input column index, output column index)` pairs describing where each column of
/// an input row is placed in the output row.
type IndexMappings = Vec<(usize, usize)>;
/// Assembles join output rows into [`StreamChunk`]s.
///
/// Every output row is assembled from columns of an "update" row and a "matched" row; the two
/// mappings below say which output column each input column is written to.
pub struct JoinStreamChunkBuilder {
    /// Inner builder that accumulates output rows and yields finished chunks.
    builder: StreamChunkBuilder,

    /// `(update-row column index, output column index)` pairs.
    update_to_output: IndexMappings,

    /// `(matched-row column index, output column index)` pairs.
    matched_to_output: IndexMappings,
}
38impl JoinStreamChunkBuilder {
39 pub fn new(
40 chunk_size: usize,
41 data_types: Vec<DataType>,
42 update_to_output: IndexMappings,
43 matched_to_output: IndexMappings,
44 ) -> Self {
45 Self {
46 builder: StreamChunkBuilder::new(chunk_size, data_types),
47 update_to_output,
48 matched_to_output,
49 }
50 }
51
52 pub fn get_i2o_mapping(
60 output_indices: &[usize],
61 left_len: usize,
62 right_len: usize,
63 ) -> (IndexMappings, IndexMappings) {
64 let mut left_to_output = vec![];
65 let mut right_to_output = vec![];
66
67 for (output_idx, &idx) in output_indices.iter().enumerate() {
68 if idx < left_len {
69 left_to_output.push((idx, output_idx))
70 } else if idx >= left_len && idx < left_len + right_len {
71 right_to_output.push((idx - left_len, output_idx));
72 } else {
73 unreachable!("output_indices out of bound")
74 }
75 }
76 (left_to_output, right_to_output)
77 }
78
79 #[must_use]
83 pub fn append_row(
84 &mut self,
85 op: Op,
86 row_update: impl Row,
87 row_matched: impl Row,
88 ) -> Option<StreamChunk> {
89 self.builder.append_iter(
90 op,
91 self.update_to_output
92 .iter()
93 .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
94 .chain(
95 self.matched_to_output
96 .iter()
97 .map(|&(matched_idx, output_idx)| {
98 (output_idx, row_matched.datum_at(matched_idx))
99 }),
100 ),
101 )
102 }
103
104 #[must_use]
108 pub fn append_row_update(&mut self, op: Op, row_update: impl Row) -> Option<StreamChunk> {
109 self.builder.append_iter(
110 op,
111 self.update_to_output
112 .iter()
113 .map(|&(update_idx, output_idx)| (output_idx, row_update.datum_at(update_idx)))
114 .chain(
115 self.matched_to_output
116 .iter()
117 .map(|&(_, output_idx)| (output_idx, DatumRef::None)),
118 ),
119 )
120 }
121
122 #[must_use]
126 pub fn append_row_matched(&mut self, op: Op, row_matched: impl Row) -> Option<StreamChunk> {
127 self.builder.append_iter(
128 op,
129 self.update_to_output
130 .iter()
131 .map(|&(_, output_idx)| (output_idx, DatumRef::None))
132 .chain(
133 self.matched_to_output
134 .iter()
135 .map(|&(matched_idx, output_idx)| {
136 (output_idx, row_matched.datum_at(matched_idx))
137 }),
138 ),
139 )
140 }
141
142 #[must_use]
144 pub fn take(&mut self) -> Option<StreamChunk> {
145 self.builder.take()
146 }
147}
148
/// Join-type-aware wrapper around [`JoinStreamChunkBuilder`].
///
/// `T` is the join type, queried through `is_anti` / `is_semi` / `outer_side_null` / ... in the
/// `impl` below. `SIDE` is passed alongside `T` to those predicates; presumably it names the
/// side (left/right) the incoming row belongs to — TODO confirm against callers.
pub struct JoinChunkBuilder<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> {
    stream_chunk_builder: JoinStreamChunkBuilder,
}
153impl<const T: JoinTypePrimitive, const SIDE: SideTypePrimitive> JoinChunkBuilder<T, SIDE> {
154 pub fn new(stream_chunk_builder: JoinStreamChunkBuilder) -> Self {
155 Self {
156 stream_chunk_builder,
157 }
158 }
159
160 pub fn post_process(c: StreamChunk) -> StreamChunk {
161 let mut c = StreamChunkMut::from(c);
162
163 let mut i = 2;
166 while i < c.capacity() {
167 if c.op(i - 1) == Op::UpdateInsert
168 && c.op(i) == Op::UpdateDelete
169 && c.row_ref(i) == c.row_ref(i - 1)
170 {
171 if c.op(i - 2) == Op::UpdateDelete && c.op(i + 1) == Op::UpdateInsert {
172 c.set_op(i - 2, Op::Delete);
173 c.set_vis(i - 1, false);
174 c.set_vis(i, false);
175 c.set_op(i + 1, Op::Insert);
176 i += 3;
177 } else {
178 debug_assert!(
179 false,
180 "unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
181 c.op(i - 2),
182 c.op(i - 1),
183 c.op(i),
184 c.op(i + 1)
185 );
186 warn!(
187 "unexpected Op sequences {:?}, {:?}, {:?}, {:?}",
188 c.op(i - 2),
189 c.op(i - 1),
190 c.op(i),
191 c.op(i + 1)
192 );
193 i += 1;
194 }
195 } else {
196 i += 1;
197 }
198 }
199 c.into()
200 }
201
202 pub fn with_match<const OP: JoinOpPrimitive>(
206 &mut self,
207 row: &RowRef<'_>,
208 matched_row: &JoinRow<OwnedRow>,
209 ) -> Option<StreamChunk> {
210 match OP {
211 JoinOp::Insert => self.with_match_on_insert(row, matched_row),
212 JoinOp::Delete => self.with_match_on_delete(row, matched_row),
213 }
214 }
215
216 pub fn with_match_on_insert(
217 &mut self,
218 row: &RowRef<'_>,
219 matched_row: &JoinRow<OwnedRow>,
220 ) -> Option<StreamChunk> {
221 if is_anti(T) {
223 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
224 self.stream_chunk_builder
225 .append_row_matched(Op::Delete, &matched_row.row)
226 .map(Self::post_process)
227 } else {
228 None
229 }
230 } else if is_semi(T) {
232 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
233 self.stream_chunk_builder
234 .append_row_matched(Op::Insert, &matched_row.row)
235 .map(Self::post_process)
236 } else {
237 None
238 }
239 } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
241 if self
245 .stream_chunk_builder
246 .append_row_matched(Op::UpdateDelete, &matched_row.row)
247 .is_some()
248 {
249 unreachable!("`Op::UpdateDelete` should not yield chunk");
250 }
251 self.stream_chunk_builder
252 .append_row(Op::UpdateInsert, row, &matched_row.row)
253 .map(Self::post_process)
254 } else {
256 self.stream_chunk_builder
257 .append_row(Op::Insert, row, &matched_row.row)
258 .map(Self::post_process)
259 }
260 }
261
262 pub fn with_match_on_delete(
263 &mut self,
264 row: &RowRef<'_>,
265 matched_row: &JoinRow<OwnedRow>,
266 ) -> Option<StreamChunk> {
267 if is_anti(T) {
269 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
270 self.stream_chunk_builder
271 .append_row_matched(Op::Insert, &matched_row.row)
272 .map(Self::post_process)
273 } else {
274 None
275 }
276 } else if is_semi(T) {
278 if matched_row.is_zero_degree() && only_forward_matched_side(T, SIDE) {
279 self.stream_chunk_builder
280 .append_row_matched(Op::Delete, &matched_row.row)
281 .map(Self::post_process)
282 } else {
283 None
284 }
285 } else if matched_row.is_zero_degree() && outer_side_null(T, SIDE) {
287 if self
290 .stream_chunk_builder
291 .append_row(Op::UpdateDelete, row, &matched_row.row)
292 .is_some()
293 {
294 unreachable!("`Op::UpdateDelete` should not yield chunk");
295 }
296 self.stream_chunk_builder
297 .append_row_matched(Op::UpdateInsert, &matched_row.row)
298 .map(|c: StreamChunk| Self::post_process(c))
299
300 } else {
302 self.stream_chunk_builder
308 .append_row(Op::Delete, row, &matched_row.row)
309 .map(Self::post_process)
310 }
311 }
312
313 #[inline]
314 pub fn forward_exactly_once_if_matched(
315 &mut self,
316 op: Op,
317 row: RowRef<'_>,
318 ) -> Option<StreamChunk> {
319 if is_semi(T) && forward_exactly_once(T, SIDE) {
321 self.stream_chunk_builder
322 .append_row_update(op, row)
323 .map(Self::post_process)
324 } else {
325 None
326 }
327 }
328
329 #[inline]
330 pub fn forward_if_not_matched(&mut self, op: Op, row: RowRef<'_>) -> Option<StreamChunk> {
331 if (is_anti(T) && forward_exactly_once(T, SIDE)) || is_outer_side(T, SIDE) {
333 self.stream_chunk_builder
334 .append_row_update(op, row)
335 .map(Self::post_process)
336 } else {
337 None
338 }
339 }
340
341 #[inline]
342 pub fn take(&mut self) -> Option<StreamChunk> {
343 self.stream_chunk_builder.take().map(Self::post_process)
344 }
345}