1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::metrics::LabelGuardedHistogram;
27use risingwave_common::row::RowExt;
28use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
29use risingwave_common::util::epoch::EpochPair;
30use risingwave_common::util::iter_util::ZipEqDebug;
31use risingwave_expr::expr::NonStrictExpression;
32use risingwave_pb::stream_plan::InequalityType;
33use tokio::time::Instant;
34
35use self::builder::JoinChunkBuilder;
36use super::barrier_align::*;
37use super::join::hash_join::*;
38use super::join::row::{JoinEncoding, JoinRow};
39use super::join::*;
40use super::watermark::*;
41use crate::executor::CachedJoinRow;
42use crate::executor::join::builder::JoinStreamChunkBuilder;
43use crate::executor::join::hash_join::CacheResult;
44use crate::executor::prelude::*;
45
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Ordering and duplicates are irrelevant; an empty `vec1` is a subset of
/// anything, including an empty `vec2`.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
49
50#[derive(Debug, Clone)]
53pub struct InequalityPairInfo {
54 pub left_idx: usize,
56 pub right_idx: usize,
58 pub clean_left_state: bool,
60 pub clean_right_state: bool,
62 pub op: InequalityType,
64}
65
66impl InequalityPairInfo {
67 pub fn left_side_is_larger(&self) -> bool {
69 matches!(
70 self.op,
71 InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
72 )
73 }
74}
75
/// Per-side column index parameters handed to the hash join constructor.
pub struct JoinParams {
    /// Indices of the join key columns within this side's input.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices forwarded to the join hash map; their count
    /// also sizes the degree table's pk columns. Presumably deduplicated
    /// against the join key (per the name) — confirm with the caller.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the two index lists without any validation.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
91
92struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
93 ht: JoinHashMap<K, S, E>,
95 join_key_indices: Vec<usize>,
97 all_data_types: Vec<DataType>,
99 start_pos: usize,
101 i2o_mapping: Vec<(usize, usize)>,
103 i2o_mapping_indexed: MultiMap<usize, usize>,
104 input2inequality_index: Vec<Vec<(usize, bool)>>,
112 non_null_fields: Vec<usize>,
114 state_clean_columns: Vec<(usize, usize)>,
117 need_degree_table: bool,
119 _marker: std::marker::PhantomData<E>,
120}
121
122impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 f.debug_struct("JoinSide")
125 .field("join_key_indices", &self.join_key_indices)
126 .field("col_types", &self.all_data_types)
127 .field("start_pos", &self.start_pos)
128 .field("i2o_mapping", &self.i2o_mapping)
129 .field("need_degree_table", &self.need_degree_table)
130 .finish()
131 }
132}
133
134impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
135 fn is_dirty(&self) -> bool {
137 unimplemented!()
138 }
139
140 #[expect(dead_code)]
141 fn clear_cache(&mut self) {
142 assert!(
143 !self.is_dirty(),
144 "cannot clear cache while states of hash join are dirty"
145 );
146
147 }
150
151 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
152 self.ht.init(epoch).await
153 }
154}
155
156pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
159{
160 ctx: ActorContextRef,
161 info: ExecutorInfo,
162
163 input_l: Option<Executor>,
165 input_r: Option<Executor>,
167 actual_output_data_types: Vec<DataType>,
169 side_l: JoinSide<K, S, E>,
171 side_r: JoinSide<K, S, E>,
173 cond: Option<NonStrictExpression>,
175 inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
178 inequality_watermarks: Vec<Option<Watermark>>,
182 watermark_indices_in_jk: Vec<(usize, bool)>,
185
186 append_only_optimize: bool,
188
189 metrics: Arc<StreamingMetrics>,
190 chunk_size: usize,
192 cnt_rows_received: u32,
194
195 watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
197
198 high_join_amplification_threshold: usize,
200
201 entry_state_max_rows: usize,
203 join_cache_evict_interval_rows: u32,
205}
206
207impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
208 for HashJoinExecutor<K, S, T, E>
209{
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("HashJoinExecutor")
212 .field("join_type", &T)
213 .field("input_left", &self.input_l.as_ref().unwrap().identity())
214 .field("input_right", &self.input_r.as_ref().unwrap().identity())
215 .field("side_l", &self.side_l)
216 .field("side_r", &self.side_r)
217 .field("stream_key", &self.info.stream_key)
218 .field("schema", &self.info.schema)
219 .field("actual_output_data_types", &self.actual_output_data_types)
220 .field(
221 "join_cache_evict_interval_rows",
222 &self.join_cache_evict_interval_rows,
223 )
224 .finish()
225 }
226}
227
228impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
229 for HashJoinExecutor<K, S, T, E>
230{
231 fn execute(self: Box<Self>) -> BoxedMessageStream {
232 self.into_stream().boxed()
233 }
234}
235
236struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
237 ctx: &'a ActorContextRef,
238 side_l: &'a mut JoinSide<K, S, E>,
239 side_r: &'a mut JoinSide<K, S, E>,
240 actual_output_data_types: &'a [DataType],
241 cond: &'a mut Option<NonStrictExpression>,
242 inequality_watermarks: &'a [Option<Watermark>],
243 chunk: StreamChunk,
244 append_only_optimize: bool,
245 chunk_size: usize,
246 cnt_rows_received: &'a mut u32,
247 high_join_amplification_threshold: usize,
248 entry_state_max_rows: usize,
249 join_cache_evict_interval_rows: u32,
250 join_matched_join_keys: &'a LabelGuardedHistogram,
251}
252
253impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
254 HashJoinExecutor<K, S, T, E>
255{
256 #[expect(clippy::too_many_arguments)]
257 pub fn new(
258 ctx: ActorContextRef,
259 info: ExecutorInfo,
260 input_l: Executor,
261 input_r: Executor,
262 params_l: JoinParams,
263 params_r: JoinParams,
264 null_safe: Vec<bool>,
265 output_indices: Vec<usize>,
266 cond: Option<NonStrictExpression>,
267 inequality_pairs: Vec<InequalityPairInfo>,
268 state_table_l: StateTable<S>,
269 degree_state_table_l: StateTable<S>,
270 state_table_r: StateTable<S>,
271 degree_state_table_r: StateTable<S>,
272 watermark_epoch: AtomicU64Ref,
273 is_append_only: bool,
274 metrics: Arc<StreamingMetrics>,
275 chunk_size: usize,
276 high_join_amplification_threshold: usize,
277 watermark_indices_in_jk: Vec<(usize, bool)>,
278 ) -> Self {
279 Self::new_with_cache_size(
280 ctx,
281 info,
282 input_l,
283 input_r,
284 params_l,
285 params_r,
286 null_safe,
287 output_indices,
288 cond,
289 inequality_pairs,
290 state_table_l,
291 degree_state_table_l,
292 state_table_r,
293 degree_state_table_r,
294 watermark_epoch,
295 is_append_only,
296 metrics,
297 chunk_size,
298 high_join_amplification_threshold,
299 None,
300 watermark_indices_in_jk,
301 )
302 }
303
304 #[expect(clippy::too_many_arguments)]
305 pub fn new_with_cache_size(
306 ctx: ActorContextRef,
307 info: ExecutorInfo,
308 input_l: Executor,
309 input_r: Executor,
310 params_l: JoinParams,
311 params_r: JoinParams,
312 null_safe: Vec<bool>,
313 output_indices: Vec<usize>,
314 cond: Option<NonStrictExpression>,
315 inequality_pairs: Vec<InequalityPairInfo>,
316 state_table_l: StateTable<S>,
317 degree_state_table_l: StateTable<S>,
318 state_table_r: StateTable<S>,
319 degree_state_table_r: StateTable<S>,
320 watermark_epoch: AtomicU64Ref,
321 is_append_only: bool,
322 metrics: Arc<StreamingMetrics>,
323 chunk_size: usize,
324 high_join_amplification_threshold: usize,
325 entry_state_max_rows: Option<usize>,
326 watermark_indices_in_jk: Vec<(usize, bool)>,
327 ) -> Self {
328 let entry_state_max_rows = match entry_state_max_rows {
329 None => ctx.config.developer.hash_join_entry_state_max_rows,
330 Some(entry_state_max_rows) => entry_state_max_rows,
331 };
332 let join_cache_evict_interval_rows = ctx
333 .config
334 .developer
335 .join_hash_map_evict_interval_rows
336 .max(1);
337 let side_l_column_n = input_l.schema().len();
338
339 let schema_fields = match T {
340 JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
341 JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
342 _ => [
343 input_l.schema().fields.clone(),
344 input_r.schema().fields.clone(),
345 ]
346 .concat(),
347 };
348
349 let original_output_data_types = schema_fields
350 .iter()
351 .map(|field| field.data_type())
352 .collect_vec();
353 let actual_output_data_types = output_indices
354 .iter()
355 .map(|&idx| original_output_data_types[idx].clone())
356 .collect_vec();
357
358 let state_all_data_types_l = input_l.schema().data_types();
360 let state_all_data_types_r = input_r.schema().data_types();
361
362 let state_pk_indices_l = input_l.stream_key().to_vec();
363 let state_pk_indices_r = input_r.stream_key().to_vec();
364
365 let state_join_key_indices_l = params_l.join_key_indices;
366 let state_join_key_indices_r = params_r.join_key_indices;
367
368 let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
369 let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
370
371 let degree_pk_indices_l = (state_join_key_indices_l.len()
372 ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
373 .collect_vec();
374 let degree_pk_indices_r = (state_join_key_indices_r.len()
375 ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
376 .collect_vec();
377
378 let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
380 let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
381
382 let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
384
385 let join_key_data_types_l = state_join_key_indices_l
386 .iter()
387 .map(|idx| state_all_data_types_l[*idx].clone())
388 .collect_vec();
389
390 let join_key_data_types_r = state_join_key_indices_r
391 .iter()
392 .map(|idx| state_all_data_types_r[*idx].clone())
393 .collect_vec();
394
395 assert_eq!(join_key_data_types_l, join_key_data_types_r);
396
397 let null_matched = K::Bitmap::from_bool_vec(null_safe);
398
399 let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
400 let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
401
402 let (left_to_output, right_to_output) = {
403 let (left_len, right_len) = if is_left_semi_or_anti(T) {
404 (state_all_data_types_l.len(), 0usize)
405 } else if is_right_semi_or_anti(T) {
406 (0usize, state_all_data_types_r.len())
407 } else {
408 (state_all_data_types_l.len(), state_all_data_types_r.len())
409 };
410 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
411 };
412
413 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
414 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
415
416 let left_input_len = input_l.schema().len();
417 let right_input_len = input_r.schema().len();
418 let mut l2inequality_index = vec![vec![]; left_input_len];
419 let mut r2inequality_index = vec![vec![]; right_input_len];
420 let mut l_inequal_state_clean_columns = vec![];
421 let mut r_inequal_state_clean_columns = vec![];
422 let inequality_pairs = inequality_pairs
423 .into_iter()
424 .enumerate()
425 .map(|(index, pair)| {
426 let left_is_larger = pair.left_side_is_larger();
432 l2inequality_index[pair.left_idx].push((index, !left_is_larger));
433 r2inequality_index[pair.right_idx].push((index, left_is_larger));
434
435 if pair.clean_left_state {
437 l_inequal_state_clean_columns.push((pair.left_idx, index));
438 }
439 if pair.clean_right_state {
440 r_inequal_state_clean_columns.push((pair.right_idx, index));
441 }
442
443 let output_indices = if pair.left_side_is_larger() {
446 l2o_indexed
448 .get_vec(&pair.left_idx)
449 .cloned()
450 .unwrap_or_default()
451 } else {
452 r2o_indexed
454 .get_vec(&pair.right_idx)
455 .cloned()
456 .unwrap_or_default()
457 };
458
459 (output_indices, pair)
460 })
461 .collect_vec();
462
463 let mut l_non_null_fields = l2inequality_index
464 .iter()
465 .positions(|inequalities| !inequalities.is_empty())
466 .collect_vec();
467 let mut r_non_null_fields = r2inequality_index
468 .iter()
469 .positions(|inequalities| !inequalities.is_empty())
470 .collect_vec();
471
472 if append_only_optimize {
473 l_inequal_state_clean_columns.clear();
474 r_inequal_state_clean_columns.clear();
475 l_non_null_fields.clear();
476 r_non_null_fields.clear();
477 }
478
479 let l_inequality_idx = l_inequal_state_clean_columns
483 .first()
484 .map(|(col_idx, _)| *col_idx);
485 let r_inequality_idx = r_inequal_state_clean_columns
486 .first()
487 .map(|(col_idx, _)| *col_idx);
488
489 let degree_state_l = need_degree_table_l.then(|| {
490 TableInner::new(
491 degree_pk_indices_l,
492 degree_join_key_indices_l,
493 degree_state_table_l,
494 l_inequality_idx,
495 )
496 });
497 let degree_state_r = need_degree_table_r.then(|| {
498 TableInner::new(
499 degree_pk_indices_r,
500 degree_join_key_indices_r,
501 degree_state_table_r,
502 r_inequality_idx,
503 )
504 });
505
506 let inequality_watermarks = vec![None; inequality_pairs.len()];
507 let watermark_buffers = BTreeMap::new();
508 Self {
509 ctx: ctx.clone(),
510 info,
511 input_l: Some(input_l),
512 input_r: Some(input_r),
513 actual_output_data_types,
514 side_l: JoinSide {
515 ht: JoinHashMap::new(
516 watermark_epoch.clone(),
517 join_key_data_types_l,
518 state_join_key_indices_l.clone(),
519 state_all_data_types_l.clone(),
520 state_table_l,
521 params_l.deduped_pk_indices,
522 degree_state_l,
523 null_matched.clone(),
524 pk_contained_in_jk_l,
525 metrics.clone(),
526 ctx.id,
527 ctx.fragment_id,
528 "left",
529 ),
530 join_key_indices: state_join_key_indices_l,
531 all_data_types: state_all_data_types_l,
532 i2o_mapping: left_to_output,
533 i2o_mapping_indexed: l2o_indexed,
534 input2inequality_index: l2inequality_index,
535 non_null_fields: l_non_null_fields,
536 state_clean_columns: l_inequal_state_clean_columns,
537 start_pos: 0,
538 need_degree_table: need_degree_table_l,
539 _marker: PhantomData,
540 },
541 side_r: JoinSide {
542 ht: JoinHashMap::new(
543 watermark_epoch,
544 join_key_data_types_r,
545 state_join_key_indices_r.clone(),
546 state_all_data_types_r.clone(),
547 state_table_r,
548 params_r.deduped_pk_indices,
549 degree_state_r,
550 null_matched,
551 pk_contained_in_jk_r,
552 metrics.clone(),
553 ctx.id,
554 ctx.fragment_id,
555 "right",
556 ),
557 join_key_indices: state_join_key_indices_r,
558 all_data_types: state_all_data_types_r,
559 start_pos: side_l_column_n,
560 i2o_mapping: right_to_output,
561 i2o_mapping_indexed: r2o_indexed,
562 input2inequality_index: r2inequality_index,
563 non_null_fields: r_non_null_fields,
564 state_clean_columns: r_inequal_state_clean_columns,
565 need_degree_table: need_degree_table_r,
566 _marker: PhantomData,
567 },
568 cond,
569 inequality_pairs,
570 inequality_watermarks,
571 watermark_indices_in_jk,
572 append_only_optimize,
573 metrics,
574 chunk_size,
575 cnt_rows_received: 0,
576 watermark_buffers,
577 high_join_amplification_threshold,
578 entry_state_max_rows,
579 join_cache_evict_interval_rows,
580 }
581 }
582
583 #[try_stream(ok = Message, error = StreamExecutorError)]
584 async fn into_stream(mut self) {
585 let input_l = self.input_l.take().unwrap();
586 let input_r = self.input_r.take().unwrap();
587 let aligned_stream = barrier_align(
588 input_l.execute(),
589 input_r.execute(),
590 self.ctx.id,
591 self.ctx.fragment_id,
592 self.metrics.clone(),
593 "Join",
594 );
595 pin_mut!(aligned_stream);
596
597 let actor_id = self.ctx.id;
598
599 let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
600 let first_epoch = barrier.epoch;
601 yield Message::Barrier(barrier);
603 self.side_l.init(first_epoch).await?;
604 self.side_r.init(first_epoch).await?;
605
606 let actor_id_str = self.ctx.id.to_string();
607 let fragment_id_str = self.ctx.fragment_id.to_string();
608
609 let join_actor_input_waiting_duration_ns = self
611 .metrics
612 .join_actor_input_waiting_duration_ns
613 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
614 let left_join_match_duration_ns = self
615 .metrics
616 .join_match_duration_ns
617 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
618 let right_join_match_duration_ns = self
619 .metrics
620 .join_match_duration_ns
621 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
622
623 let barrier_join_match_duration_ns = self
624 .metrics
625 .join_match_duration_ns
626 .with_guarded_label_values(&[
627 actor_id_str.as_str(),
628 fragment_id_str.as_str(),
629 "barrier",
630 ]);
631
632 let left_join_cached_entry_count = self
633 .metrics
634 .join_cached_entry_count
635 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
636
637 let right_join_cached_entry_count = self
638 .metrics
639 .join_cached_entry_count
640 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
641
642 let left_table_id_str = self.side_l.ht.table_id().to_string();
644 let right_table_id_str = self.side_r.ht.table_id().to_string();
645 let left_join_matched_join_keys = self
646 .metrics
647 .join_matched_join_keys
648 .with_guarded_label_values(&[
649 actor_id_str.as_str(),
650 fragment_id_str.as_str(),
651 left_table_id_str.as_str(),
652 ]);
653 let right_join_matched_join_keys = self
654 .metrics
655 .join_matched_join_keys
656 .with_guarded_label_values(&[
657 actor_id_str.as_str(),
658 fragment_id_str.as_str(),
659 right_table_id_str.as_str(),
660 ]);
661
662 let mut start_time = Instant::now();
663
664 while let Some(msg) = aligned_stream
665 .next()
666 .instrument_await("hash_join_barrier_align")
667 .await
668 {
669 join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
670 match msg? {
671 AlignedMessage::WatermarkLeft(watermark) => {
672 for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
673 yield Message::Watermark(watermark_to_emit);
674 }
675 }
676 AlignedMessage::WatermarkRight(watermark) => {
677 for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
678 yield Message::Watermark(watermark_to_emit);
679 }
680 }
681 AlignedMessage::Left(chunk) => {
682 let mut left_time = Duration::from_nanos(0);
683 let mut left_start_time = Instant::now();
684 #[for_await]
685 for chunk in Self::eq_join_left(EqJoinArgs {
686 ctx: &self.ctx,
687 side_l: &mut self.side_l,
688 side_r: &mut self.side_r,
689 actual_output_data_types: &self.actual_output_data_types,
690 cond: &mut self.cond,
691 inequality_watermarks: &self.inequality_watermarks,
692 chunk,
693 append_only_optimize: self.append_only_optimize,
694 chunk_size: self.chunk_size,
695 cnt_rows_received: &mut self.cnt_rows_received,
696 high_join_amplification_threshold: self.high_join_amplification_threshold,
697 entry_state_max_rows: self.entry_state_max_rows,
698 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
699 join_matched_join_keys: &left_join_matched_join_keys,
700 }) {
701 left_time += left_start_time.elapsed();
702 yield Message::Chunk(chunk?);
703 left_start_time = Instant::now();
704 }
705 left_time += left_start_time.elapsed();
706 left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
707 self.try_flush_data().await?;
708 }
709 AlignedMessage::Right(chunk) => {
710 let mut right_time = Duration::from_nanos(0);
711 let mut right_start_time = Instant::now();
712 #[for_await]
713 for chunk in Self::eq_join_right(EqJoinArgs {
714 ctx: &self.ctx,
715 side_l: &mut self.side_l,
716 side_r: &mut self.side_r,
717 actual_output_data_types: &self.actual_output_data_types,
718 cond: &mut self.cond,
719 inequality_watermarks: &self.inequality_watermarks,
720 chunk,
721 append_only_optimize: self.append_only_optimize,
722 chunk_size: self.chunk_size,
723 cnt_rows_received: &mut self.cnt_rows_received,
724 high_join_amplification_threshold: self.high_join_amplification_threshold,
725 entry_state_max_rows: self.entry_state_max_rows,
726 join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
727 join_matched_join_keys: &right_join_matched_join_keys,
728 }) {
729 right_time += right_start_time.elapsed();
730 yield Message::Chunk(chunk?);
731 right_start_time = Instant::now();
732 }
733 right_time += right_start_time.elapsed();
734 right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
735 self.try_flush_data().await?;
736 }
737 AlignedMessage::Barrier(barrier) => {
738 let barrier_start_time = Instant::now();
739 let (left_post_commit, right_post_commit) =
740 self.flush_data(barrier.epoch).await?;
741
742 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
743
744 barrier_join_match_duration_ns
747 .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
748 yield Message::Barrier(barrier);
749
750 right_post_commit
752 .post_yield_barrier(update_vnode_bitmap.clone())
753 .await?;
754 if left_post_commit
755 .post_yield_barrier(update_vnode_bitmap)
756 .await?
757 .unwrap_or(false)
758 {
759 self.watermark_buffers
760 .values_mut()
761 .for_each(|buffers| buffers.clear());
762 self.inequality_watermarks.fill(None);
763 }
764
765 for (join_cached_entry_count, ht) in [
767 (&left_join_cached_entry_count, &self.side_l.ht),
768 (&right_join_cached_entry_count, &self.side_r.ht),
769 ] {
770 join_cached_entry_count.set(ht.entry_count() as i64);
771 }
772 }
773 }
774 start_time = Instant::now();
775 }
776 }
777
778 async fn flush_data(
779 &mut self,
780 epoch: EpochPair,
781 ) -> StreamExecutorResult<(
782 JoinHashMapPostCommit<'_, K, S, E>,
783 JoinHashMapPostCommit<'_, K, S, E>,
784 )> {
785 let left = self.side_l.ht.flush(epoch).await?;
788 let right = self.side_r.ht.flush(epoch).await?;
789 Ok((left, right))
790 }
791
792 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
793 self.side_l.ht.try_flush().await?;
796 self.side_r.ht.try_flush().await?;
797 Ok(())
798 }
799
800 fn evict_cache(
802 side_update: &mut JoinSide<K, S, E>,
803 side_match: &mut JoinSide<K, S, E>,
804 cnt_rows_received: &mut u32,
805 join_cache_evict_interval_rows: u32,
806 ) {
807 *cnt_rows_received += 1;
808 if *cnt_rows_received >= join_cache_evict_interval_rows {
809 side_update.ht.evict();
810 side_match.ht.evict();
811 *cnt_rows_received = 0;
812 }
813 }
814
815 fn handle_watermark(
816 &mut self,
817 side: SideTypePrimitive,
818 watermark: Watermark,
819 ) -> StreamExecutorResult<Vec<Watermark>> {
820 let (side_update, side_match) = if side == SideType::Left {
821 (&mut self.side_l, &mut self.side_r)
822 } else {
823 (&mut self.side_r, &mut self.side_l)
824 };
825
826 let wm_in_jk = side_update
828 .join_key_indices
829 .iter()
830 .positions(|idx| *idx == watermark.col_idx);
831 let mut watermarks_to_emit = vec![];
832 for idx in wm_in_jk {
833 let buffers = self
834 .watermark_buffers
835 .entry(idx)
836 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
837 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
838 if self
839 .watermark_indices_in_jk
840 .iter()
841 .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
842 {
843 side_match
844 .ht
845 .update_watermark(selected_watermark.val.clone());
846 side_update
847 .ht
848 .update_watermark(selected_watermark.val.clone());
849 }
850
851 let empty_indices = vec![];
852 let output_indices = side_update
853 .i2o_mapping_indexed
854 .get_vec(&side_update.join_key_indices[idx])
855 .unwrap_or(&empty_indices)
856 .iter()
857 .chain(
858 side_match
859 .i2o_mapping_indexed
860 .get_vec(&side_match.join_key_indices[idx])
861 .unwrap_or(&empty_indices),
862 );
863 for output_idx in output_indices {
864 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
865 }
866 };
867 }
868
869 let mut update_left_watermark = None;
874 let mut update_right_watermark = None;
875 if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
876 for (inequality_index, _) in watermark_indices {
877 let buffers = self
878 .watermark_buffers
879 .entry(side_update.join_key_indices.len() + inequality_index)
880 .or_insert_with(|| {
881 BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
882 });
883 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
884 {
885 let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
886 let left_is_larger = pair_info.left_side_is_larger();
887
888 for output_idx in output_indices {
890 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
891 }
892 self.inequality_watermarks[*inequality_index] =
894 Some(selected_watermark.clone());
895
896 if left_is_larger && pair_info.clean_left_state {
898 update_left_watermark = Some(selected_watermark.val.clone());
899 } else if !left_is_larger && pair_info.clean_right_state {
900 update_right_watermark = Some(selected_watermark.val.clone());
901 }
902 }
903 }
904 if let Some(val) = update_left_watermark {
908 self.side_l.ht.update_watermark(val);
909 }
910 if let Some(val) = update_right_watermark {
911 self.side_r.ht.update_watermark(val);
912 }
913 }
914 Ok(watermarks_to_emit)
915 }
916
917 fn row_concat(
918 row_update: impl Row,
919 update_start_pos: usize,
920 row_matched: impl Row,
921 matched_start_pos: usize,
922 ) -> OwnedRow {
923 let mut new_row = vec![None; row_update.len() + row_matched.len()];
924
925 for (i, datum_ref) in row_update.iter().enumerate() {
926 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
927 }
928 for (i, datum_ref) in row_matched.iter().enumerate() {
929 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
930 }
931 OwnedRow::new(new_row)
932 }
933
934 fn eq_join_left(
936 args: EqJoinArgs<'_, K, S, E>,
937 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
938 Self::eq_join_oneside::<{ SideType::Left }>(args)
939 }
940
941 fn eq_join_right(
943 args: EqJoinArgs<'_, K, S, E>,
944 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
945 Self::eq_join_oneside::<{ SideType::Right }>(args)
946 }
947
948 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
949 async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
950 let EqJoinArgs {
951 ctx,
952 side_l,
953 side_r,
954 actual_output_data_types,
955 cond,
956 inequality_watermarks,
957 chunk,
958 append_only_optimize,
959 chunk_size,
960 cnt_rows_received,
961 high_join_amplification_threshold,
962 entry_state_max_rows,
963 join_cache_evict_interval_rows,
964 join_matched_join_keys,
965 ..
966 } = args;
967
968 let (side_update, side_match) = if SIDE == SideType::Left {
969 (side_l, side_r)
970 } else {
971 (side_r, side_l)
972 };
973
974 let useful_state_clean_columns = side_match
975 .state_clean_columns
976 .iter()
977 .filter_map(|(column_idx, inequality_index)| {
978 inequality_watermarks[*inequality_index]
979 .as_ref()
980 .map(|watermark| (*column_idx, watermark))
981 })
982 .collect_vec();
983
984 let mut hashjoin_chunk_builder =
985 JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
986 chunk_size,
987 actual_output_data_types.to_vec(),
988 side_update.i2o_mapping.clone(),
989 side_match.i2o_mapping.clone(),
990 ));
991
992 let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
993 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
994 let Some((op, row)) = r else {
995 continue;
996 };
997 Self::evict_cache(
998 side_update,
999 side_match,
1000 cnt_rows_received,
1001 join_cache_evict_interval_rows,
1002 );
1003
1004 let cache_lookup_result = {
1005 let probe_non_null_requirement_satisfied = side_update
1006 .non_null_fields
1007 .iter()
1008 .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
1009 let build_non_null_requirement_satisfied =
1010 key.null_bitmap().is_subset(side_match.ht.null_matched());
1011 if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
1012 side_match.ht.take_state_opt(key)
1013 } else {
1014 CacheResult::NeverMatch
1015 }
1016 };
1017 let mut total_matches = 0;
1018
1019 macro_rules! match_rows {
1020 ($op:ident) => {
1021 Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
1022 cache_lookup_result,
1023 row,
1024 key,
1025 &mut hashjoin_chunk_builder,
1026 side_match,
1027 side_update,
1028 &useful_state_clean_columns,
1029 cond,
1030 append_only_optimize,
1031 entry_state_max_rows,
1032 )
1033 };
1034 }
1035
1036 match op {
1037 Op::Insert | Op::UpdateInsert =>
1038 {
1039 #[for_await]
1040 for chunk in match_rows!(Insert) {
1041 let chunk = chunk?;
1042 total_matches += chunk.cardinality();
1043 yield chunk;
1044 }
1045 }
1046 Op::Delete | Op::UpdateDelete =>
1047 {
1048 #[for_await]
1049 for chunk in match_rows!(Delete) {
1050 let chunk = chunk?;
1051 total_matches += chunk.cardinality();
1052 yield chunk;
1053 }
1054 }
1055 };
1056
1057 join_matched_join_keys.observe(total_matches as _);
1058 if total_matches > high_join_amplification_threshold {
1059 let join_key_data_types = side_update.ht.join_key_data_types();
1060 let key = key.deserialize(join_key_data_types)?;
1061 tracing::warn!(target: "high_join_amplification",
1062 matched_rows_len = total_matches,
1063 update_table_id = %side_update.ht.table_id(),
1064 match_table_id = %side_match.ht.table_id(),
1065 join_key = ?key,
1066 actor_id = %ctx.id,
1067 fragment_id = %ctx.fragment_id,
1068 "large rows matched for join key"
1069 );
1070 }
1071 }
1072 if let Some(chunk) = hashjoin_chunk_builder.take() {
1074 yield chunk;
1075 }
1076 }
1077
1078 #[expect(clippy::too_many_arguments)]
1087 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
1088 async fn handle_match_rows<
1089 'a,
1090 const SIDE: SideTypePrimitive,
1091 const JOIN_OP: JoinOpPrimitive,
1092 >(
1093 cached_lookup_result: CacheResult<E>,
1094 row: RowRef<'a>,
1095 key: &'a K,
1096 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1097 side_match: &'a mut JoinSide<K, S, E>,
1098 side_update: &'a mut JoinSide<K, S, E>,
1099 useful_state_clean_columns: &'a [(usize, &'a Watermark)],
1100 cond: &'a mut Option<NonStrictExpression>,
1101 append_only_optimize: bool,
1102 entry_state_max_rows: usize,
1103 ) {
1104 let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
1105 let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
1106 let mut entry_state_count = 0;
1107
1108 let mut degree = 0;
1109 let mut append_only_matched_row = None;
1110 let mut matched_rows_to_clean = vec![];
1111
1112 macro_rules! match_row {
1113 (
1114 $match_order_key_indices:expr,
1115 $degree_table:expr,
1116 $matched_row:expr,
1117 $matched_row_ref:expr,
1118 $from_cache:literal,
1119 $map_output:expr,
1120 ) => {
1121 Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
1122 row,
1123 $matched_row,
1124 $matched_row_ref,
1125 hashjoin_chunk_builder,
1126 $match_order_key_indices,
1127 $degree_table,
1128 side_update.start_pos,
1129 side_match.start_pos,
1130 cond,
1131 &mut degree,
1132 useful_state_clean_columns,
1133 append_only_optimize,
1134 &mut append_only_matched_row,
1135 &mut matched_rows_to_clean,
1136 $map_output,
1137 )
1138 };
1139 }
1140
1141 let entry_state = match cached_lookup_result {
1142 CacheResult::NeverMatch => {
1143 let op = match JOIN_OP {
1144 JoinOp::Insert => Op::Insert,
1145 JoinOp::Delete => Op::Delete,
1146 };
1147 if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1148 yield chunk;
1149 }
1150 return Ok(());
1151 }
1152 CacheResult::Hit(mut cached_rows) => {
1153 let (match_order_key_indices, match_degree_state) =
1154 side_match.ht.get_degree_state_mut_ref();
1155 for (matched_row_ref, matched_row) in
1157 cached_rows.values_mut(&side_match.all_data_types)
1158 {
1159 let matched_row = matched_row?;
1160 if let Some(chunk) = match_row!(
1161 match_order_key_indices,
1162 match_degree_state,
1163 matched_row,
1164 Some(matched_row_ref),
1165 true,
1166 Either::Left,
1167 )
1168 .await
1169 {
1170 yield chunk;
1171 }
1172 }
1173
1174 cached_rows
1175 }
1176 CacheResult::Miss => {
1177 let (matched_rows, match_order_key_indices, degree_table) = side_match
1179 .ht
1180 .fetch_matched_rows_and_get_degree_table_ref(key)
1181 .await?;
1182
1183 #[for_await]
1184 for matched_row in matched_rows {
1185 let (encoded_pk, matched_row) = matched_row?;
1186
1187 let mut matched_row_ref = None;
1188
1189 if entry_state_count <= entry_state_max_rows {
1191 let row_ref = entry_state
1192 .insert(encoded_pk, E::encode(&matched_row))
1193 .with_context(|| format!("row: {}", row.display(),))?;
1194 matched_row_ref = Some(row_ref);
1195 entry_state_count += 1;
1196 }
1197 if let Some(chunk) = match_row!(
1198 match_order_key_indices,
1199 degree_table,
1200 matched_row,
1201 matched_row_ref,
1202 false,
1203 Either::Right,
1204 )
1205 .await
1206 {
1207 yield chunk;
1208 }
1209 }
1210 Box::new(entry_state)
1211 }
1212 };
1213
1214 let op = match JOIN_OP {
1216 JoinOp::Insert => Op::Insert,
1217 JoinOp::Delete => Op::Delete,
1218 };
1219 if degree == 0 {
1220 if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1221 yield chunk;
1222 }
1223 } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
1224 {
1225 yield chunk;
1226 }
1227
1228 if cache_hit || entry_state_count <= entry_state_max_rows {
1230 side_match.ht.update_state(key, entry_state);
1231 }
1232
1233 for matched_row in matched_rows_to_clean {
1235 side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
1237 }
1238
1239 if append_only_optimize && let Some(row) = append_only_matched_row {
1241 assert_matches!(JOIN_OP, JoinOp::Insert);
1242 side_match.ht.delete_handle_degree(key, row)?;
1243 return Ok(());
1244 }
1245
1246 match JOIN_OP {
1248 JoinOp::Insert => {
1249 side_update
1250 .ht
1251 .insert_handle_degree(key, JoinRow::new(row, degree))?;
1252 }
1253 JoinOp::Delete => {
1254 side_update
1255 .ht
1256 .delete_handle_degree(key, JoinRow::new(row, degree))?;
1257 }
1258 }
1259 }
1260
1261 #[expect(clippy::too_many_arguments)]
1262 #[inline]
1263 async fn handle_match_row<
1264 'a,
1265 R: Row, RO: Row, const SIDE: SideTypePrimitive,
1268 const JOIN_OP: JoinOpPrimitive,
1269 const MATCHED_ROWS_FROM_CACHE: bool,
1270 >(
1271 update_row: RowRef<'a>,
1272 mut matched_row: JoinRow<R>,
1273 mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
1274 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1275 match_order_key_indices: &[usize],
1276 match_degree_table: &mut Option<TableInner<S>>,
1277 side_update_start_pos: usize,
1278 side_match_start_pos: usize,
1279 cond: &Option<NonStrictExpression>,
1280 update_row_degree: &mut u64,
1281 useful_state_clean_columns: &[(usize, &'a Watermark)],
1282 append_only_optimize: bool,
1283 append_only_matched_row: &mut Option<JoinRow<RO>>,
1284 matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
1285 map_output: impl Fn(R) -> RO,
1286 ) -> Option<StreamChunk> {
1287 let mut need_state_clean = false;
1288 let mut chunk_opt = None;
1289 let join_condition_satisfied = Self::check_join_condition(
1293 update_row,
1294 side_update_start_pos,
1295 &matched_row.row,
1296 side_match_start_pos,
1297 cond,
1298 )
1299 .await;
1300
1301 if join_condition_satisfied {
1302 *update_row_degree += 1;
1304 if matches!(JOIN_OP, JoinOp::Insert)
1309 && !forward_exactly_once(T, SIDE)
1310 && let Some(chunk) =
1311 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1312 {
1313 chunk_opt = Some(chunk);
1314 }
1315 if let Some(degree_table) = match_degree_table {
1317 update_degree::<S, { JOIN_OP }>(
1318 match_order_key_indices,
1319 degree_table,
1320 &mut matched_row,
1321 );
1322 if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1323 match JOIN_OP {
1325 JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
1326 JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
1327 }
1328 }
1329 }
1330
1331 if matches!(JOIN_OP, JoinOp::Delete)
1334 && !forward_exactly_once(T, SIDE)
1335 && let Some(chunk) =
1336 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1337 {
1338 chunk_opt = Some(chunk);
1339 }
1340 } else {
1341 for (column_idx, watermark) in useful_state_clean_columns {
1343 if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1344 scalar
1345 .default_cmp(&watermark.val.as_scalar_ref_impl())
1346 .is_lt()
1347 }) {
1348 need_state_clean = true;
1349 break;
1350 }
1351 }
1352 }
1353 if append_only_optimize {
1357 assert_matches!(JOIN_OP, JoinOp::Insert);
1358 assert!(append_only_matched_row.is_none());
1361 *append_only_matched_row = Some(matched_row.map(map_output));
1362 } else if need_state_clean {
1363 debug_assert!(
1364 !append_only_optimize,
1365 "`append_only_optimize` and `need_state_clean` must not both be true"
1366 );
1367 matched_rows_to_clean.push(matched_row.map(map_output));
1368 }
1369
1370 chunk_opt
1371 }
1372
1373 #[inline]
1378 async fn check_join_condition(
1379 row: impl Row,
1380 side_update_start_pos: usize,
1381 matched_row: impl Row,
1382 side_match_start_pos: usize,
1383 join_condition: &Option<NonStrictExpression>,
1384 ) -> bool {
1385 if let Some(join_condition) = join_condition {
1386 let new_row = Self::row_concat(
1387 row,
1388 side_update_start_pos,
1389 matched_row,
1390 side_match_start_pos,
1391 );
1392 join_condition
1393 .eval_row_infallible(&new_row)
1394 .await
1395 .map(|s| *s.as_bool())
1396 .unwrap_or(false)
1397 } else {
1398 true
1399 }
1400 }
1401}
1402
1403#[cfg(test)]
1404mod tests {
1405 use std::sync::atomic::AtomicU64;
1406
1407 use pretty_assertions::assert_eq;
1408 use risingwave_common::array::*;
1409 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1410 use risingwave_common::config::StreamingConfig;
1411 use risingwave_common::hash::{Key64, Key128};
1412 use risingwave_common::util::epoch::test_epoch;
1413 use risingwave_common::util::sort_util::OrderType;
1414 use risingwave_storage::memory::MemoryStateStore;
1415
1416 use super::*;
1417 use crate::common::table::test_utils::gen_pbtable;
1418 use crate::executor::MemoryEncoding;
1419 use crate::executor::test_utils::expr::build_from_pretty;
1420 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1421
1422 async fn create_in_memory_state_table(
1423 mem_state: MemoryStateStore,
1424 data_types: &[DataType],
1425 order_types: &[OrderType],
1426 pk_indices: &[usize],
1427 table_id: u32,
1428 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1429 create_in_memory_state_table_with_inequality(
1430 mem_state,
1431 data_types,
1432 order_types,
1433 pk_indices,
1434 table_id,
1435 None,
1436 )
1437 .await
1438 }
1439
1440 async fn create_in_memory_state_table_with_inequality(
1441 mem_state: MemoryStateStore,
1442 data_types: &[DataType],
1443 order_types: &[OrderType],
1444 pk_indices: &[usize],
1445 table_id: u32,
1446 degree_inequality_type: Option<DataType>,
1447 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1448 create_in_memory_state_table_with_watermark(
1449 mem_state,
1450 data_types,
1451 order_types,
1452 pk_indices,
1453 table_id,
1454 degree_inequality_type,
1455 vec![],
1456 vec![],
1457 )
1458 .await
1459 }
1460
1461 #[expect(clippy::too_many_arguments)]
1462 async fn create_in_memory_state_table_with_watermark(
1463 mem_state: MemoryStateStore,
1464 data_types: &[DataType],
1465 order_types: &[OrderType],
1466 pk_indices: &[usize],
1467 table_id: u32,
1468 degree_inequality_type: Option<DataType>,
1469 state_clean_watermark_indices: Vec<usize>,
1470 degree_clean_watermark_indices: Vec<usize>,
1471 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1472 let column_descs = data_types
1473 .iter()
1474 .enumerate()
1475 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1476 .collect_vec();
1477 let mut state_table_catalog = gen_pbtable(
1478 TableId::new(table_id),
1479 column_descs,
1480 order_types.to_vec(),
1481 pk_indices.to_vec(),
1482 0,
1483 );
1484 state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1485 .into_iter()
1486 .map(|idx| idx as u32)
1487 .collect();
1488 let state_table =
1489 StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1490
1491 let mut degree_table_column_descs = vec![];
1493 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1494 degree_table_column_descs.push(ColumnDesc::unnamed(
1495 ColumnId::new(pk_id as i32),
1496 data_types[*idx].clone(),
1497 ))
1498 });
1499 degree_table_column_descs.push(ColumnDesc::unnamed(
1501 ColumnId::new(pk_indices.len() as i32),
1502 DataType::Int64,
1503 ));
1504 if let Some(ineq_type) = degree_inequality_type {
1506 degree_table_column_descs.push(ColumnDesc::unnamed(
1507 ColumnId::new((pk_indices.len() + 1) as i32),
1508 ineq_type,
1509 ));
1510 }
1511 let mut degree_table_catalog = gen_pbtable(
1512 TableId::new(table_id + 1),
1513 degree_table_column_descs,
1514 order_types.to_vec(),
1515 pk_indices.to_vec(),
1516 0,
1517 );
1518 degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1519 .into_iter()
1520 .map(|idx| idx as u32)
1521 .collect();
1522 let degree_state_table =
1523 StateTable::from_table_catalog(°ree_table_catalog, mem_state, None).await;
1524 (state_table, degree_state_table)
1525 }
1526
1527 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1528 build_from_pretty(
1529 condition_text
1530 .as_deref()
1531 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1532 )
1533 }
1534
1535 async fn create_executor<const T: JoinTypePrimitive>(
1536 with_condition: bool,
1537 null_safe: bool,
1538 condition_text: Option<String>,
1539 inequality_pairs: Vec<InequalityPairInfo>,
1540 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1541 let schema = Schema {
1542 fields: vec![
1543 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1545 ],
1546 };
1547 let (tx_l, source_l) = MockSource::channel();
1548 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1549 let (tx_r, source_r) = MockSource::channel();
1550 let source_r = source_r.into_executor(schema, vec![1]);
1551 let params_l = JoinParams::new(vec![0], vec![1]);
1552 let params_r = JoinParams::new(vec![0], vec![1]);
1553 let cond = with_condition.then(|| create_cond(condition_text));
1554
1555 let mem_state = MemoryStateStore::new();
1556
1557 let l_degree_ineq_type = inequality_pairs
1559 .iter()
1560 .find(|pair| pair.clean_left_state)
1561 .map(|_| DataType::Int64); let r_degree_ineq_type = inequality_pairs
1563 .iter()
1564 .find(|pair| pair.clean_right_state)
1565 .map(|_| DataType::Int64); let l_clean_watermark_indices = inequality_pairs
1567 .iter()
1568 .find(|pair| pair.clean_left_state)
1569 .map(|pair| vec![pair.left_idx])
1570 .unwrap_or_default();
1571 let r_clean_watermark_indices = inequality_pairs
1572 .iter()
1573 .find(|pair| pair.clean_right_state)
1574 .map(|pair| vec![pair.right_idx])
1575 .unwrap_or_default();
1576 let degree_inequality_column_idx = 3;
1577 let l_degree_clean_watermark_indices = l_degree_ineq_type
1578 .as_ref()
1579 .map(|_| vec![degree_inequality_column_idx])
1580 .unwrap_or_default();
1581 let r_degree_clean_watermark_indices = r_degree_ineq_type
1582 .as_ref()
1583 .map(|_| vec![degree_inequality_column_idx])
1584 .unwrap_or_default();
1585
1586 let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1587 mem_state.clone(),
1588 &[DataType::Int64, DataType::Int64],
1589 &[OrderType::ascending(), OrderType::ascending()],
1590 &[0, 1],
1591 0,
1592 l_degree_ineq_type,
1593 l_clean_watermark_indices,
1594 l_degree_clean_watermark_indices,
1595 )
1596 .await;
1597
1598 let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1599 mem_state,
1600 &[DataType::Int64, DataType::Int64],
1601 &[OrderType::ascending(), OrderType::ascending()],
1602 &[0, 1],
1603 2,
1604 r_degree_ineq_type,
1605 r_clean_watermark_indices,
1606 r_degree_clean_watermark_indices,
1607 )
1608 .await;
1609
1610 let schema = match T {
1611 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1612 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1613 _ => [source_l.schema().fields(), source_r.schema().fields()]
1614 .concat()
1615 .into_iter()
1616 .collect(),
1617 };
1618 let schema_len = schema.len();
1619 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1620
1621 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1622 ActorContext::for_test(123),
1623 info,
1624 source_l,
1625 source_r,
1626 params_l,
1627 params_r,
1628 vec![null_safe],
1629 (0..schema_len).collect_vec(),
1630 cond,
1631 inequality_pairs,
1632 state_l,
1633 degree_state_l,
1634 state_r,
1635 degree_state_r,
1636 Arc::new(AtomicU64::new(0)),
1637 false,
1638 Arc::new(StreamingMetrics::unused()),
1639 1024,
1640 2048,
1641 vec![(0, true)],
1642 );
1643 (tx_l, tx_r, executor.boxed().execute())
1644 }
1645
1646 async fn create_classical_executor<const T: JoinTypePrimitive>(
1647 with_condition: bool,
1648 null_safe: bool,
1649 condition_text: Option<String>,
1650 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1651 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1652 }
1653
1654 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1655 with_condition: bool,
1656 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1657 let schema = Schema {
1658 fields: vec![
1659 Field::unnamed(DataType::Int64),
1660 Field::unnamed(DataType::Int64),
1661 Field::unnamed(DataType::Int64),
1662 ],
1663 };
1664 let (tx_l, source_l) = MockSource::channel();
1665 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1666 let (tx_r, source_r) = MockSource::channel();
1667 let source_r = source_r.into_executor(schema, vec![0]);
1668 let params_l = JoinParams::new(vec![0, 1], vec![]);
1669 let params_r = JoinParams::new(vec![0, 1], vec![]);
1670 let cond = with_condition.then(|| create_cond(None));
1671
1672 let mem_state = MemoryStateStore::new();
1673
1674 let (state_l, degree_state_l) = create_in_memory_state_table(
1675 mem_state.clone(),
1676 &[DataType::Int64, DataType::Int64, DataType::Int64],
1677 &[
1678 OrderType::ascending(),
1679 OrderType::ascending(),
1680 OrderType::ascending(),
1681 ],
1682 &[0, 1, 0],
1683 0,
1684 )
1685 .await;
1686
1687 let (state_r, degree_state_r) = create_in_memory_state_table(
1688 mem_state,
1689 &[DataType::Int64, DataType::Int64, DataType::Int64],
1690 &[
1691 OrderType::ascending(),
1692 OrderType::ascending(),
1693 OrderType::ascending(),
1694 ],
1695 &[0, 1, 1],
1696 1,
1697 )
1698 .await;
1699
1700 let schema = match T {
1701 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1702 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1703 _ => [source_l.schema().fields(), source_r.schema().fields()]
1704 .concat()
1705 .into_iter()
1706 .collect(),
1707 };
1708 let schema_len = schema.len();
1709 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1710
1711 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1712 ActorContext::for_test(123),
1713 info,
1714 source_l,
1715 source_r,
1716 params_l,
1717 params_r,
1718 vec![false],
1719 (0..schema_len).collect_vec(),
1720 cond,
1721 vec![],
1722 state_l,
1723 degree_state_l,
1724 state_r,
1725 degree_state_r,
1726 Arc::new(AtomicU64::new(0)),
1727 true,
1728 Arc::new(StreamingMetrics::unused()),
1729 1024,
1730 2048,
1731 vec![(0, true)],
1732 );
1733 (tx_l, tx_r, executor.boxed().execute())
1734 }
1735
1736 #[tokio::test]
1737 async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
1738 let chunk_l1 = StreamChunk::from_pretty(
1742 " I I
1743 + 2 4
1744 + 2 7
1745 + 3 8",
1746 );
1747 let chunk_r1 = StreamChunk::from_pretty(
1748 " I I
1749 + 2 6",
1750 );
1751 let chunk_r2 = StreamChunk::from_pretty(
1752 " I I
1753 + 2 3",
1754 );
1755 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1757 true,
1758 false,
1759 Some(String::from(
1760 "(greater_than_or_equal:boolean $1:int8 $3:int8)",
1761 )),
1762 vec![InequalityPairInfo {
1763 left_idx: 1,
1764 right_idx: 1,
1765 clean_left_state: true, clean_right_state: false,
1767 op: InequalityType::GreaterThanOrEqual,
1768 }],
1769 )
1770 .await;
1771
1772 tx_l.push_barrier(test_epoch(1), false);
1774 tx_r.push_barrier(test_epoch(1), false);
1775 hash_join.next_unwrap_ready_barrier()?;
1776
1777 tx_l.push_chunk(chunk_l1);
1779 hash_join.next_unwrap_pending();
1780
1781 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1784 hash_join.next_unwrap_pending();
1785
1786 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1787 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1788 assert_eq!(
1789 output_watermark,
1790 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
1791 );
1792
1793 tx_r.push_chunk(chunk_r1);
1802 let chunk = hash_join.next_unwrap_ready_chunk()?;
1803 assert_eq!(
1804 chunk,
1805 StreamChunk::from_pretty(
1806 " I I I I
1807 + 2 7 2 6"
1808 )
1809 );
1810
1811 tx_r.push_chunk(chunk_r2);
1815 let chunk = hash_join.next_unwrap_ready_chunk()?;
1816 assert_eq!(
1817 chunk,
1818 StreamChunk::from_pretty(
1819 " I I I I
1820 + 2 7 2 3"
1821 )
1822 );
1823
1824 Ok(())
1825 }
1826
1827 #[tokio::test]
1828 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1829 let chunk_l1 = StreamChunk::from_pretty(
1830 " I I
1831 + 1 4
1832 + 2 5
1833 + 3 6",
1834 );
1835 let chunk_l2 = StreamChunk::from_pretty(
1836 " I I
1837 + 3 8
1838 - 3 8",
1839 );
1840 let chunk_r1 = StreamChunk::from_pretty(
1841 " I I
1842 + 2 7
1843 + 4 8
1844 + 6 9",
1845 );
1846 let chunk_r2 = StreamChunk::from_pretty(
1847 " I I
1848 + 3 10
1849 + 6 11",
1850 );
1851 let (mut tx_l, mut tx_r, mut hash_join) =
1852 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1853
1854 tx_l.push_barrier(test_epoch(1), false);
1856 tx_r.push_barrier(test_epoch(1), false);
1857 hash_join.next_unwrap_ready_barrier()?;
1858
1859 tx_l.push_chunk(chunk_l1);
1861 hash_join.next_unwrap_pending();
1862
1863 tx_l.push_barrier(test_epoch(2), false);
1865 tx_r.push_barrier(test_epoch(2), false);
1866 hash_join.next_unwrap_ready_barrier()?;
1867
1868 tx_l.push_chunk(chunk_l2);
1870 hash_join.next_unwrap_pending();
1871
1872 tx_r.push_chunk(chunk_r1);
1874 let chunk = hash_join.next_unwrap_ready_chunk()?;
1875 assert_eq!(
1876 chunk,
1877 StreamChunk::from_pretty(
1878 " I I I I
1879 + 2 5 2 7"
1880 )
1881 );
1882
1883 tx_r.push_chunk(chunk_r2);
1885 let chunk = hash_join.next_unwrap_ready_chunk()?;
1886 assert_eq!(
1887 chunk,
1888 StreamChunk::from_pretty(
1889 " I I I I
1890 + 3 6 3 10"
1891 )
1892 );
1893
1894 Ok(())
1895 }
1896
1897 #[tokio::test]
1898 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1899 let chunk_l1 = StreamChunk::from_pretty(
1900 " I I
1901 + 1 4
1902 + 2 5
1903 + . 6",
1904 );
1905 let chunk_l2 = StreamChunk::from_pretty(
1906 " I I
1907 + . 8
1908 - . 8",
1909 );
1910 let chunk_r1 = StreamChunk::from_pretty(
1911 " I I
1912 + 2 7
1913 + 4 8
1914 + 6 9",
1915 );
1916 let chunk_r2 = StreamChunk::from_pretty(
1917 " I I
1918 + . 10
1919 + 6 11",
1920 );
1921 let (mut tx_l, mut tx_r, mut hash_join) =
1922 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1923
1924 tx_l.push_barrier(test_epoch(1), false);
1926 tx_r.push_barrier(test_epoch(1), false);
1927 hash_join.next_unwrap_ready_barrier()?;
1928
1929 tx_l.push_chunk(chunk_l1);
1931 hash_join.next_unwrap_pending();
1932
1933 tx_l.push_barrier(test_epoch(2), false);
1935 tx_r.push_barrier(test_epoch(2), false);
1936 hash_join.next_unwrap_ready_barrier()?;
1937
1938 tx_l.push_chunk(chunk_l2);
1940 hash_join.next_unwrap_pending();
1941
1942 tx_r.push_chunk(chunk_r1);
1944 let chunk = hash_join.next_unwrap_ready_chunk()?;
1945 assert_eq!(
1946 chunk,
1947 StreamChunk::from_pretty(
1948 " I I I I
1949 + 2 5 2 7"
1950 )
1951 );
1952
1953 tx_r.push_chunk(chunk_r2);
1955 let chunk = hash_join.next_unwrap_ready_chunk()?;
1956 assert_eq!(
1957 chunk,
1958 StreamChunk::from_pretty(
1959 " I I I I
1960 + . 6 . 10"
1961 )
1962 );
1963
1964 Ok(())
1965 }
1966
1967 #[tokio::test]
1968 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1969 let chunk_l1 = StreamChunk::from_pretty(
1970 " I I
1971 + 1 4
1972 + 2 5
1973 + 3 6",
1974 );
1975 let chunk_l2 = StreamChunk::from_pretty(
1976 " I I
1977 + 3 8
1978 - 3 8",
1979 );
1980 let chunk_r1 = StreamChunk::from_pretty(
1981 " I I
1982 + 2 7
1983 + 4 8
1984 + 6 9",
1985 );
1986 let chunk_r2 = StreamChunk::from_pretty(
1987 " I I
1988 + 3 10
1989 + 6 11",
1990 );
1991 let chunk_l3 = StreamChunk::from_pretty(
1992 " I I
1993 + 6 10",
1994 );
1995 let chunk_r3 = StreamChunk::from_pretty(
1996 " I I
1997 - 6 11",
1998 );
1999 let chunk_r4 = StreamChunk::from_pretty(
2000 " I I
2001 - 6 9",
2002 );
2003 let (mut tx_l, mut tx_r, mut hash_join) =
2004 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
2005
2006 tx_l.push_barrier(test_epoch(1), false);
2008 tx_r.push_barrier(test_epoch(1), false);
2009 hash_join.next_unwrap_ready_barrier()?;
2010
2011 tx_l.push_chunk(chunk_l1);
2013 hash_join.next_unwrap_pending();
2014
2015 tx_l.push_barrier(test_epoch(2), false);
2017 tx_r.push_barrier(test_epoch(2), false);
2018 hash_join.next_unwrap_ready_barrier()?;
2019
2020 tx_l.push_chunk(chunk_l2);
2022 hash_join.next_unwrap_pending();
2023
2024 tx_r.push_chunk(chunk_r1);
2026 let chunk = hash_join.next_unwrap_ready_chunk()?;
2027 assert_eq!(
2028 chunk,
2029 StreamChunk::from_pretty(
2030 " I I
2031 + 2 5"
2032 )
2033 );
2034
2035 tx_r.push_chunk(chunk_r2);
2037 let chunk = hash_join.next_unwrap_ready_chunk()?;
2038 assert_eq!(
2039 chunk,
2040 StreamChunk::from_pretty(
2041 " I I
2042 + 3 6"
2043 )
2044 );
2045
2046 tx_l.push_chunk(chunk_l3);
2048 let chunk = hash_join.next_unwrap_ready_chunk()?;
2049 assert_eq!(
2050 chunk,
2051 StreamChunk::from_pretty(
2052 " I I
2053 + 6 10"
2054 )
2055 );
2056
2057 tx_r.push_chunk(chunk_r3);
2060 hash_join.next_unwrap_pending();
2061
2062 tx_r.push_chunk(chunk_r4);
2065 let chunk = hash_join.next_unwrap_ready_chunk()?;
2066 assert_eq!(
2067 chunk,
2068 StreamChunk::from_pretty(
2069 " I I
2070 - 6 10"
2071 )
2072 );
2073
2074 Ok(())
2075 }
2076
2077 #[tokio::test]
2078 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
2079 let chunk_l1 = StreamChunk::from_pretty(
2080 " I I
2081 + 1 4
2082 + 2 5
2083 + . 6",
2084 );
2085 let chunk_l2 = StreamChunk::from_pretty(
2086 " I I
2087 + . 8
2088 - . 8",
2089 );
2090 let chunk_r1 = StreamChunk::from_pretty(
2091 " I I
2092 + 2 7
2093 + 4 8
2094 + 6 9",
2095 );
2096 let chunk_r2 = StreamChunk::from_pretty(
2097 " I I
2098 + . 10
2099 + 6 11",
2100 );
2101 let chunk_l3 = StreamChunk::from_pretty(
2102 " I I
2103 + 6 10",
2104 );
2105 let chunk_r3 = StreamChunk::from_pretty(
2106 " I I
2107 - 6 11",
2108 );
2109 let chunk_r4 = StreamChunk::from_pretty(
2110 " I I
2111 - 6 9",
2112 );
2113 let (mut tx_l, mut tx_r, mut hash_join) =
2114 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
2115
2116 tx_l.push_barrier(test_epoch(1), false);
2118 tx_r.push_barrier(test_epoch(1), false);
2119 hash_join.next_unwrap_ready_barrier()?;
2120
2121 tx_l.push_chunk(chunk_l1);
2123 hash_join.next_unwrap_pending();
2124
2125 tx_l.push_barrier(test_epoch(2), false);
2127 tx_r.push_barrier(test_epoch(2), false);
2128 hash_join.next_unwrap_ready_barrier()?;
2129
2130 tx_l.push_chunk(chunk_l2);
2132 hash_join.next_unwrap_pending();
2133
2134 tx_r.push_chunk(chunk_r1);
2136 let chunk = hash_join.next_unwrap_ready_chunk()?;
2137 assert_eq!(
2138 chunk,
2139 StreamChunk::from_pretty(
2140 " I I
2141 + 2 5"
2142 )
2143 );
2144
2145 tx_r.push_chunk(chunk_r2);
2147 let chunk = hash_join.next_unwrap_ready_chunk()?;
2148 assert_eq!(
2149 chunk,
2150 StreamChunk::from_pretty(
2151 " I I
2152 + . 6"
2153 )
2154 );
2155
2156 tx_l.push_chunk(chunk_l3);
2158 let chunk = hash_join.next_unwrap_ready_chunk()?;
2159 assert_eq!(
2160 chunk,
2161 StreamChunk::from_pretty(
2162 " I I
2163 + 6 10"
2164 )
2165 );
2166
2167 tx_r.push_chunk(chunk_r3);
2170 hash_join.next_unwrap_pending();
2171
2172 tx_r.push_chunk(chunk_r4);
2175 let chunk = hash_join.next_unwrap_ready_chunk()?;
2176 assert_eq!(
2177 chunk,
2178 StreamChunk::from_pretty(
2179 " I I
2180 - 6 10"
2181 )
2182 );
2183
2184 Ok(())
2185 }
2186
2187 #[tokio::test]
2188 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2189 let chunk_l1 = StreamChunk::from_pretty(
2190 " I I I
2191 + 1 4 1
2192 + 2 5 2
2193 + 3 6 3",
2194 );
2195 let chunk_l2 = StreamChunk::from_pretty(
2196 " I I I
2197 + 4 9 4
2198 + 5 10 5",
2199 );
2200 let chunk_r1 = StreamChunk::from_pretty(
2201 " I I I
2202 + 2 5 1
2203 + 4 9 2
2204 + 6 9 3",
2205 );
2206 let chunk_r2 = StreamChunk::from_pretty(
2207 " I I I
2208 + 1 4 4
2209 + 3 6 5",
2210 );
2211
2212 let (mut tx_l, mut tx_r, mut hash_join) =
2213 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2214
2215 tx_l.push_barrier(test_epoch(1), false);
2217 tx_r.push_barrier(test_epoch(1), false);
2218 hash_join.next_unwrap_ready_barrier()?;
2219
2220 tx_l.push_chunk(chunk_l1);
2222 hash_join.next_unwrap_pending();
2223
2224 tx_l.push_barrier(test_epoch(2), false);
2226 tx_r.push_barrier(test_epoch(2), false);
2227 hash_join.next_unwrap_ready_barrier()?;
2228
2229 tx_l.push_chunk(chunk_l2);
2231 hash_join.next_unwrap_pending();
2232
2233 tx_r.push_chunk(chunk_r1);
2235 let chunk = hash_join.next_unwrap_ready_chunk()?;
2236 assert_eq!(
2237 chunk,
2238 StreamChunk::from_pretty(
2239 " I I I I I I
2240 + 2 5 2 2 5 1
2241 + 4 9 4 4 9 2"
2242 )
2243 );
2244
2245 tx_r.push_chunk(chunk_r2);
2247 let chunk = hash_join.next_unwrap_ready_chunk()?;
2248 assert_eq!(
2249 chunk,
2250 StreamChunk::from_pretty(
2251 " I I I I I I
2252 + 1 4 1 1 4 4
2253 + 3 6 3 3 6 5"
2254 )
2255 );
2256
2257 Ok(())
2258 }
2259
2260 #[tokio::test]
2261 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2262 let chunk_l1 = StreamChunk::from_pretty(
2263 " I I I
2264 + 1 4 1
2265 + 2 5 2
2266 + 3 6 3",
2267 );
2268 let chunk_l2 = StreamChunk::from_pretty(
2269 " I I I
2270 + 4 9 4
2271 + 5 10 5",
2272 );
2273 let chunk_r1 = StreamChunk::from_pretty(
2274 " I I I
2275 + 2 5 1
2276 + 4 9 2
2277 + 6 9 3",
2278 );
2279 let chunk_r2 = StreamChunk::from_pretty(
2280 " I I I
2281 + 1 4 4
2282 + 3 6 5",
2283 );
2284
2285 let (mut tx_l, mut tx_r, mut hash_join) =
2286 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2287
2288 tx_l.push_barrier(test_epoch(1), false);
2290 tx_r.push_barrier(test_epoch(1), false);
2291 hash_join.next_unwrap_ready_barrier()?;
2292
2293 tx_l.push_chunk(chunk_l1);
2295 hash_join.next_unwrap_pending();
2296
2297 tx_l.push_barrier(test_epoch(2), false);
2299 tx_r.push_barrier(test_epoch(2), false);
2300 hash_join.next_unwrap_ready_barrier()?;
2301
2302 tx_l.push_chunk(chunk_l2);
2304 hash_join.next_unwrap_pending();
2305
2306 tx_r.push_chunk(chunk_r1);
2308 let chunk = hash_join.next_unwrap_ready_chunk()?;
2309 assert_eq!(
2310 chunk,
2311 StreamChunk::from_pretty(
2312 " I I I
2313 + 2 5 2
2314 + 4 9 4"
2315 )
2316 );
2317
2318 tx_r.push_chunk(chunk_r2);
2320 let chunk = hash_join.next_unwrap_ready_chunk()?;
2321 assert_eq!(
2322 chunk,
2323 StreamChunk::from_pretty(
2324 " I I I
2325 + 1 4 1
2326 + 3 6 3"
2327 )
2328 );
2329
2330 Ok(())
2331 }
2332
2333 #[tokio::test]
2334 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2335 let chunk_l1 = StreamChunk::from_pretty(
2336 " I I I
2337 + 1 4 1
2338 + 2 5 2
2339 + 3 6 3",
2340 );
2341 let chunk_l2 = StreamChunk::from_pretty(
2342 " I I I
2343 + 4 9 4
2344 + 5 10 5",
2345 );
2346 let chunk_r1 = StreamChunk::from_pretty(
2347 " I I I
2348 + 2 5 1
2349 + 4 9 2
2350 + 6 9 3",
2351 );
2352 let chunk_r2 = StreamChunk::from_pretty(
2353 " I I I
2354 + 1 4 4
2355 + 3 6 5",
2356 );
2357
2358 let (mut tx_l, mut tx_r, mut hash_join) =
2359 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2360
2361 tx_l.push_barrier(test_epoch(1), false);
2363 tx_r.push_barrier(test_epoch(1), false);
2364 hash_join.next_unwrap_ready_barrier()?;
2365
2366 tx_l.push_chunk(chunk_l1);
2368 hash_join.next_unwrap_pending();
2369
2370 tx_l.push_barrier(test_epoch(2), false);
2372 tx_r.push_barrier(test_epoch(2), false);
2373 hash_join.next_unwrap_ready_barrier()?;
2374
2375 tx_l.push_chunk(chunk_l2);
2377 hash_join.next_unwrap_pending();
2378
2379 tx_r.push_chunk(chunk_r1);
2381 let chunk = hash_join.next_unwrap_ready_chunk()?;
2382 assert_eq!(
2383 chunk,
2384 StreamChunk::from_pretty(
2385 " I I I
2386 + 2 5 1
2387 + 4 9 2"
2388 )
2389 );
2390
2391 tx_r.push_chunk(chunk_r2);
2393 let chunk = hash_join.next_unwrap_ready_chunk()?;
2394 assert_eq!(
2395 chunk,
2396 StreamChunk::from_pretty(
2397 " I I I
2398 + 1 4 4
2399 + 3 6 5"
2400 )
2401 );
2402
2403 Ok(())
2404 }
2405
2406 #[tokio::test]
2407 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2408 let chunk_r1 = StreamChunk::from_pretty(
2409 " I I
2410 + 1 4
2411 + 2 5
2412 + 3 6",
2413 );
2414 let chunk_r2 = StreamChunk::from_pretty(
2415 " I I
2416 + 3 8
2417 - 3 8",
2418 );
2419 let chunk_l1 = StreamChunk::from_pretty(
2420 " I I
2421 + 2 7
2422 + 4 8
2423 + 6 9",
2424 );
2425 let chunk_l2 = StreamChunk::from_pretty(
2426 " I I
2427 + 3 10
2428 + 6 11",
2429 );
2430 let chunk_r3 = StreamChunk::from_pretty(
2431 " I I
2432 + 6 10",
2433 );
2434 let chunk_l3 = StreamChunk::from_pretty(
2435 " I I
2436 - 6 11",
2437 );
2438 let chunk_l4 = StreamChunk::from_pretty(
2439 " I I
2440 - 6 9",
2441 );
2442 let (mut tx_l, mut tx_r, mut hash_join) =
2443 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2444
2445 tx_l.push_barrier(test_epoch(1), false);
2447 tx_r.push_barrier(test_epoch(1), false);
2448 hash_join.next_unwrap_ready_barrier()?;
2449
2450 tx_r.push_chunk(chunk_r1);
2452 hash_join.next_unwrap_pending();
2453
2454 tx_l.push_barrier(test_epoch(2), false);
2456 tx_r.push_barrier(test_epoch(2), false);
2457 hash_join.next_unwrap_ready_barrier()?;
2458
2459 tx_r.push_chunk(chunk_r2);
2461 hash_join.next_unwrap_pending();
2462
2463 tx_l.push_chunk(chunk_l1);
2465 let chunk = hash_join.next_unwrap_ready_chunk()?;
2466 assert_eq!(
2467 chunk,
2468 StreamChunk::from_pretty(
2469 " I I
2470 + 2 5"
2471 )
2472 );
2473
2474 tx_l.push_chunk(chunk_l2);
2476 let chunk = hash_join.next_unwrap_ready_chunk()?;
2477 assert_eq!(
2478 chunk,
2479 StreamChunk::from_pretty(
2480 " I I
2481 + 3 6"
2482 )
2483 );
2484
2485 tx_r.push_chunk(chunk_r3);
2487 let chunk = hash_join.next_unwrap_ready_chunk()?;
2488 assert_eq!(
2489 chunk,
2490 StreamChunk::from_pretty(
2491 " I I
2492 + 6 10"
2493 )
2494 );
2495
2496 tx_l.push_chunk(chunk_l3);
2499 hash_join.next_unwrap_pending();
2500
2501 tx_l.push_chunk(chunk_l4);
2504 let chunk = hash_join.next_unwrap_ready_chunk()?;
2505 assert_eq!(
2506 chunk,
2507 StreamChunk::from_pretty(
2508 " I I
2509 - 6 10"
2510 )
2511 );
2512
2513 Ok(())
2514 }
2515
2516 #[tokio::test]
2517 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2518 let chunk_l1 = StreamChunk::from_pretty(
2519 " I I
2520 + 1 4
2521 + 2 5
2522 + 3 6",
2523 );
2524 let chunk_l2 = StreamChunk::from_pretty(
2525 " I I
2526 + 3 8
2527 - 3 8",
2528 );
2529 let chunk_r1 = StreamChunk::from_pretty(
2530 " I I
2531 + 2 7
2532 + 4 8
2533 + 6 9",
2534 );
2535 let chunk_r2 = StreamChunk::from_pretty(
2536 " I I
2537 + 3 10
2538 + 6 11
2539 + 1 2
2540 + 1 3",
2541 );
2542 let chunk_l3 = StreamChunk::from_pretty(
2543 " I I
2544 + 9 10",
2545 );
2546 let chunk_r3 = StreamChunk::from_pretty(
2547 " I I
2548 - 1 2",
2549 );
2550 let chunk_r4 = StreamChunk::from_pretty(
2551 " I I
2552 - 1 3",
2553 );
2554 let (mut tx_l, mut tx_r, mut hash_join) =
2555 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2556
2557 tx_l.push_barrier(test_epoch(1), false);
2559 tx_r.push_barrier(test_epoch(1), false);
2560 hash_join.next_unwrap_ready_barrier()?;
2561
2562 tx_l.push_chunk(chunk_l1);
2564 let chunk = hash_join.next_unwrap_ready_chunk()?;
2565 assert_eq!(
2566 chunk,
2567 StreamChunk::from_pretty(
2568 " I I
2569 + 1 4
2570 + 2 5
2571 + 3 6",
2572 )
2573 );
2574
2575 tx_l.push_barrier(test_epoch(2), false);
2577 tx_r.push_barrier(test_epoch(2), false);
2578 hash_join.next_unwrap_ready_barrier()?;
2579
2580 tx_l.push_chunk(chunk_l2);
2582 let chunk = hash_join.next_unwrap_ready_chunk()?;
2583 assert_eq!(
2584 chunk,
2585 StreamChunk::from_pretty(
2586 " I I
2587 + 3 8 D
2588 - 3 8 D",
2589 )
2590 );
2591
2592 tx_r.push_chunk(chunk_r1);
2594 let chunk = hash_join.next_unwrap_ready_chunk()?;
2595 assert_eq!(
2596 chunk,
2597 StreamChunk::from_pretty(
2598 " I I
2599 - 2 5"
2600 )
2601 );
2602
2603 tx_r.push_chunk(chunk_r2);
2605 let chunk = hash_join.next_unwrap_ready_chunk()?;
2606 assert_eq!(
2607 chunk,
2608 StreamChunk::from_pretty(
2609 " I I
2610 - 3 6
2611 - 1 4"
2612 )
2613 );
2614
2615 tx_l.push_chunk(chunk_l3);
2617 let chunk = hash_join.next_unwrap_ready_chunk()?;
2618 assert_eq!(
2619 chunk,
2620 StreamChunk::from_pretty(
2621 " I I
2622 + 9 10"
2623 )
2624 );
2625
2626 tx_r.push_chunk(chunk_r3);
2629 hash_join.next_unwrap_pending();
2630
2631 tx_r.push_chunk(chunk_r4);
2634 let chunk = hash_join.next_unwrap_ready_chunk()?;
2635 assert_eq!(
2636 chunk,
2637 StreamChunk::from_pretty(
2638 " I I
2639 + 1 4"
2640 )
2641 );
2642
2643 Ok(())
2644 }
2645
2646 #[tokio::test]
2647 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2648 let chunk_r1 = StreamChunk::from_pretty(
2649 " I I
2650 + 1 4
2651 + 2 5
2652 + 3 6",
2653 );
2654 let chunk_r2 = StreamChunk::from_pretty(
2655 " I I
2656 + 3 8
2657 - 3 8",
2658 );
2659 let chunk_l1 = StreamChunk::from_pretty(
2660 " I I
2661 + 2 7
2662 + 4 8
2663 + 6 9",
2664 );
2665 let chunk_l2 = StreamChunk::from_pretty(
2666 " I I
2667 + 3 10
2668 + 6 11
2669 + 1 2
2670 + 1 3",
2671 );
2672 let chunk_r3 = StreamChunk::from_pretty(
2673 " I I
2674 + 9 10",
2675 );
2676 let chunk_l3 = StreamChunk::from_pretty(
2677 " I I
2678 - 1 2",
2679 );
2680 let chunk_l4 = StreamChunk::from_pretty(
2681 " I I
2682 - 1 3",
2683 );
2684 let (mut tx_r, mut tx_l, mut hash_join) =
2685 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2686
2687 tx_r.push_barrier(test_epoch(1), false);
2689 tx_l.push_barrier(test_epoch(1), false);
2690 hash_join.next_unwrap_ready_barrier()?;
2691
2692 tx_r.push_chunk(chunk_r1);
2694 let chunk = hash_join.next_unwrap_ready_chunk()?;
2695 assert_eq!(
2696 chunk,
2697 StreamChunk::from_pretty(
2698 " I I
2699 + 1 4
2700 + 2 5
2701 + 3 6",
2702 )
2703 );
2704
2705 tx_r.push_barrier(test_epoch(2), false);
2707 tx_l.push_barrier(test_epoch(2), false);
2708 hash_join.next_unwrap_ready_barrier()?;
2709
2710 tx_r.push_chunk(chunk_r2);
2712 let chunk = hash_join.next_unwrap_ready_chunk()?;
2713 assert_eq!(
2714 chunk,
2715 StreamChunk::from_pretty(
2716 " I I
2717 + 3 8 D
2718 - 3 8 D",
2719 )
2720 );
2721
2722 tx_l.push_chunk(chunk_l1);
2724 let chunk = hash_join.next_unwrap_ready_chunk()?;
2725 assert_eq!(
2726 chunk,
2727 StreamChunk::from_pretty(
2728 " I I
2729 - 2 5"
2730 )
2731 );
2732
2733 tx_l.push_chunk(chunk_l2);
2735 let chunk = hash_join.next_unwrap_ready_chunk()?;
2736 assert_eq!(
2737 chunk,
2738 StreamChunk::from_pretty(
2739 " I I
2740 - 3 6
2741 - 1 4"
2742 )
2743 );
2744
2745 tx_r.push_chunk(chunk_r3);
2747 let chunk = hash_join.next_unwrap_ready_chunk()?;
2748 assert_eq!(
2749 chunk,
2750 StreamChunk::from_pretty(
2751 " I I
2752 + 9 10"
2753 )
2754 );
2755
2756 tx_l.push_chunk(chunk_l3);
2759 hash_join.next_unwrap_pending();
2760
2761 tx_l.push_chunk(chunk_l4);
2764 let chunk = hash_join.next_unwrap_ready_chunk()?;
2765 assert_eq!(
2766 chunk,
2767 StreamChunk::from_pretty(
2768 " I I
2769 + 1 4"
2770 )
2771 );
2772
2773 Ok(())
2774 }
2775
2776 #[tokio::test]
2777 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2778 let chunk_l1 = StreamChunk::from_pretty(
2779 " I I
2780 + 1 4
2781 + 2 5
2782 + 3 6",
2783 );
2784 let chunk_l2 = StreamChunk::from_pretty(
2785 " I I
2786 + 6 8
2787 + 3 8",
2788 );
2789 let chunk_r1 = StreamChunk::from_pretty(
2790 " I I
2791 + 2 7
2792 + 4 8
2793 + 6 9",
2794 );
2795 let chunk_r2 = StreamChunk::from_pretty(
2796 " I I
2797 + 3 10
2798 + 6 11",
2799 );
2800 let (mut tx_l, mut tx_r, mut hash_join) =
2801 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2802
2803 tx_l.push_barrier(test_epoch(1), false);
2805 tx_r.push_barrier(test_epoch(1), false);
2806 hash_join.next_unwrap_ready_barrier()?;
2807
2808 tx_l.push_chunk(chunk_l1);
2810 hash_join.next_unwrap_pending();
2811
2812 tx_l.push_barrier(test_epoch(2), false);
2814
2815 tx_l.push_chunk(chunk_l2);
2817
2818 tx_r.push_chunk(chunk_r1);
2820
2821 let chunk = hash_join.next_unwrap_ready_chunk()?;
2823 assert_eq!(
2824 chunk,
2825 StreamChunk::from_pretty(
2826 " I I I I
2827 + 2 5 2 7"
2828 )
2829 );
2830
2831 tx_r.push_barrier(test_epoch(2), false);
2833
2834 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2836 assert!(matches!(
2837 hash_join.next_unwrap_ready_barrier()?,
2838 Barrier {
2839 epoch,
2840 mutation: None,
2841 ..
2842 } if epoch == expected_epoch
2843 ));
2844
2845 let chunk = hash_join.next_unwrap_ready_chunk()?;
2847 assert_eq!(
2848 chunk,
2849 StreamChunk::from_pretty(
2850 " I I I I
2851 + 6 8 6 9"
2852 )
2853 );
2854
2855 tx_r.push_chunk(chunk_r2);
2857 let chunk = hash_join.next_unwrap_ready_chunk()?;
2858 assert_eq!(
2859 chunk,
2860 StreamChunk::from_pretty(
2861 " I I I I
2862 + 3 6 3 10
2863 + 3 8 3 10
2864 + 6 8 6 11"
2865 )
2866 );
2867
2868 Ok(())
2869 }
2870
2871 #[tokio::test]
2872 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2873 let chunk_l1 = StreamChunk::from_pretty(
2874 " I I
2875 + 1 4
2876 + 2 .
2877 + 3 .",
2878 );
2879 let chunk_l2 = StreamChunk::from_pretty(
2880 " I I
2881 + 6 .
2882 + 3 8",
2883 );
2884 let chunk_r1 = StreamChunk::from_pretty(
2885 " I I
2886 + 2 7
2887 + 4 8
2888 + 6 9",
2889 );
2890 let chunk_r2 = StreamChunk::from_pretty(
2891 " I I
2892 + 3 10
2893 + 6 11",
2894 );
2895 let (mut tx_l, mut tx_r, mut hash_join) =
2896 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2897
2898 tx_l.push_barrier(test_epoch(1), false);
2900 tx_r.push_barrier(test_epoch(1), false);
2901 hash_join.next_unwrap_ready_barrier()?;
2902
2903 tx_l.push_chunk(chunk_l1);
2905 hash_join.next_unwrap_pending();
2906
2907 tx_l.push_barrier(test_epoch(2), false);
2909
2910 tx_l.push_chunk(chunk_l2);
2912
2913 tx_r.push_chunk(chunk_r1);
2915
2916 let chunk = hash_join.next_unwrap_ready_chunk()?;
2918 assert_eq!(
2919 chunk,
2920 StreamChunk::from_pretty(
2921 " I I I I
2922 + 2 . 2 7"
2923 )
2924 );
2925
2926 tx_r.push_barrier(test_epoch(2), false);
2928
2929 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2931 assert!(matches!(
2932 hash_join.next_unwrap_ready_barrier()?,
2933 Barrier {
2934 epoch,
2935 mutation: None,
2936 ..
2937 } if epoch == expected_epoch
2938 ));
2939
2940 let chunk = hash_join.next_unwrap_ready_chunk()?;
2942 assert_eq!(
2943 chunk,
2944 StreamChunk::from_pretty(
2945 " I I I I
2946 + 6 . 6 9"
2947 )
2948 );
2949
2950 tx_r.push_chunk(chunk_r2);
2952 let chunk = hash_join.next_unwrap_ready_chunk()?;
2953 assert_eq!(
2954 chunk,
2955 StreamChunk::from_pretty(
2956 " I I I I
2957 + 3 8 3 10
2958 + 3 . 3 10
2959 + 6 . 6 11"
2960 )
2961 );
2962
2963 Ok(())
2964 }
2965
2966 #[tokio::test]
2967 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2968 let chunk_l1 = StreamChunk::from_pretty(
2969 " I I
2970 + 1 4
2971 + 2 5
2972 + 3 6",
2973 );
2974 let chunk_l2 = StreamChunk::from_pretty(
2975 " I I
2976 + 3 8
2977 - 3 8",
2978 );
2979 let chunk_r1 = StreamChunk::from_pretty(
2980 " I I
2981 + 2 7
2982 + 4 8
2983 + 6 9",
2984 );
2985 let chunk_r2 = StreamChunk::from_pretty(
2986 " I I
2987 + 3 10
2988 + 6 11",
2989 );
2990 let (mut tx_l, mut tx_r, mut hash_join) =
2991 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2992
2993 tx_l.push_barrier(test_epoch(1), false);
2995 tx_r.push_barrier(test_epoch(1), false);
2996 hash_join.next_unwrap_ready_barrier()?;
2997
2998 tx_l.push_chunk(chunk_l1);
3000 let chunk = hash_join.next_unwrap_ready_chunk()?;
3001 assert_eq!(
3002 chunk,
3003 StreamChunk::from_pretty(
3004 " I I I I
3005 + 1 4 . .
3006 + 2 5 . .
3007 + 3 6 . ."
3008 )
3009 );
3010
3011 tx_l.push_chunk(chunk_l2);
3013 let chunk = hash_join.next_unwrap_ready_chunk()?;
3014 assert_eq!(
3015 chunk,
3016 StreamChunk::from_pretty(
3017 " I I I I
3018 + 3 8 . . D
3019 - 3 8 . . D"
3020 )
3021 );
3022
3023 tx_r.push_chunk(chunk_r1);
3025 let chunk = hash_join.next_unwrap_ready_chunk()?;
3026 assert_eq!(
3027 chunk,
3028 StreamChunk::from_pretty(
3029 " I I I I
3030 - 2 5 . .
3031 + 2 5 2 7"
3032 )
3033 );
3034
3035 tx_r.push_chunk(chunk_r2);
3037 let chunk = hash_join.next_unwrap_ready_chunk()?;
3038 assert_eq!(
3039 chunk,
3040 StreamChunk::from_pretty(
3041 " I I I I
3042 - 3 6 . .
3043 + 3 6 3 10"
3044 )
3045 );
3046
3047 Ok(())
3048 }
3049
3050 #[tokio::test]
3051 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
3052 let chunk_l1 = StreamChunk::from_pretty(
3053 " I I
3054 + 1 4
3055 + 2 5
3056 + . 6",
3057 );
3058 let chunk_l2 = StreamChunk::from_pretty(
3059 " I I
3060 + . 8
3061 - . 8",
3062 );
3063 let chunk_r1 = StreamChunk::from_pretty(
3064 " I I
3065 + 2 7
3066 + 4 8
3067 + 6 9",
3068 );
3069 let chunk_r2 = StreamChunk::from_pretty(
3070 " I I
3071 + . 10
3072 + 6 11",
3073 );
3074 let (mut tx_l, mut tx_r, mut hash_join) =
3075 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
3076
3077 tx_l.push_barrier(test_epoch(1), false);
3079 tx_r.push_barrier(test_epoch(1), false);
3080 hash_join.next_unwrap_ready_barrier()?;
3081
3082 tx_l.push_chunk(chunk_l1);
3084 let chunk = hash_join.next_unwrap_ready_chunk()?;
3085 assert_eq!(
3086 chunk,
3087 StreamChunk::from_pretty(
3088 " I I I I
3089 + 1 4 . .
3090 + 2 5 . .
3091 + . 6 . ."
3092 )
3093 );
3094
3095 tx_l.push_chunk(chunk_l2);
3097 let chunk = hash_join.next_unwrap_ready_chunk()?;
3098 assert_eq!(
3099 chunk,
3100 StreamChunk::from_pretty(
3101 " I I I I
3102 + . 8 . . D
3103 - . 8 . . D"
3104 )
3105 );
3106
3107 tx_r.push_chunk(chunk_r1);
3109 let chunk = hash_join.next_unwrap_ready_chunk()?;
3110 assert_eq!(
3111 chunk,
3112 StreamChunk::from_pretty(
3113 " I I I I
3114 - 2 5 . .
3115 + 2 5 2 7"
3116 )
3117 );
3118
3119 tx_r.push_chunk(chunk_r2);
3121 let chunk = hash_join.next_unwrap_ready_chunk()?;
3122 assert_eq!(
3123 chunk,
3124 StreamChunk::from_pretty(
3125 " I I I I
3126 - . 6 . .
3127 + . 6 . 10"
3128 )
3129 );
3130
3131 Ok(())
3132 }
3133
3134 #[tokio::test]
3135 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3136 let chunk_l1 = StreamChunk::from_pretty(
3137 " I I
3138 + 1 4
3139 + 2 5
3140 + 3 6",
3141 );
3142 let chunk_l2 = StreamChunk::from_pretty(
3143 " I I
3144 + 3 8
3145 - 3 8",
3146 );
3147 let chunk_r1 = StreamChunk::from_pretty(
3148 " I I
3149 + 2 7
3150 + 4 8
3151 + 6 9",
3152 );
3153 let chunk_r2 = StreamChunk::from_pretty(
3154 " I I
3155 + 5 10
3156 - 5 10",
3157 );
3158 let (mut tx_l, mut tx_r, mut hash_join) =
3159 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3160
3161 tx_l.push_barrier(test_epoch(1), false);
3163 tx_r.push_barrier(test_epoch(1), false);
3164 hash_join.next_unwrap_ready_barrier()?;
3165
3166 tx_l.push_chunk(chunk_l1);
3168 hash_join.next_unwrap_pending();
3169
3170 tx_l.push_chunk(chunk_l2);
3172 hash_join.next_unwrap_pending();
3173
3174 tx_r.push_chunk(chunk_r1);
3176 let chunk = hash_join.next_unwrap_ready_chunk()?;
3177 assert_eq!(
3178 chunk,
3179 StreamChunk::from_pretty(
3180 " I I I I
3181 + 2 5 2 7
3182 + . . 4 8
3183 + . . 6 9"
3184 )
3185 );
3186
3187 tx_r.push_chunk(chunk_r2);
3189 let chunk = hash_join.next_unwrap_ready_chunk()?;
3190 assert_eq!(
3191 chunk,
3192 StreamChunk::from_pretty(
3193 " I I I I
3194 + . . 5 10 D
3195 - . . 5 10 D"
3196 )
3197 );
3198
3199 Ok(())
3200 }
3201
3202 #[tokio::test]
3203 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3204 let chunk_l1 = StreamChunk::from_pretty(
3205 " I I I
3206 + 1 4 1
3207 + 2 5 2
3208 + 3 6 3",
3209 );
3210 let chunk_l2 = StreamChunk::from_pretty(
3211 " I I I
3212 + 4 9 4
3213 + 5 10 5",
3214 );
3215 let chunk_r1 = StreamChunk::from_pretty(
3216 " I I I
3217 + 2 5 1
3218 + 4 9 2
3219 + 6 9 3",
3220 );
3221 let chunk_r2 = StreamChunk::from_pretty(
3222 " I I I
3223 + 1 4 4
3224 + 3 6 5",
3225 );
3226
3227 let (mut tx_l, mut tx_r, mut hash_join) =
3228 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3229
3230 tx_l.push_barrier(test_epoch(1), false);
3232 tx_r.push_barrier(test_epoch(1), false);
3233 hash_join.next_unwrap_ready_barrier()?;
3234
3235 tx_l.push_chunk(chunk_l1);
3237 let chunk = hash_join.next_unwrap_ready_chunk()?;
3238 assert_eq!(
3239 chunk,
3240 StreamChunk::from_pretty(
3241 " I I I I I I
3242 + 1 4 1 . . .
3243 + 2 5 2 . . .
3244 + 3 6 3 . . ."
3245 )
3246 );
3247
3248 tx_l.push_chunk(chunk_l2);
3250 let chunk = hash_join.next_unwrap_ready_chunk()?;
3251 assert_eq!(
3252 chunk,
3253 StreamChunk::from_pretty(
3254 " I I I I I I
3255 + 4 9 4 . . .
3256 + 5 10 5 . . ."
3257 )
3258 );
3259
3260 tx_r.push_chunk(chunk_r1);
3262 let chunk = hash_join.next_unwrap_ready_chunk()?;
3263 assert_eq!(
3264 chunk,
3265 StreamChunk::from_pretty(
3266 " I I I I I I
3267 - 2 5 2 . . .
3268 + 2 5 2 2 5 1
3269 - 4 9 4 . . .
3270 + 4 9 4 4 9 2"
3271 )
3272 );
3273
3274 tx_r.push_chunk(chunk_r2);
3276 let chunk = hash_join.next_unwrap_ready_chunk()?;
3277 assert_eq!(
3278 chunk,
3279 StreamChunk::from_pretty(
3280 " I I I I I I
3281 - 1 4 1 . . .
3282 + 1 4 1 1 4 4
3283 - 3 6 3 . . .
3284 + 3 6 3 3 6 5"
3285 )
3286 );
3287
3288 Ok(())
3289 }
3290
3291 #[tokio::test]
3292 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3293 let chunk_l1 = StreamChunk::from_pretty(
3294 " I I I
3295 + 1 4 1
3296 + 2 5 2
3297 + 3 6 3",
3298 );
3299 let chunk_l2 = StreamChunk::from_pretty(
3300 " I I I
3301 + 4 9 4
3302 + 5 10 5",
3303 );
3304 let chunk_r1 = StreamChunk::from_pretty(
3305 " I I I
3306 + 2 5 1
3307 + 4 9 2
3308 + 6 9 3",
3309 );
3310 let chunk_r2 = StreamChunk::from_pretty(
3311 " I I I
3312 + 1 4 4
3313 + 3 6 5
3314 + 7 7 6",
3315 );
3316
3317 let (mut tx_l, mut tx_r, mut hash_join) =
3318 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3319
3320 tx_l.push_barrier(test_epoch(1), false);
3322 tx_r.push_barrier(test_epoch(1), false);
3323 hash_join.next_unwrap_ready_barrier()?;
3324
3325 tx_l.push_chunk(chunk_l1);
3327 hash_join.next_unwrap_pending();
3328
3329 tx_l.push_chunk(chunk_l2);
3331 hash_join.next_unwrap_pending();
3332
3333 tx_r.push_chunk(chunk_r1);
3335 let chunk = hash_join.next_unwrap_ready_chunk()?;
3336 assert_eq!(
3337 chunk,
3338 StreamChunk::from_pretty(
3339 " I I I I I I
3340 + 2 5 2 2 5 1
3341 + 4 9 4 4 9 2
3342 + . . . 6 9 3"
3343 )
3344 );
3345
3346 tx_r.push_chunk(chunk_r2);
3348 let chunk = hash_join.next_unwrap_ready_chunk()?;
3349 assert_eq!(
3350 chunk,
3351 StreamChunk::from_pretty(
3352 " I I I I I I
3353 + 1 4 1 1 4 4
3354 + 3 6 3 3 6 5
3355 + . . . 7 7 6"
3356 )
3357 );
3358
3359 Ok(())
3360 }
3361
3362 #[tokio::test]
3363 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3364 let chunk_l1 = StreamChunk::from_pretty(
3365 " I I
3366 + 1 4
3367 + 2 5
3368 + 3 6",
3369 );
3370 let chunk_l2 = StreamChunk::from_pretty(
3371 " I I
3372 + 3 8
3373 - 3 8",
3374 );
3375 let chunk_r1 = StreamChunk::from_pretty(
3376 " I I
3377 + 2 7
3378 + 4 8
3379 + 6 9",
3380 );
3381 let chunk_r2 = StreamChunk::from_pretty(
3382 " I I
3383 + 5 10
3384 - 5 10",
3385 );
3386 let (mut tx_l, mut tx_r, mut hash_join) =
3387 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3388
3389 tx_l.push_barrier(test_epoch(1), false);
3391 tx_r.push_barrier(test_epoch(1), false);
3392 hash_join.next_unwrap_ready_barrier()?;
3393
3394 tx_l.push_chunk(chunk_l1);
3396 let chunk = hash_join.next_unwrap_ready_chunk()?;
3397 assert_eq!(
3398 chunk,
3399 StreamChunk::from_pretty(
3400 " I I I I
3401 + 1 4 . .
3402 + 2 5 . .
3403 + 3 6 . ."
3404 )
3405 );
3406
3407 tx_l.push_chunk(chunk_l2);
3409 let chunk = hash_join.next_unwrap_ready_chunk()?;
3410 assert_eq!(
3411 chunk,
3412 StreamChunk::from_pretty(
3413 " I I I I
3414 + 3 8 . . D
3415 - 3 8 . . D"
3416 )
3417 );
3418
3419 tx_r.push_chunk(chunk_r1);
3421 let chunk = hash_join.next_unwrap_ready_chunk()?;
3422 assert_eq!(
3423 chunk,
3424 StreamChunk::from_pretty(
3425 " I I I I
3426 - 2 5 . .
3427 + 2 5 2 7
3428 + . . 4 8
3429 + . . 6 9"
3430 )
3431 );
3432
3433 tx_r.push_chunk(chunk_r2);
3435 let chunk = hash_join.next_unwrap_ready_chunk()?;
3436 assert_eq!(
3437 chunk,
3438 StreamChunk::from_pretty(
3439 " I I I I
3440 + . . 5 10 D
3441 - . . 5 10 D"
3442 )
3443 );
3444
3445 Ok(())
3446 }
3447
3448 #[tokio::test]
3449 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3450 let (mut tx_l, mut tx_r, mut hash_join) =
3451 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3452
3453 tx_l.push_barrier(test_epoch(1), false);
3455 tx_r.push_barrier(test_epoch(1), false);
3456 hash_join.next_unwrap_ready_barrier()?;
3457
3458 tx_l.push_chunk(StreamChunk::from_pretty(
3459 " I I
3460 + 1 1
3461 ",
3462 ));
3463 let chunk = hash_join.next_unwrap_ready_chunk()?;
3464 assert_eq!(
3465 chunk,
3466 StreamChunk::from_pretty(
3467 " I I I I
3468 + 1 1 . ."
3469 )
3470 );
3471
3472 tx_r.push_chunk(StreamChunk::from_pretty(
3473 " I I
3474 + 1 1
3475 ",
3476 ));
3477 let chunk = hash_join.next_unwrap_ready_chunk()?;
3478
3479 assert_eq!(
3480 chunk,
3481 StreamChunk::from_pretty(
3482 " I I I I
3483 - 1 1 . .
3484 + 1 1 1 1"
3485 )
3486 );
3487
3488 tx_l.push_chunk(StreamChunk::from_pretty(
3489 " I I
3490 - 1 1
3491 + 1 2
3492 ",
3493 ));
3494 let chunk = hash_join.next_unwrap_ready_chunk()?;
3495 let chunk = chunk.compact_vis();
3496 assert_eq!(
3497 chunk,
3498 StreamChunk::from_pretty(
3499 " I I I I
3500 - 1 1 1 1
3501 + 1 2 1 1
3502 "
3503 )
3504 );
3505
3506 Ok(())
3507 }
3508
3509 #[tokio::test]
3510 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3511 {
3512 let chunk_l1 = StreamChunk::from_pretty(
3513 " I I
3514 + 1 4
3515 + 2 5
3516 + 3 6
3517 + 3 7",
3518 );
3519 let chunk_l2 = StreamChunk::from_pretty(
3520 " I I
3521 + 3 8
3522 - 3 8
3523 - 1 4", );
3525 let chunk_r1 = StreamChunk::from_pretty(
3526 " I I
3527 + 2 6
3528 + 4 8
3529 + 3 4",
3530 );
3531 let chunk_r2 = StreamChunk::from_pretty(
3532 " I I
3533 + 5 10
3534 - 5 10
3535 + 1 2",
3536 );
3537 let (mut tx_l, mut tx_r, mut hash_join) =
3538 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3539
3540 tx_l.push_barrier(test_epoch(1), false);
3542 tx_r.push_barrier(test_epoch(1), false);
3543 hash_join.next_unwrap_ready_barrier()?;
3544
3545 tx_l.push_chunk(chunk_l1);
3547 let chunk = hash_join.next_unwrap_ready_chunk()?;
3548 assert_eq!(
3549 chunk,
3550 StreamChunk::from_pretty(
3551 " I I I I
3552 + 1 4 . .
3553 + 2 5 . .
3554 + 3 6 . .
3555 + 3 7 . ."
3556 )
3557 );
3558
3559 tx_l.push_chunk(chunk_l2);
3561 let chunk = hash_join.next_unwrap_ready_chunk()?;
3562 assert_eq!(
3563 chunk,
3564 StreamChunk::from_pretty(
3565 " I I I I
3566 + 3 8 . . D
3567 - 3 8 . . D
3568 - 1 4 . ."
3569 )
3570 );
3571
3572 tx_r.push_chunk(chunk_r1);
3574 let chunk = hash_join.next_unwrap_ready_chunk()?;
3575 assert_eq!(
3576 chunk,
3577 StreamChunk::from_pretty(
3578 " I I I I
3579 - 2 5 . .
3580 + 2 5 2 6
3581 + . . 4 8
3582 + . . 3 4" )
3586 );
3587
3588 tx_r.push_chunk(chunk_r2);
3590 let chunk = hash_join.next_unwrap_ready_chunk()?;
3591 assert_eq!(
3592 chunk,
3593 StreamChunk::from_pretty(
3594 " I I I I
3595 + . . 5 10 D
3596 - . . 5 10 D
3597 + . . 1 2" )
3600 );
3601
3602 Ok(())
3603 }
3604
3605 #[tokio::test]
3606 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3607 let chunk_l1 = StreamChunk::from_pretty(
3608 " I I
3609 + 1 4
3610 + 2 10
3611 + 3 6",
3612 );
3613 let chunk_l2 = StreamChunk::from_pretty(
3614 " I I
3615 + 3 8
3616 - 3 8",
3617 );
3618 let chunk_r1 = StreamChunk::from_pretty(
3619 " I I
3620 + 2 7
3621 + 4 8
3622 + 6 9",
3623 );
3624 let chunk_r2 = StreamChunk::from_pretty(
3625 " I I
3626 + 3 10
3627 + 6 11",
3628 );
3629 let (mut tx_l, mut tx_r, mut hash_join) =
3630 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3631
3632 tx_l.push_barrier(test_epoch(1), false);
3634 tx_r.push_barrier(test_epoch(1), false);
3635 hash_join.next_unwrap_ready_barrier()?;
3636
3637 tx_l.push_chunk(chunk_l1);
3639 hash_join.next_unwrap_pending();
3640
3641 tx_l.push_chunk(chunk_l2);
3643 hash_join.next_unwrap_pending();
3644
3645 tx_r.push_chunk(chunk_r1);
3647 hash_join.next_unwrap_pending();
3648
3649 tx_r.push_chunk(chunk_r2);
3651 let chunk = hash_join.next_unwrap_ready_chunk()?;
3652 assert_eq!(
3653 chunk,
3654 StreamChunk::from_pretty(
3655 " I I I I
3656 + 3 6 3 10"
3657 )
3658 );
3659
3660 Ok(())
3661 }
3662
3663 #[tokio::test]
3664 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3665 let (mut tx_l, mut tx_r, mut hash_join) =
3666 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3667
3668 tx_l.push_barrier(test_epoch(1), false);
3670 tx_r.push_barrier(test_epoch(1), false);
3671 hash_join.next_unwrap_ready_barrier()?;
3672
3673 tx_l.push_int64_watermark(0, 100);
3674
3675 tx_l.push_int64_watermark(0, 200);
3676
3677 tx_l.push_barrier(test_epoch(2), false);
3678 tx_r.push_barrier(test_epoch(2), false);
3679 hash_join.next_unwrap_ready_barrier()?;
3680
3681 tx_r.push_int64_watermark(0, 50);
3682
3683 let w1 = hash_join.next().await.unwrap().unwrap();
3684 let w1 = w1.as_watermark().unwrap();
3685
3686 let w2 = hash_join.next().await.unwrap().unwrap();
3687 let w2 = w2.as_watermark().unwrap();
3688
3689 tx_r.push_int64_watermark(0, 100);
3690
3691 let w3 = hash_join.next().await.unwrap().unwrap();
3692 let w3 = w3.as_watermark().unwrap();
3693
3694 let w4 = hash_join.next().await.unwrap().unwrap();
3695 let w4 = w4.as_watermark().unwrap();
3696
3697 assert_eq!(
3698 w1,
3699 &Watermark {
3700 col_idx: 2,
3701 data_type: DataType::Int64,
3702 val: ScalarImpl::Int64(50)
3703 }
3704 );
3705
3706 assert_eq!(
3707 w2,
3708 &Watermark {
3709 col_idx: 0,
3710 data_type: DataType::Int64,
3711 val: ScalarImpl::Int64(50)
3712 }
3713 );
3714
3715 assert_eq!(
3716 w3,
3717 &Watermark {
3718 col_idx: 2,
3719 data_type: DataType::Int64,
3720 val: ScalarImpl::Int64(100)
3721 }
3722 );
3723
3724 assert_eq!(
3725 w4,
3726 &Watermark {
3727 col_idx: 0,
3728 data_type: DataType::Int64,
3729 val: ScalarImpl::Int64(100)
3730 }
3731 );
3732
3733 Ok(())
3734 }
3735
3736 async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
3737 evict_interval: u32,
3738 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
3739 let schema = Schema {
3740 fields: vec![
3741 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
3743 ],
3744 };
3745 let (tx_l, source_l) = MockSource::channel();
3746 let source_l = source_l.into_executor(schema.clone(), vec![1]);
3747 let (tx_r, source_r) = MockSource::channel();
3748 let source_r = source_r.into_executor(schema, vec![1]);
3749 let params_l = JoinParams::new(vec![0], vec![1]);
3750 let params_r = JoinParams::new(vec![0], vec![1]);
3751
3752 let mem_state = MemoryStateStore::new();
3753
3754 let (state_l, degree_state_l) = create_in_memory_state_table(
3755 mem_state.clone(),
3756 &[DataType::Int64, DataType::Int64],
3757 &[OrderType::ascending(), OrderType::ascending()],
3758 &[0, 1],
3759 0,
3760 )
3761 .await;
3762
3763 let (state_r, degree_state_r) = create_in_memory_state_table(
3764 mem_state,
3765 &[DataType::Int64, DataType::Int64],
3766 &[OrderType::ascending(), OrderType::ascending()],
3767 &[0, 1],
3768 2,
3769 )
3770 .await;
3771
3772 let schema = match T {
3773 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
3774 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
3775 _ => [source_l.schema().fields(), source_r.schema().fields()]
3776 .concat()
3777 .into_iter()
3778 .collect(),
3779 };
3780 let schema_len = schema.len();
3781 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
3782
3783 let mut streaming_config = StreamingConfig::default();
3784 streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;
3785
3786 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
3787 ActorContext::for_test_with_config(123, streaming_config),
3788 info,
3789 source_l,
3790 source_r,
3791 params_l,
3792 params_r,
3793 vec![false],
3794 (0..schema_len).collect_vec(),
3795 None,
3796 vec![],
3797 state_l,
3798 degree_state_l,
3799 state_r,
3800 degree_state_r,
3801 Arc::new(AtomicU64::new(0)),
3802 false,
3803 Arc::new(StreamingMetrics::unused()),
3804 1024,
3805 2048,
3806 vec![(0, true)],
3807 );
3808 (tx_l, tx_r, executor.boxed().execute())
3809 }
3810
3811 #[tokio::test]
3814 async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3815 let chunk_l = StreamChunk::from_pretty(
3816 " I I
3817 + 1 4
3818 + 2 5
3819 + 3 6",
3820 );
3821 let chunk_r = StreamChunk::from_pretty(
3822 " I I
3823 + 2 7
3824 + 3 8",
3825 );
3826
3827 let (mut tx_l, mut tx_r, mut hash_join) =
3829 create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3830
3831 tx_l.push_barrier(test_epoch(1), false);
3832 tx_r.push_barrier(test_epoch(1), false);
3833 hash_join.next_unwrap_ready_barrier()?;
3834
3835 tx_l.push_chunk(chunk_l);
3836 hash_join.next_unwrap_pending();
3837
3838 tx_r.push_chunk(chunk_r);
3839 let chunk = hash_join.next_unwrap_ready_chunk()?;
3840 assert_eq!(
3841 chunk,
3842 StreamChunk::from_pretty(
3843 " I I I I
3844 + 2 5 2 7
3845 + 3 6 3 8"
3846 )
3847 );
3848
3849 Ok(())
3850 }
3851
3852 #[tokio::test]
3855 async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3856 let chunk_l = StreamChunk::from_pretty(
3857 " I I
3858 + 1 4
3859 + 2 5
3860 + 3 6",
3861 );
3862 let chunk_r = StreamChunk::from_pretty(
3863 " I I
3864 + 2 7
3865 + 3 8",
3866 );
3867
3868 let (mut tx_l, mut tx_r, mut hash_join) =
3870 create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3871
3872 tx_l.push_barrier(test_epoch(1), false);
3873 tx_r.push_barrier(test_epoch(1), false);
3874 hash_join.next_unwrap_ready_barrier()?;
3875
3876 tx_l.push_chunk(chunk_l);
3877 hash_join.next_unwrap_pending();
3878
3879 tx_r.push_chunk(chunk_r);
3880 let chunk = hash_join.next_unwrap_ready_chunk()?;
3881 assert_eq!(
3882 chunk,
3883 StreamChunk::from_pretty(
3884 " I I I I
3885 + 2 5 2 7
3886 + 3 6 3 8"
3887 )
3888 );
3889
3890 Ok(())
3891 }
3892
3893 #[tokio::test]
3896 async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3897 let chunk_l = StreamChunk::from_pretty(
3898 " I I
3899 + 1 4
3900 + 2 5
3901 + 3 6
3902 + 4 7
3903 + 5 8",
3904 );
3905 let chunk_r = StreamChunk::from_pretty(
3906 " I I
3907 + 1 9
3908 + 3 10
3909 + 5 11",
3910 );
3911
3912 let (mut tx_l, mut tx_r, mut hash_join) =
3914 create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3915
3916 tx_l.push_barrier(test_epoch(1), false);
3917 tx_r.push_barrier(test_epoch(1), false);
3918 hash_join.next_unwrap_ready_barrier()?;
3919
3920 tx_l.push_chunk(chunk_l);
3921 hash_join.next_unwrap_pending();
3922
3923 tx_r.push_chunk(chunk_r);
3924 let chunk = hash_join.next_unwrap_ready_chunk()?;
3925 assert_eq!(
3926 chunk,
3927 StreamChunk::from_pretty(
3928 " I I I I
3929 + 1 4 1 9
3930 + 3 6 3 10
3931 + 5 8 5 11"
3932 )
3933 );
3934
3935 Ok(())
3936 }
3937}