1use std::cmp::Ordering;
22use std::collections::BTreeMap;
23use std::ops::Bound;
24use std::time::Duration;
25
26use either::Either;
27use itertools::Itertools;
28use multimap::MultiMap;
29use risingwave_common::array::Op;
30use risingwave_common::row::RowExt;
31use risingwave_common::util::epoch::EpochPair;
32use risingwave_common::util::sort_util::cmp_rows_ascending;
33use tokio::time::Instant;
34
35use super::barrier_align::*;
36use super::join::asof_join::*;
37use super::join::builder::JoinStreamChunkBuilder;
38use super::join::row::JoinRow;
39use super::join::*;
40use super::watermark::*;
41use crate::executor::join::builder::JoinChunkBuilder;
42use crate::executor::prelude::*;
43
/// Per-input parameters of the AS OF join executor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinParams {
    /// Indices (into this input's schema) of the equi-join key columns.
    pub join_key_indices: Vec<usize>,
    /// Indices (into this input's schema) of the deduplicated primary key columns.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Creates join parameters from the join key indices and the deduplicated pk indices.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
59
/// State and column metadata for one input (left or right) of the AS OF join.
struct JoinSide<S: StateStore, E: AsOfRowEncoding> {
    /// State hash map holding this side's rows, keyed by join key and inequality key.
    ht: AsOfJoinHashMap<S, E>,
    /// Indices of the equi-join key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input.
    all_data_types: Vec<DataType>,
    /// `(input column index, output column index)` pairs for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// Same mapping as `i2o_mapping`, indexed by input column index (one input column may
    /// appear in several output positions).
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Index of the AS OF inequality column in this side's input schema.
    inequal_key_idx: usize,
}
73
74impl<S: StateStore, E: AsOfRowEncoding> std::fmt::Debug for JoinSide<S, E> {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("JoinSide")
77 .field("join_key_indices", &self.join_key_indices)
78 .field("col_types", &self.all_data_types)
79 .field("i2o_mapping", &self.i2o_mapping)
80 .finish()
81 }
82}
83
84impl<S: StateStore, E: AsOfRowEncoding> JoinSide<S, E> {
85 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
86 self.ht.init(epoch).await
87 }
88}
89
/// Streaming AS OF join executor.
///
/// `T` selects the join type (e.g. inner or left outer) and `E` the row encoding used by the
/// state hash maps.
pub struct AsOfJoinExecutor<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> {
    /// Actor context; provides the actor/fragment ids and developer config.
    ctx: ActorContextRef,
    /// Executor info (schema, stream key, identity).
    info: ExecutorInfo,

    /// Left input; wrapped in `Option` so `into_stream` can take ownership of it.
    input_l: Option<Executor>,
    /// Right input; wrapped in `Option` so `into_stream` can take ownership of it.
    input_r: Option<Executor>,
    /// Data types of the output columns after applying `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// Left side state and metadata.
    side_l: JoinSide<S, E>,
    /// Right side state and metadata.
    side_r: JoinSide<S, E>,

    /// One flag per join key: when `false`, a NULL in that column prevents any match.
    null_safe: Vec<bool>,

    /// Streaming metrics sink.
    metrics: Arc<StreamingMetrics>,
    /// Forwarded to the output chunk builders.
    chunk_size: usize,
    /// Buffered watermarks, keyed by the position of the column within the join key.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
    /// AS OF inequality condition (column index on each side and comparison type).
    asof_desc: AsOfDesc,
    /// Rows seen since the caches were last evicted.
    cnt_rows_received: u32,
    /// Evict the join caches every this many rows (always >= 1).
    join_cache_evict_interval_rows: u32,
    /// Matched-row count above which a warning is logged when updating the right side.
    high_join_amplification_threshold: usize,
}
127
128impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> std::fmt::Debug
129 for AsOfJoinExecutor<S, T, E>
130{
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 f.debug_struct("AsOfJoinExecutor")
133 .field("join_type", &T)
134 .field("input_left", &self.input_l.as_ref().unwrap().identity())
135 .field("input_right", &self.input_r.as_ref().unwrap().identity())
136 .field("side_l", &self.side_l)
137 .field("side_r", &self.side_r)
138 .field("stream_key", &self.info.stream_key)
139 .field("schema", &self.info.schema)
140 .field("actual_output_data_types", &self.actual_output_data_types)
141 .finish()
142 }
143}
144
145impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> Execute
146 for AsOfJoinExecutor<S, T, E>
147{
148 fn execute(self: Box<Self>) -> BoxedMessageStream {
149 self.into_stream().boxed()
150 }
151}
152
/// Arguments bundle passed to `eq_join_left` / `eq_join_right` for one input chunk.
struct EqJoinArgs<'a, S: StateStore, E: AsOfRowEncoding> {
    /// Actor context (ids used for metrics and logging).
    ctx: &'a ActorContextRef,
    /// Left side state.
    side_l: &'a mut JoinSide<S, E>,
    /// Right side state.
    side_r: &'a mut JoinSide<S, E>,
    /// Per-join-key null-safety flags.
    null_safe: &'a [bool],
    /// AS OF inequality condition.
    asof_desc: &'a AsOfDesc,
    /// Output column data types.
    actual_output_data_types: &'a [DataType],
    /// The input chunk to process.
    chunk: StreamChunk,
    /// Forwarded to the output chunk builder.
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Cache eviction interval in rows.
    join_cache_evict_interval_rows: u32,
    /// Matched-row count that triggers the high-amplification warning.
    high_join_amplification_threshold: usize,
}
166
167impl<S: StateStore, const T: AsOfJoinTypePrimitive, E: AsOfRowEncoding> AsOfJoinExecutor<S, T, E> {
168 #[expect(clippy::too_many_arguments)]
169 pub fn new(
170 ctx: ActorContextRef,
171 info: ExecutorInfo,
172 input_l: Executor,
173 input_r: Executor,
174 params_l: JoinParams,
175 params_r: JoinParams,
176 null_safe: Vec<bool>,
177 output_indices: Vec<usize>,
178 state_table_l: StateTable<S>,
179 state_table_r: StateTable<S>,
180 watermark_epoch: AtomicU64Ref,
181 metrics: Arc<StreamingMetrics>,
182 chunk_size: usize,
183 asof_desc: AsOfDesc,
184 use_cache: bool,
185 high_join_amplification_threshold: usize,
186 ) -> Self {
187 let join_cache_evict_interval_rows = ctx
188 .config
189 .developer
190 .join_hash_map_evict_interval_rows
191 .max(1);
192 let cache_epoch = if use_cache {
193 Some(watermark_epoch)
194 } else {
195 None
196 };
197 let schema_fields = [
198 input_l.schema().fields.clone(),
199 input_r.schema().fields.clone(),
200 ]
201 .concat();
202
203 let original_output_data_types = schema_fields
204 .iter()
205 .map(|field| field.data_type())
206 .collect_vec();
207 let actual_output_data_types = output_indices
208 .iter()
209 .map(|&idx| original_output_data_types[idx].clone())
210 .collect_vec();
211
212 let state_all_data_types_l = input_l.schema().data_types();
213 let state_all_data_types_r = input_r.schema().data_types();
214
215 let state_join_key_indices_l = params_l.join_key_indices;
216 let state_join_key_indices_r = params_r.join_key_indices;
217
218 let join_key_data_types_l = state_join_key_indices_l
219 .iter()
220 .map(|idx| state_all_data_types_l[*idx].clone())
221 .collect_vec();
222
223 let join_key_data_types_r = state_join_key_indices_r
224 .iter()
225 .map(|idx| state_all_data_types_r[*idx].clone())
226 .collect_vec();
227
228 assert_eq!(join_key_data_types_l, join_key_data_types_r);
229
230 let (left_to_output, right_to_output) = {
231 let (left_len, right_len) = if is_left_semi_or_anti(T) {
232 (state_all_data_types_l.len(), 0usize)
233 } else if is_right_semi_or_anti(T) {
234 (0usize, state_all_data_types_r.len())
235 } else {
236 (state_all_data_types_l.len(), state_all_data_types_r.len())
237 };
238 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
239 };
240
241 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
242 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
243
244 let watermark_buffers = BTreeMap::new();
245
246 let inequal_key_idx_l = asof_desc.left_idx;
247 let inequal_key_idx_r = asof_desc.right_idx;
248
249 Self {
250 ctx: ctx.clone(),
251 info,
252 input_l: Some(input_l),
253 input_r: Some(input_r),
254 actual_output_data_types,
255 null_safe,
256 side_l: JoinSide {
257 ht: AsOfJoinHashMap::new(
258 state_join_key_indices_l.clone(),
259 state_table_l,
260 params_l.deduped_pk_indices,
261 inequal_key_idx_l,
262 state_all_data_types_l.clone(),
263 cache_epoch.clone(),
264 metrics.clone(),
265 ctx.id,
266 ctx.fragment_id,
267 "left",
268 ),
269 join_key_indices: state_join_key_indices_l,
270 all_data_types: state_all_data_types_l,
271 i2o_mapping: left_to_output,
272 i2o_mapping_indexed: l2o_indexed,
273 inequal_key_idx: inequal_key_idx_l,
274 },
275 side_r: JoinSide {
276 ht: AsOfJoinHashMap::new(
277 state_join_key_indices_r.clone(),
278 state_table_r,
279 params_r.deduped_pk_indices,
280 inequal_key_idx_r,
281 state_all_data_types_r.clone(),
282 cache_epoch,
283 metrics.clone(),
284 ctx.id,
285 ctx.fragment_id,
286 "right",
287 ),
288 join_key_indices: state_join_key_indices_r,
289 all_data_types: state_all_data_types_r,
290 i2o_mapping: right_to_output,
291 i2o_mapping_indexed: r2o_indexed,
292 inequal_key_idx: inequal_key_idx_r,
293 },
294 metrics,
295 chunk_size,
296 watermark_buffers,
297 asof_desc,
298 cnt_rows_received: 0,
299 join_cache_evict_interval_rows,
300 high_join_amplification_threshold,
301 }
302 }
303
304 fn evict_cache(
306 side_l: &mut JoinSide<S, E>,
307 side_r: &mut JoinSide<S, E>,
308 cnt_rows_received: &mut u32,
309 join_cache_evict_interval_rows: u32,
310 ) {
311 *cnt_rows_received += 1;
312 if *cnt_rows_received >= join_cache_evict_interval_rows {
313 side_l.ht.evict_cache();
314 side_r.ht.evict_cache();
315 *cnt_rows_received = 0;
316 }
317 }
318
319 #[try_stream(ok = Message, error = StreamExecutorError)]
320 async fn into_stream(mut self) {
321 let input_l = self.input_l.take().unwrap();
322 let input_r = self.input_r.take().unwrap();
323 let aligned_stream = barrier_align(
324 input_l.execute(),
325 input_r.execute(),
326 self.ctx.id,
327 self.ctx.fragment_id,
328 self.metrics.clone(),
329 "Join",
330 );
331 pin_mut!(aligned_stream);
332 let actor_id = self.ctx.id;
333
334 let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
335 let first_epoch = barrier.epoch;
336 yield Message::Barrier(barrier);
337 self.side_l.init(first_epoch).await?;
338 self.side_r.init(first_epoch).await?;
339
340 let actor_id_str = self.ctx.id.to_string();
341 let fragment_id_str = self.ctx.fragment_id.to_string();
342
343 let join_actor_input_waiting_duration_ns = self
344 .metrics
345 .join_actor_input_waiting_duration_ns
346 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
347 let left_join_match_duration_ns = self
348 .metrics
349 .join_match_duration_ns
350 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
351 let right_join_match_duration_ns = self
352 .metrics
353 .join_match_duration_ns
354 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
355
356 let barrier_join_match_duration_ns = self
357 .metrics
358 .join_match_duration_ns
359 .with_guarded_label_values(&[
360 actor_id_str.as_str(),
361 fragment_id_str.as_str(),
362 "barrier",
363 ]);
364
365 let left_join_cached_entry_count = self
366 .metrics
367 .join_cached_entry_count
368 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
369
370 let right_join_cached_entry_count = self
371 .metrics
372 .join_cached_entry_count
373 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
374
375 let mut start_time = Instant::now();
376
377 while let Some(msg) = aligned_stream
378 .next()
379 .instrument_await("hash_join_barrier_align")
380 .await
381 {
382 join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
383 match msg? {
384 AlignedMessage::WatermarkLeft(watermark) => {
385 for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
386 yield Message::Watermark(watermark_to_emit);
387 }
388 }
389 AlignedMessage::WatermarkRight(watermark) => {
390 for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
391 yield Message::Watermark(watermark_to_emit);
392 }
393 }
394 AlignedMessage::Left(chunk) => {
395 let mut left_time = Duration::from_nanos(0);
396 let mut left_start_time = Instant::now();
397 #[for_await]
398 for chunk in Self::eq_join_left(EqJoinArgs {
399 ctx: &self.ctx,
400 side_l: &mut self.side_l,
401 side_r: &mut self.side_r,
402 null_safe: &self.null_safe,
403 asof_desc: &self.asof_desc,
404 actual_output_data_types: &self.actual_output_data_types,
405 chunk,
406 chunk_size: self.chunk_size,
407 cnt_rows_received: &mut self.cnt_rows_received,
408 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
409 high_join_amplification_threshold: self.high_join_amplification_threshold,
410 }) {
411 left_time += left_start_time.elapsed();
412 yield Message::Chunk(chunk?);
413 left_start_time = Instant::now();
414 }
415 left_time += left_start_time.elapsed();
416 left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
417 self.try_flush_data().await?;
418 }
419 AlignedMessage::Right(chunk) => {
420 let mut right_time = Duration::from_nanos(0);
421 let mut right_start_time = Instant::now();
422 #[for_await]
423 for chunk in Self::eq_join_right(EqJoinArgs {
424 ctx: &self.ctx,
425 side_l: &mut self.side_l,
426 side_r: &mut self.side_r,
427 null_safe: &self.null_safe,
428 asof_desc: &self.asof_desc,
429 actual_output_data_types: &self.actual_output_data_types,
430 chunk,
431 chunk_size: self.chunk_size,
432 cnt_rows_received: &mut self.cnt_rows_received,
433 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
434 high_join_amplification_threshold: self.high_join_amplification_threshold,
435 }) {
436 right_time += right_start_time.elapsed();
437 yield Message::Chunk(chunk?);
438 right_start_time = Instant::now();
439 }
440 right_time += right_start_time.elapsed();
441 right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
442 self.try_flush_data().await?;
443 }
444 AlignedMessage::Barrier(barrier) => {
445 let barrier_start_time = Instant::now();
446 let (left_post_commit, right_post_commit) =
447 self.flush_data(barrier.epoch).await?;
448
449 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
450 yield Message::Barrier(barrier);
451
452 right_post_commit
454 .post_yield_barrier(update_vnode_bitmap.clone())
455 .await?;
456 if left_post_commit
457 .post_yield_barrier(update_vnode_bitmap)
458 .await?
459 .unwrap_or(false)
460 {
461 self.watermark_buffers
462 .values_mut()
463 .for_each(|buffers| buffers.clear());
464 }
465
466 for (join_cached_entry_count, ht) in [
468 (&left_join_cached_entry_count, &self.side_l.ht),
469 (&right_join_cached_entry_count, &self.side_r.ht),
470 ] {
471 join_cached_entry_count.set(ht.entry_count() as i64);
472 }
473
474 barrier_join_match_duration_ns
475 .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
476 }
477 }
478 start_time = Instant::now();
479 }
480 }
481
482 async fn flush_data(
483 &mut self,
484 epoch: EpochPair,
485 ) -> StreamExecutorResult<(
486 AsOfJoinHashMapPostCommit<'_, S, E>,
487 AsOfJoinHashMapPostCommit<'_, S, E>,
488 )> {
489 let left = self.side_l.ht.flush(epoch).await?;
490 let right = self.side_r.ht.flush(epoch).await?;
491 Ok((left, right))
492 }
493
494 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
495 self.side_l.ht.try_flush().await?;
496 self.side_r.ht.try_flush().await?;
497 Ok(())
498 }
499
500 fn handle_watermark(
501 &mut self,
502 side: SideTypePrimitive,
503 watermark: Watermark,
504 ) -> StreamExecutorResult<Vec<Watermark>> {
505 let (side_update, side_match) = if side == SideType::Left {
506 (&mut self.side_l, &mut self.side_r)
507 } else {
508 (&mut self.side_r, &mut self.side_l)
509 };
510
511 if side_update.join_key_indices[0] == watermark.col_idx {
513 side_match.ht.update_watermark(watermark.val.clone());
514 }
515
516 let wm_in_jk = side_update
518 .join_key_indices
519 .iter()
520 .positions(|idx| *idx == watermark.col_idx);
521 let mut watermarks_to_emit = vec![];
522 for idx in wm_in_jk {
523 let buffers = self
524 .watermark_buffers
525 .entry(idx)
526 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
527 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
528 let empty_indices = vec![];
529 let output_indices = side_update
530 .i2o_mapping_indexed
531 .get_vec(&side_update.join_key_indices[idx])
532 .unwrap_or(&empty_indices)
533 .iter()
534 .chain(
535 side_match
536 .i2o_mapping_indexed
537 .get_vec(&side_match.join_key_indices[idx])
538 .unwrap_or(&empty_indices),
539 );
540 for output_idx in output_indices {
541 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
542 }
543 };
544 }
545 Ok(watermarks_to_emit)
546 }
547
548 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
549 async fn eq_join_left(args: EqJoinArgs<'_, S, E>) {
550 let EqJoinArgs {
551 ctx: _,
552 side_l,
553 side_r,
554 null_safe,
555 asof_desc,
556 actual_output_data_types,
557 chunk,
558 chunk_size,
559 cnt_rows_received,
560 join_cache_evict_interval_rows,
561 high_join_amplification_threshold: _,
562 } = args;
563
564 let (side_update, side_match) = (side_l, side_r);
565
566 let mut join_chunk_builder =
567 JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
568 chunk_size,
569 actual_output_data_types.to_vec(),
570 side_update.i2o_mapping.clone(),
571 side_match.i2o_mapping.clone(),
572 ));
573
574 let inequal_key_idx_update = [side_update.inequal_key_idx];
576
577 for r in chunk.rows_with_holes() {
578 let Some((op, row)) = r else {
579 continue;
580 };
581 Self::evict_cache(
582 side_update,
583 side_match,
584 cnt_rows_received,
585 join_cache_evict_interval_rows,
586 );
587
588 let join_key_null_not_safe = side_update
591 .join_key_indices
592 .iter()
593 .zip_eq(null_safe.iter())
594 .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
595 if join_key_null_not_safe {
596 match op {
597 Op::Insert | Op::UpdateInsert => {
598 if let Some(chunk) =
599 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
600 {
601 yield chunk;
602 }
603 }
604 Op::Delete | Op::UpdateDelete => {
605 if let Some(chunk) =
606 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
607 {
608 yield chunk;
609 }
610 }
611 }
612 continue;
613 }
614
615 let join_key = row.project(&side_update.join_key_indices);
616
617 let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
618 let inequal_key = row.project(&inequal_key_idx_update);
619
620 if !inequal_key_is_null {
621 let matched_row_by_inequality = match asof_desc.inequality_type {
622 AsOfInequalityType::Lt => {
623 side_match
624 .ht
625 .lower_bound_by_inequality_with_jk_prefix(
626 &join_key,
627 Bound::Excluded(&inequal_key),
628 )
629 .await
630 }
631 AsOfInequalityType::Le => {
632 side_match
633 .ht
634 .lower_bound_by_inequality_with_jk_prefix(
635 &join_key,
636 Bound::Included(&inequal_key),
637 )
638 .await
639 }
640 AsOfInequalityType::Gt => {
641 side_match
642 .ht
643 .upper_bound_by_inequality_with_jk_prefix(
644 &join_key,
645 Bound::Excluded(&inequal_key),
646 )
647 .await
648 }
649 AsOfInequalityType::Ge => {
650 side_match
651 .ht
652 .upper_bound_by_inequality_with_jk_prefix(
653 &join_key,
654 Bound::Included(&inequal_key),
655 )
656 .await
657 }
658 }?
659 .map(|row| JoinRow::new(row, 0));
660 match op {
661 Op::Insert | Op::UpdateInsert => {
662 if let Some(matched_row) = matched_row_by_inequality {
663 if let Some(chunk) =
664 join_chunk_builder.with_match_on_insert(&row, &matched_row)
665 {
666 yield chunk;
667 }
668 } else if let Some(chunk) =
669 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
670 {
671 yield chunk;
672 }
673 side_update.ht.insert(row)?;
674 }
675 Op::Delete | Op::UpdateDelete => {
676 if let Some(matched_row) = matched_row_by_inequality {
677 if let Some(chunk) =
678 join_chunk_builder.with_match_on_delete(&row, &matched_row)
679 {
680 yield chunk;
681 }
682 } else if let Some(chunk) =
683 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
684 {
685 yield chunk;
686 }
687 side_update.ht.delete(row)?;
688 }
689 }
690 } else {
691 match op {
694 Op::Insert | Op::UpdateInsert => {
695 if let Some(chunk) =
696 join_chunk_builder.forward_if_not_matched(Op::Insert, row)
697 {
698 yield chunk;
699 }
700 }
701 Op::Delete | Op::UpdateDelete => {
702 if let Some(chunk) =
703 join_chunk_builder.forward_if_not_matched(Op::Delete, row)
704 {
705 yield chunk;
706 }
707 }
708 }
709 }
710 }
711 if let Some(chunk) = join_chunk_builder.take() {
712 yield chunk;
713 }
714 }
715
716 fn cmp_pk_rows(pk1: &impl Row, pk2: &impl Row) -> Ordering {
717 cmp_rows_ascending(pk1, pk2)
718 }
719
720 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
721 async fn eq_join_right(args: EqJoinArgs<'_, S, E>) {
722 let EqJoinArgs {
723 ctx,
724 side_l,
725 side_r,
726 null_safe,
727 asof_desc,
728 actual_output_data_types,
729 chunk,
730 chunk_size,
731 cnt_rows_received,
732 join_cache_evict_interval_rows,
733 high_join_amplification_threshold,
734 } = args;
735
736 let (side_update, side_match) = (side_r, side_l);
737
738 let mut join_chunk_builder = JoinStreamChunkBuilder::new(
739 chunk_size,
740 actual_output_data_types.to_vec(),
741 side_update.i2o_mapping.clone(),
742 side_match.i2o_mapping.clone(),
743 );
744
745 let join_matched_join_keys = ctx
746 .streaming_metrics
747 .join_matched_join_keys
748 .with_guarded_label_values(&[
749 &ctx.id.to_string(),
750 &ctx.fragment_id.to_string(),
751 &side_update.ht.table_id().to_string(),
752 ]);
753
754 let inequal_key_idx_update = [side_update.inequal_key_idx];
756
757 for r in chunk.rows_with_holes() {
758 let Some((op, row)) = r else {
759 continue;
760 };
761 Self::evict_cache(
762 side_update,
763 side_match,
764 cnt_rows_received,
765 join_cache_evict_interval_rows,
766 );
767
768 let join_key_null_not_safe = side_update
771 .join_key_indices
772 .iter()
773 .zip_eq(null_safe.iter())
774 .any(|(idx, ns)| !ns && row.datum_at(*idx).is_none());
775 if join_key_null_not_safe {
776 continue;
777 }
778
779 let join_key = row.project(&side_update.join_key_indices);
780
781 let inequal_key_is_null = side_update.ht.check_inequal_key_null(&row);
782 let inequal_key = row.project(&inequal_key_idx_update);
783
784 let mut join_matched_rows_cnt = 0;
785
786 if !inequal_key_is_null {
787 let (row_to_delete_r, row_to_insert_r) = {
788 let (first_row, second_row) = side_update
789 .ht
790 .first_two_by_inequality_with_jk_prefix(&join_key, &inequal_key)
791 .await?;
792 if let Some(first_row) = first_row {
793 let row_pk = side_update.ht.get_pk_from_row(row);
794 let first_pk = side_update.ht.get_pk_from_row(&first_row).to_owned_row();
795 match op {
796 Op::Insert | Op::UpdateInsert => {
797 if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Greater {
799 (Some(Either::Left(first_row)), Some(Either::Right(row)))
800 } else {
801 (None, None)
803 }
804 }
805 Op::Delete | Op::UpdateDelete => {
806 if Self::cmp_pk_rows(&first_pk, &row_pk) == Ordering::Equal {
807 if let Some(second_row) = second_row {
808 (Some(Either::Right(row)), Some(Either::Left(second_row)))
809 } else {
810 (Some(Either::Right(row)), None)
811 }
812 } else {
813 (None, None)
815 }
816 }
817 }
818 } else {
819 match op {
820 Op::Insert | Op::UpdateInsert => (None, Some(Either::Right(row))),
822 Op::Delete | Op::UpdateDelete => (Some(Either::Right(row)), None),
824 }
825 }
826 };
827 if row_to_delete_r.is_none() && row_to_insert_r.is_none() {
833 } else {
835 let prev_inequality_key = side_update
836 .ht
837 .upper_bound_by_inequality_with_jk_prefix(
838 &join_key,
839 Bound::Excluded(&inequal_key),
840 )
841 .await?
842 .map(|r| r.project(&inequal_key_idx_update));
843 let next_inequality_key = side_update
844 .ht
845 .lower_bound_by_inequality_with_jk_prefix(
846 &join_key,
847 Bound::Excluded(&inequal_key),
848 )
849 .await?
850 .map(|r| r.project(&inequal_key_idx_update));
851
852 let affected_inequality_key_r = match asof_desc.inequality_type {
853 AsOfInequalityType::Lt | AsOfInequalityType::Le => &next_inequality_key,
854 AsOfInequalityType::Gt | AsOfInequalityType::Ge => &prev_inequality_key,
855 };
856 let affected_row_r =
857 if let Some(affected_inequality_key_r) = affected_inequality_key_r {
858 side_update
859 .ht
860 .first_by_inequality_with_jk_prefix(
861 &join_key,
862 &affected_inequality_key_r,
863 )
864 .await?
865 } else {
866 None
867 }
868 .map(Either::Left);
869
870 let (row_to_delete_r, row_to_insert_r) =
871 match (&row_to_delete_r, &row_to_insert_r) {
872 (Some(_), Some(_)) => (row_to_delete_r, row_to_insert_r),
873 (None, Some(_)) => (affected_row_r, row_to_insert_r),
874 (Some(_), None) => (row_to_delete_r, affected_row_r),
875 (None, None) => unreachable!(),
876 };
877 let range = match asof_desc.inequality_type {
878 AsOfInequalityType::Lt => (
879 prev_inequality_key
880 .map(Either::Left)
881 .map_or_else(|| Bound::Unbounded, Bound::Included),
882 Bound::Excluded(Either::Right(&inequal_key)),
883 ),
884 AsOfInequalityType::Le => (
885 prev_inequality_key
886 .map(Either::Left)
887 .map_or_else(|| Bound::Unbounded, Bound::Excluded),
888 Bound::Included(Either::Right(&inequal_key)),
889 ),
890 AsOfInequalityType::Gt => (
891 Bound::Excluded(Either::Right(&inequal_key)),
892 next_inequality_key
893 .map(Either::Left)
894 .map_or_else(|| Bound::Unbounded, Bound::Included),
895 ),
896 AsOfInequalityType::Ge => (
897 Bound::Included(Either::Right(&inequal_key)),
898 next_inequality_key
899 .map(Either::Left)
900 .map_or_else(|| Bound::Unbounded, Bound::Excluded),
901 ),
902 };
903
904 let rows_l_stream = side_match
905 .ht
906 .range_by_inequality_with_jk_prefix(&join_key, &range)
907 .await?;
908 #[for_await]
909 for row_l in rows_l_stream {
910 let row_l = row_l?;
911 join_matched_rows_cnt += 1;
912 if let Some(row_to_delete_r) = &row_to_delete_r {
913 if let Some(chunk) =
914 join_chunk_builder.append_row(Op::Delete, row_to_delete_r, &row_l)
915 {
916 yield chunk;
917 }
918 } else if is_as_of_left_outer(T)
919 && let Some(chunk) =
920 join_chunk_builder.append_row_matched(Op::Delete, &row_l)
921 {
922 yield chunk;
923 }
924 if let Some(row_to_insert_r) = &row_to_insert_r {
925 if let Some(chunk) =
926 join_chunk_builder.append_row(Op::Insert, row_to_insert_r, &row_l)
927 {
928 yield chunk;
929 }
930 } else if is_as_of_left_outer(T)
931 && let Some(chunk) =
932 join_chunk_builder.append_row_matched(Op::Insert, &row_l)
933 {
934 yield chunk;
935 }
936 }
937 }
938
939 match op {
940 Op::Insert | Op::UpdateInsert => {
941 side_update.ht.insert(row)?;
942 }
943 Op::Delete | Op::UpdateDelete => {
944 side_update.ht.delete(row)?;
945 }
946 }
947 } else {
948 }
951 join_matched_join_keys.observe(join_matched_rows_cnt as _);
952 if join_matched_rows_cnt > high_join_amplification_threshold {
953 let join_key = row.project(&side_update.join_key_indices);
954 tracing::warn!(target: "high_join_amplification",
955 matched_rows_len = join_matched_rows_cnt,
956 update_table_id = %side_update.ht.table_id(),
957 match_table_id = %side_match.ht.table_id(),
958 join_key = ?join_key,
959 actor_id = %ctx.id,
960 fragment_id = %ctx.fragment_id,
961 "large rows matched for join key when AsOf join updating right side",
962 );
963 }
964 }
965 if let Some(chunk) = join_chunk_builder.take() {
966 yield chunk;
967 }
968 }
969}
970
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use risingwave_common::array::*;
    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
    use risingwave_common::util::epoch::test_epoch;
    use risingwave_common::util::sort_util::OrderType;
    use risingwave_storage::memory::MemoryStateStore;

    use super::*;
    use crate::common::table::test_utils::gen_pbtable;
    use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};

    /// Builds a `StateTable` over the given in-memory store. It has one unnamed column per
    /// entry of `data_types` and the given pk columns and orderings.
    async fn create_in_memory_state_table(
        mem_state: MemoryStateStore,
        data_types: &[DataType],
        order_types: &[OrderType],
        pk_indices: &[usize],
        table_id: u32,
    ) -> StateTable<MemoryStateStore> {
        let column_descs = data_types
            .iter()
            .enumerate()
            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
            .collect_vec();
        StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::new(table_id),
                column_descs,
                order_types.to_vec(),
                pk_indices.to_vec(),
                0,
            ),
            mem_state.clone(),
            None,
        )
        .await
    }

    /// Creates an AS OF join executor of type `T` over two mock sources, each with three
    /// `Int64` columns. It returns the left sender, the right sender and the executor's output
    /// stream.
    ///
    /// On both sides, column 0 is the join key and column 1 is the (deduplicated) pk. The
    /// state tables are keyed by `[join key, inequality column, pk]`.
    async fn create_executor<const T: AsOfJoinTypePrimitive>(
        asof_desc: AsOfDesc,
        use_cache: bool,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64), // join key
                Field::unnamed(DataType::Int64), // pk
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);

        let mem_state = MemoryStateStore::new();

        let state_l = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.left_idx, 1],
            0,
        )
        .await;

        let state_r = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64, DataType::Int64],
            &[
                OrderType::ascending(),
                OrderType::ascending(),
                OrderType::ascending(),
            ],
            &[0, asof_desc.right_idx, 1],
            1,
        )
        .await;

        // Output schema is all left columns followed by all right columns.
        let schema: Schema = [source_l.schema().fields(), source_r.schema().fields()]
            .concat()
            .into_iter()
            .collect();
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "AsOfJoinExecutor".to_owned(), 0);

        let executor = AsOfJoinExecutor::<MemoryStateStore, T, AsOfCpuEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![false],
            (0..schema_len).collect_vec(),
            state_l,
            state_r,
            Arc::new(AtomicU64::new(0)),
            Arc::new(StreamingMetrics::unused()),
            1024,
            asof_desc,
            use_cache,
            2048, // high_join_amplification_threshold
        );
        (tx_l, tx_r, executor.boxed().execute())
    }

    #[tokio::test]
    async fn test_asof_inner_join() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(false).await
    }

    #[tokio::test]
    async fn test_asof_inner_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_inner_join_impl(true).await
    }

    /// Inner AS OF join with `left.col0 < right.col2` as the inequality.
    /// Both sides are additionally equi-joined on column 0.
    async fn test_asof_inner_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 0,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Lt,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 1 7
             + 2 2 1
             + 2 3 4
             + 2 4 2
             + 6 1 9
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             - 2 5 8",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I I
             + 6 3 1
             + 6 4 1",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 9",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::Inner }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so an inner join emits nothing
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the 2nd barrier
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert and delete of the same row)
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 8 2 1 7
                - 2 5 8 2 1 7
                + 2 5 8 2 3 4"
            )
        );

        // push the 2nd right chunk: deleting the current match falls back to the next one
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 4
                + 2 5 8 2 1 7"
            )
        );

        // push the 3rd right chunk
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 7
                + 2 5 8 2 3 3"
            )
        );

        // push the 3rd left chunk: deleting a left row retracts its match
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 3 3"
            )
        );

        // push the 4th left chunk
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 3 1 6 1 9
                + 6 4 1 6 1 9"
            )
        );

        // push the 4th right chunk: the second row with the same inequality key takes over
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 3 1 6 1 9
                + 6 3 1 6 2 9
                - 6 4 1 6 1 9
                + 6 4 1 6 2 9"
            )
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_asof_left_outer_join() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(false).await
    }

    #[tokio::test]
    async fn test_asof_left_outer_join_with_cache() -> StreamExecutorResult<()> {
        test_asof_left_outer_join_impl(true).await
    }

    /// Left outer AS OF join with `left.col1 >= right.col2` as the inequality.
    /// Both sides are additionally equi-joined on column 0.
    async fn test_asof_left_outer_join_impl(use_cache: bool) -> StreamExecutorResult<()> {
        let asof_desc = AsOfDesc {
            left_idx: 1,
            right_idx: 2,
            inequality_type: AsOfInequalityType::Ge,
        };

        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 7
             + 2 5 8
             + 3 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 3 8 1
             - 3 8 1",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 3 4
             + 2 2 5
             + 2 1 5
             + 6 1 8
             + 6 2 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             - 2 3 4
             - 2 1 5
             - 2 2 5",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I I
             + 6 8 9",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I I
             - 6 1 8",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_executor::<{ AsOfJoinType::LeftOuter }>(asof_desc, use_cache).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; unmatched left rows are padded with NULLs
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 7 . . .
                + 2 5 8 . . .
                + 3 6 9 . . ."
            )
        );

        // push the 2nd barrier
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; `D` presumably marks rows the builder made invisible
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 3 8 1 . . . D
                - 3 8 1 . . . D"
            )
        );

        // push the 1st right chunk
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 . . .
                + 2 5 8 2 3 4
                - 2 5 8 2 3 4
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 2 1 5"
            )
        );

        // push the 2nd right chunk; once no right row matches, the NULL-padded row returns
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 8 2 1 5
                + 2 5 8 2 2 5
                - 2 5 8 2 2 5
                + 2 5 8 . . ."
            )
        );

        // push the 3rd left chunk
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 6 8 9 6 1 8"
            )
        );

        // push the 3rd right chunk
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 6 8 9 6 1 8
                + 6 8 9 . . ."
            )
        );
        Ok(())
    }
}