1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Returns `true` when every index in `vec1` also occurs in `vec2`.
///
/// Duplicates in either input do not matter, and an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
48
/// Describes one inequality condition between a column of the left input and a column of the
/// right input of the join.
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Column index within the left input that takes part in this inequality.
    pub left_idx: usize,
    /// Column index within the right input that takes part in this inequality.
    pub right_idx: usize,
    /// When `true`, the executor registers `left_idx` as a state-clean column of the left side.
    pub clean_left_state: bool,
    /// When `true`, the executor registers `right_idx` as a state-clean column of the right side.
    pub clean_right_state: bool,
    /// Comparison operator of the pair. `GreaterThan` and `GreaterThanOrEqual` are treated as
    /// "the left side is the larger one" (see `left_side_is_larger`).
    pub op: InequalityType,
}
64
65impl InequalityPairInfo {
66 pub fn left_side_is_larger(&self) -> bool {
68 matches!(
69 self.op,
70 InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
71 )
72 }
73}
74
/// Per-side parameters handed to the hash join executor constructors.
pub struct JoinParams {
    /// Indices of the join key columns of this side's input.
    pub join_key_indices: Vec<usize>,
    /// Primary key indices after deduplication (the deduplication itself is done by the caller).
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the two index lists into a `JoinParams` without modifying them.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
90
/// State and metadata the executor keeps for one input side (left or right) of the join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Join hash map of this side; all state lookups, inserts, deletes and flushes go through it.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns within this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's first column in a concatenated row: `0` for the left side and the
    /// left input's column count for the right side.
    start_pos: usize,
    /// `(input column index, output column index)` pairs for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same mapping as `i2o_mapping`, indexed by input column so that all output columns of
    /// one input column can be looked up at once.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column, the inequality pairs it takes part in, as
    /// `(inequality index, flag)`. The flag is `true` when this side is *not* the larger side of
    /// that inequality.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that take part in at least one inequality; an incoming row must be non-null
    /// in all of them to be looked up in the other side. Cleared when the append-only
    /// optimization is enabled.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs of the columns registered for state cleaning.
    /// Cleared when the append-only optimization is enabled.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table was created for this side.
    need_degree_table: bool,
    /// Ties the encoding type parameter `E` to the struct without storing a value of it.
    _marker: std::marker::PhantomData<E>,
}
120
121impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("JoinSide")
124 .field("join_key_indices", &self.join_key_indices)
125 .field("col_types", &self.all_data_types)
126 .field("start_pos", &self.start_pos)
127 .field("i2o_mapping", &self.i2o_mapping)
128 .field("need_degree_table", &self.need_degree_table)
129 .finish()
130 }
131}
132
133impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
134 fn is_dirty(&self) -> bool {
136 unimplemented!()
137 }
138
139 #[expect(dead_code)]
140 fn clear_cache(&mut self) {
141 assert!(
142 !self.is_dirty(),
143 "cannot clear cache while states of hash join are dirty"
144 );
145
146 }
149
150 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
151 self.ht.init(epoch).await
152 }
153}
154
/// Streaming hash join executor.
///
/// `T` is the join type, `K` the hash key type built from the join key columns, `S` the state
/// store backing the state tables and `E` the encoding used for cached join rows.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context; supplies the actor/fragment ids and the configuration read at construction.
    ctx: ActorContextRef,
    /// Executor info; its `stream_key` and `schema` are printed by the `Debug` impl.
    info: ExecutorInfo,

    /// Left input executor. `Some` until `into_stream` takes it.
    input_l: Option<Executor>,
    /// Right input executor. `Some` until `into_stream` takes it.
    input_r: Option<Executor>,
    /// Data types of the output columns selected by `output_indices` at construction.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left side.
    side_l: JoinSide<K, S, E>,
    /// State and metadata of the right side.
    side_r: JoinSide<K, S, E>,
    /// Optional extra join condition, handed to the row matching code.
    cond: Option<NonStrictExpression>,
    /// One entry per inequality pair: the output column indices of the larger side's column
    /// (used when emitting watermarks) together with the pair description.
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// Latest selected watermark per inequality pair, `None` until one has been selected. Reset
    /// to `None` when the left side's post-commit step reports `Some(true)` after a barrier.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// `(join key position, do_clean)` pairs: a selected join-key watermark is pushed into both
    /// hash maps only for positions whose flag is `true`.
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// `true` when the join is append-only and, on both sides, the input stream key is contained
    /// in the join key.
    append_only_optimize: bool,

    /// Streaming metrics handle.
    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Number of rows processed since the hash map caches were last evicted.
    cnt_rows_received: u32,

    /// Watermark buffers keyed by join key position, or by
    /// `join key count + inequality index` for inequality watermarks.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when a single input row matches more rows than this.
    high_join_amplification_threshold: usize,

    /// Cap used when refilling a cache entry from the state table after a cache miss.
    entry_state_max_rows: usize,
    /// Both hash map caches are evicted once this many rows have been processed.
    join_cache_evict_interval_rows: u32,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207 for HashJoinExecutor<K, S, T, E>
208{
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("HashJoinExecutor")
211 .field("join_type", &T)
212 .field("input_left", &self.input_l.as_ref().unwrap().identity())
213 .field("input_right", &self.input_r.as_ref().unwrap().identity())
214 .field("side_l", &self.side_l)
215 .field("side_r", &self.side_r)
216 .field("stream_key", &self.info.stream_key)
217 .field("schema", &self.info.schema)
218 .field("actual_output_data_types", &self.actual_output_data_types)
219 .field(
220 "join_cache_evict_interval_rows",
221 &self.join_cache_evict_interval_rows,
222 )
223 .finish()
224 }
225}
226
227impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
228 for HashJoinExecutor<K, S, T, E>
229{
230 fn execute(self: Box<Self>) -> BoxedMessageStream {
231 self.into_stream().boxed()
232 }
233}
234
/// Arguments for joining one input chunk, bundled so that the per-side entry points take a
/// single parameter. Borrowed fields point into the executor; the rest are copied values.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context, used for metric labels and log fields.
    ctx: &'a ActorContextRef,
    /// Left side of the join.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right side of the join.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns, used to build output chunks.
    actual_output_data_types: &'a [DataType],
    /// Optional extra join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest selected watermark per inequality pair.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to process.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size for the output chunk builder.
    chunk_size: usize,
    /// Counter of rows processed since the last cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Match count above which a warning is logged for a row.
    high_join_amplification_threshold: usize,
    /// Cap used when refilling a cache entry after a cache miss.
    entry_state_max_rows: usize,
    /// Number of processed rows after which both caches are evicted.
    join_cache_evict_interval_rows: u32,
}
250
251impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
252 HashJoinExecutor<K, S, T, E>
253{
    /// Creates a hash join executor that uses the configured default for the per-entry cache
    /// refill cap.
    ///
    /// Every argument is forwarded unchanged to `new_with_cache_size`, with `None` passed for
    /// `entry_state_max_rows`, so that constructor falls back to
    /// `ctx.config.developer.hash_join_entry_state_max_rows`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // No explicit cap: use the value from the developer config.
            None,
            watermark_indices_in_jk,
        )
    }
301
    /// Creates a hash join executor, optionally overriding the per-entry cache refill cap.
    ///
    /// Besides storing its arguments, this derives per-side metadata: join key data types,
    /// input-to-output column mappings, the inequality bookkeeping used for watermarks and state
    /// cleaning, and whether each side needs a degree table.
    ///
    /// `entry_state_max_rows` of `None` falls back to
    /// `ctx.config.developer.hash_join_entry_state_max_rows`.
    ///
    /// # Panics
    /// - if the join key data types of the two sides differ;
    /// - if an entry of `output_indices` is out of range for the join's full output schema;
    /// - if an inequality pair refers to a column index outside its input schema.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        // An explicit cap wins; otherwise use the configured default.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Clamp to at least 1 so the eviction interval is never zero.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins expose only one side's columns; every other join type exposes the
        // left columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project the full output schema down to the requested output columns.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree tables the join key columns come first ...
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        // ... immediately followed by the deduplicated pk columns.
        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether the input stream key is fully covered by the join key, per side.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types, column by column.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side gets a degree table only if the join type asks for it and the *other* side's
        // stream key is not contained in the join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            // A side that is not part of the output contributes zero columns to the mapping.
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Register the pair on both input columns. The flag is `true` for the side that
                // is not the larger one.
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Record the columns that were requested for state cleaning.
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Output columns of the larger side's column; an unmapped column yields an
                // empty list.
                let output_indices = if pair.left_side_is_larger() {
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns that take part in at least one inequality.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, neither state-clean columns nor non-null
        // requirements are kept.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Only the first state-clean column of each side is handed to its degree table.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // No watermark has been selected for any inequality pair yet.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                // Left columns start at the beginning of a concatenated row.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns follow all left columns in a concatenated row.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
        }
    }
582
    /// Main loop of the executor: consumes both inputs through a barrier-aligned stream and
    /// yields the join output.
    ///
    /// The first barrier is forwarded before both sides are initialized with its epoch. After
    /// that, each aligned message is handled as follows:
    /// - watermarks are passed to `handle_watermark` and the resulting watermarks are yielded;
    /// - chunks are joined against the opposite side, output chunks are yielded, and
    ///   `try_flush_data` is called afterwards;
    /// - barriers flush both sides, are yielded, and are followed by the post-commit step of
    ///   both sides and a refresh of the cache entry gauges.
    ///
    /// # Panics
    /// Panics if either input executor has already been taken.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier is propagated before the state of both sides is initialized.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Resolve all labelled metrics once, outside the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Marks the point from which the wait for the next input message is measured.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent producing chunks: the timer is stopped
                    // before each yield and restarted right after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as for left chunks.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // The metric is recorded before yielding, so it covers the flush but not
                    // the post-commit work below.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Run the post-commit step of both sides with the (optional) new vnode
                    // bitmap. Only the left side's result decides whether buffers are reset.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        // Drop all buffered watermarks and forget the selected inequality
                        // watermarks.
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Publish the current number of cached entries of each side.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
755
756 async fn flush_data(
757 &mut self,
758 epoch: EpochPair,
759 ) -> StreamExecutorResult<(
760 JoinHashMapPostCommit<'_, K, S, E>,
761 JoinHashMapPostCommit<'_, K, S, E>,
762 )> {
763 let left = self.side_l.ht.flush(epoch).await?;
766 let right = self.side_r.ht.flush(epoch).await?;
767 Ok((left, right))
768 }
769
770 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
771 self.side_l.ht.try_flush().await?;
774 self.side_r.ht.try_flush().await?;
775 Ok(())
776 }
777
778 fn evict_cache(
780 side_update: &mut JoinSide<K, S, E>,
781 side_match: &mut JoinSide<K, S, E>,
782 cnt_rows_received: &mut u32,
783 join_cache_evict_interval_rows: u32,
784 ) {
785 *cnt_rows_received += 1;
786 if *cnt_rows_received >= join_cache_evict_interval_rows {
787 side_update.ht.evict();
788 side_match.ht.evict();
789 *cnt_rows_received = 0;
790 }
791 }
792
    /// Processes a watermark received from `side` and returns the watermarks to emit downstream.
    ///
    /// Two independent cases are handled:
    /// 1. The watermark column is a join key column. The watermark is buffered per join key
    ///    position; once the buffer selects a watermark it is emitted on the mapped output
    ///    columns of both sides. It is pushed into both hash maps only if
    ///    `watermark_indices_in_jk` marks that position with `do_clean == true`.
    /// 2. The watermark column takes part in inequality pairs. The watermark is buffered per
    ///    pair; once selected it is emitted on the pair's stored output columns, remembered in
    ///    `inequality_watermarks`, and pushed into the hash map of the larger side if the pair
    ///    asked for that side's state to be cleaned.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Case 1: every join key position whose column is the watermark column.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Buffers for join key watermarks are keyed by the join key position.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Update the hash maps only for join key positions flagged for cleaning.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Emit on the output columns of this join key on both sides; a side without a
                // mapping for the column contributes nothing.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Case 2: inequality pairs that involve the watermark column. Hash map updates are
        // collected first and applied after the loop.
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers are keyed after the join key positions.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // These are the output columns of the larger side's column.
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Remember the watermark; it is read when chunks are joined.
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Only the larger side is a candidate, and only if the pair requested
                    // cleaning of that side.
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // If several pairs selected a watermark for the same side, the last one wins.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
894
895 fn row_concat(
896 row_update: impl Row,
897 update_start_pos: usize,
898 row_matched: impl Row,
899 matched_start_pos: usize,
900 ) -> OwnedRow {
901 let mut new_row = vec![None; row_update.len() + row_matched.len()];
902
903 for (i, datum_ref) in row_update.iter().enumerate() {
904 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
905 }
906 for (i, datum_ref) in row_matched.iter().enumerate() {
907 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
908 }
909 OwnedRow::new(new_row)
910 }
911
    /// Joins a chunk that arrived on the left input: the left side is the updating side and
    /// the right side is the one being matched against.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
918
    /// Joins a chunk that arrived on the right input: the right side is the updating side and
    /// the left side is the one being matched against.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
925
    /// Joins one input chunk of side `SIDE` against the opposite side and yields the resulting
    /// output chunks.
    ///
    /// For every visible row of the chunk this:
    /// 1. counts the row towards cache eviction (`evict_cache`);
    /// 2. decides whether the row can match at all and, if so, takes the cached state of its
    ///    join key from the opposite side;
    /// 3. delegates matching and state updates to `handle_match_rows`;
    /// 4. records the number of matched rows and logs a warning when it exceeds
    ///    `high_join_amplification_threshold`.
    ///
    /// Whatever is left in the chunk builder is yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
            ..
        } = args;

        // `side_update` receives the chunk; `side_match` is probed for matches.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // State-clean columns of the matched side for which a watermark has already been
        // selected, paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the hash keys of all rows up front, then walk rows and keys in lockstep.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Holes (invisible rows) are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            let cache_lookup_result = {
                // The row must be non-null in every column that takes part in an inequality.
                // NOTE(review): the unchecked access assumes every index in `non_null_fields`
                // is in bounds for `row`; the indices are derived from this side's input
                // schema in the constructor — confirm the chunk always has that schema.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The null positions of the key must all be allowed by the hash map's
                // `null_matched` bitmap.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to the `handle_match_rows` call for the given join op, so both arms
            // below share one argument list.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Both halves of an update are treated like a plain insert or delete.
            match op {
                Op::Insert | Op::UpdateInsert => {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            // `total_matches` sums the cardinality of the chunks yielded while this row was
            // processed.
            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Flush rows still buffered in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1063
    /// Matches one input `row` of the updating side against the rows of the matched side that
    /// share its join `key`, yields the output chunks that become full, and then applies the
    /// row to the state of both sides.
    ///
    /// The matched rows come from `cached_lookup_result`:
    /// - `NeverMatch`: the row is only forwarded through `forward_if_not_matched` and the
    ///   function returns without touching any state;
    /// - `Hit`: the cached rows are iterated and the entry is put back into the cache afterwards;
    /// - `Miss`: the rows are fetched from the matched side's state, and a new cache entry is
    ///   refilled along the way, subject to `entry_state_max_rows`.
    ///
    /// After matching, the row is forwarded according to whether anything matched, rows
    /// collected for cleaning are removed from the matched side's in-memory state, and the row
    /// is inserted into or deleted from the updating side's state. With the append-only
    /// optimization and a recorded matched row, that matched row is deleted instead and the
    /// updating side is left untouched.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Entry that is refilled on a cache miss, and the number of rows put into it.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Out-parameters filled in by `handle_match_row` while the matched rows are visited.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to the `handle_match_row` call so the hit and miss paths share one argument
        // list; only the degree table, the matched row and the `from_cache` flag differ.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // The row cannot match anything: forward it as unmatched and stop here,
                // without writing it to any state.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Visit every cached row of this join key.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Nothing cached for this key: stream the matching rows from the state.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry while below the cap. Because the check is `<=`,
                    // up to `entry_state_max_rows + 1` rows are inserted; reaching that count
                    // makes the write-back check after the loop fail.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        // `degree` is still zero when no matched row changed it.
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the entry (back) into the cache: always after a hit, and after a miss only if
        // the refill stayed within the cap.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Remove the matched rows that were collected for cleaning from the in-memory state.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // Append-only optimization: delete the recorded matched row from the matched side and
        // skip the update of the updating side. Only inserts may reach this point.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Finally apply the input row, together with its degree, to the updating side.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1246
    /// Handles one candidate `matched_row` from the match side for the incoming `update_row`.
    ///
    /// The join condition is evaluated through `Self::check_join_condition`:
    ///
    /// - If it holds, `update_row_degree` is incremented, the pair is passed to
    ///   `hashjoin_chunk_builder.with_match` (skipped when `forward_exactly_once(T, SIDE)`
    ///   is true), and, if `match_degree_table` is `Some`, `update_degree` is called for the
    ///   matched row and the cached copy behind `matched_row_cache_ref` has its degree
    ///   increased or decreased according to `JOIN_OP`.
    /// - If it does not hold, the matched row is checked against
    ///   `useful_state_clean_columns` to decide whether it needs state cleaning.
    ///
    /// Afterwards, independently of the condition result, the matched row is stored in
    /// `append_only_matched_row` when `append_only_optimize` is set; otherwise it is pushed
    /// to `matched_rows_to_clean` if it was flagged for cleaning. In both cases the row is
    /// converted with `map_output` first.
    ///
    /// Returns the chunk yielded by the chunk builder, if any.
    ///
    /// # Panics
    ///
    /// - If the join condition holds, a degree table is present and
    ///   `MATCHED_ROWS_FROM_CACHE` is `true` while `matched_row_cache_ref` is `None`.
    /// - If `append_only_optimize` is set and `JOIN_OP` is not `JoinOp::Insert`, or
    ///   `append_only_matched_row` already holds a row.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row, RO: Row, const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // One more match for the update row.
            *update_row_degree += 1;
            // For inserts, the matched pair is handed to the chunk builder BEFORE the
            // matched row's degree is updated below.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // Degree bookkeeping only happens when the match side has a degree table.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // Mirror the degree change in the cached copy of the matched row.
                    // The `unwrap` relies on callers passing `Some` whenever
                    // `MATCHED_ROWS_FROM_CACHE` is `true`.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // For deletes, the matched pair is handed to the chunk builder AFTER the
            // degree update above, so it sees the updated `matched_row`.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // The row did not match: flag it for cleaning if any listed column holds a
            // non-null value strictly below that column's watermark.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            // Only inserts are allowed on this path, and at most one matched row may be
            // recorded per update row.
            assert_matches!(JOIN_OP, JoinOp::Insert);
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // Always true in this branch; kept as a statement of the intended invariant.
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1358
1359 #[inline]
1364 async fn check_join_condition(
1365 row: impl Row,
1366 side_update_start_pos: usize,
1367 matched_row: impl Row,
1368 side_match_start_pos: usize,
1369 join_condition: &Option<NonStrictExpression>,
1370 ) -> bool {
1371 if let Some(join_condition) = join_condition {
1372 let new_row = Self::row_concat(
1373 row,
1374 side_update_start_pos,
1375 matched_row,
1376 side_match_start_pos,
1377 );
1378 join_condition
1379 .eval_row_infallible(&new_row)
1380 .await
1381 .map(|s| *s.as_bool())
1382 .unwrap_or(false)
1383 } else {
1384 true
1385 }
1386 }
1387}
1388
1389#[cfg(test)]
1390mod tests {
1391 use std::sync::atomic::AtomicU64;
1392
1393 use pretty_assertions::assert_eq;
1394 use risingwave_common::array::*;
1395 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1396 use risingwave_common::config::StreamingConfig;
1397 use risingwave_common::hash::{Key64, Key128};
1398 use risingwave_common::util::epoch::test_epoch;
1399 use risingwave_common::util::sort_util::OrderType;
1400 use risingwave_storage::memory::MemoryStateStore;
1401
1402 use super::*;
1403 use crate::common::table::test_utils::gen_pbtable;
1404 use crate::executor::MemoryEncoding;
1405 use crate::executor::test_utils::expr::build_from_pretty;
1406 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1407
1408 async fn create_in_memory_state_table(
1409 mem_state: MemoryStateStore,
1410 data_types: &[DataType],
1411 order_types: &[OrderType],
1412 pk_indices: &[usize],
1413 table_id: u32,
1414 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1415 create_in_memory_state_table_with_inequality(
1416 mem_state,
1417 data_types,
1418 order_types,
1419 pk_indices,
1420 table_id,
1421 None,
1422 )
1423 .await
1424 }
1425
1426 async fn create_in_memory_state_table_with_inequality(
1427 mem_state: MemoryStateStore,
1428 data_types: &[DataType],
1429 order_types: &[OrderType],
1430 pk_indices: &[usize],
1431 table_id: u32,
1432 degree_inequality_type: Option<DataType>,
1433 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1434 create_in_memory_state_table_with_watermark(
1435 mem_state,
1436 data_types,
1437 order_types,
1438 pk_indices,
1439 table_id,
1440 degree_inequality_type,
1441 vec![],
1442 vec![],
1443 )
1444 .await
1445 }
1446
1447 #[allow(clippy::too_many_arguments)]
1448 async fn create_in_memory_state_table_with_watermark(
1449 mem_state: MemoryStateStore,
1450 data_types: &[DataType],
1451 order_types: &[OrderType],
1452 pk_indices: &[usize],
1453 table_id: u32,
1454 degree_inequality_type: Option<DataType>,
1455 state_clean_watermark_indices: Vec<usize>,
1456 degree_clean_watermark_indices: Vec<usize>,
1457 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1458 let column_descs = data_types
1459 .iter()
1460 .enumerate()
1461 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1462 .collect_vec();
1463 let mut state_table_catalog = gen_pbtable(
1464 TableId::new(table_id),
1465 column_descs,
1466 order_types.to_vec(),
1467 pk_indices.to_vec(),
1468 0,
1469 );
1470 state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1471 .into_iter()
1472 .map(|idx| idx as u32)
1473 .collect();
1474 let state_table =
1475 StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1476
1477 let mut degree_table_column_descs = vec![];
1479 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1480 degree_table_column_descs.push(ColumnDesc::unnamed(
1481 ColumnId::new(pk_id as i32),
1482 data_types[*idx].clone(),
1483 ))
1484 });
1485 degree_table_column_descs.push(ColumnDesc::unnamed(
1487 ColumnId::new(pk_indices.len() as i32),
1488 DataType::Int64,
1489 ));
1490 if let Some(ineq_type) = degree_inequality_type {
1492 degree_table_column_descs.push(ColumnDesc::unnamed(
1493 ColumnId::new((pk_indices.len() + 1) as i32),
1494 ineq_type,
1495 ));
1496 }
1497 let mut degree_table_catalog = gen_pbtable(
1498 TableId::new(table_id + 1),
1499 degree_table_column_descs,
1500 order_types.to_vec(),
1501 pk_indices.to_vec(),
1502 0,
1503 );
1504 degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1505 .into_iter()
1506 .map(|idx| idx as u32)
1507 .collect();
1508 let degree_state_table =
1509 StateTable::from_table_catalog(°ree_table_catalog, mem_state, None).await;
1510 (state_table, degree_state_table)
1511 }
1512
1513 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1514 build_from_pretty(
1515 condition_text
1516 .as_deref()
1517 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1518 )
1519 }
1520
1521 async fn create_executor<const T: JoinTypePrimitive>(
1522 with_condition: bool,
1523 null_safe: bool,
1524 condition_text: Option<String>,
1525 inequality_pairs: Vec<InequalityPairInfo>,
1526 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1527 let schema = Schema {
1528 fields: vec![
1529 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1531 ],
1532 };
1533 let (tx_l, source_l) = MockSource::channel();
1534 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1535 let (tx_r, source_r) = MockSource::channel();
1536 let source_r = source_r.into_executor(schema, vec![1]);
1537 let params_l = JoinParams::new(vec![0], vec![1]);
1538 let params_r = JoinParams::new(vec![0], vec![1]);
1539 let cond = with_condition.then(|| create_cond(condition_text));
1540
1541 let mem_state = MemoryStateStore::new();
1542
1543 let l_degree_ineq_type = inequality_pairs
1545 .iter()
1546 .find(|pair| pair.clean_left_state)
1547 .map(|_| DataType::Int64); let r_degree_ineq_type = inequality_pairs
1549 .iter()
1550 .find(|pair| pair.clean_right_state)
1551 .map(|_| DataType::Int64); let l_clean_watermark_indices = inequality_pairs
1553 .iter()
1554 .find(|pair| pair.clean_left_state)
1555 .map(|pair| vec![pair.left_idx])
1556 .unwrap_or_default();
1557 let r_clean_watermark_indices = inequality_pairs
1558 .iter()
1559 .find(|pair| pair.clean_right_state)
1560 .map(|pair| vec![pair.right_idx])
1561 .unwrap_or_default();
1562 let degree_inequality_column_idx = 3;
1563 let l_degree_clean_watermark_indices = l_degree_ineq_type
1564 .as_ref()
1565 .map(|_| vec![degree_inequality_column_idx])
1566 .unwrap_or_default();
1567 let r_degree_clean_watermark_indices = r_degree_ineq_type
1568 .as_ref()
1569 .map(|_| vec![degree_inequality_column_idx])
1570 .unwrap_or_default();
1571
1572 let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1573 mem_state.clone(),
1574 &[DataType::Int64, DataType::Int64],
1575 &[OrderType::ascending(), OrderType::ascending()],
1576 &[0, 1],
1577 0,
1578 l_degree_ineq_type,
1579 l_clean_watermark_indices,
1580 l_degree_clean_watermark_indices,
1581 )
1582 .await;
1583
1584 let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1585 mem_state,
1586 &[DataType::Int64, DataType::Int64],
1587 &[OrderType::ascending(), OrderType::ascending()],
1588 &[0, 1],
1589 2,
1590 r_degree_ineq_type,
1591 r_clean_watermark_indices,
1592 r_degree_clean_watermark_indices,
1593 )
1594 .await;
1595
1596 let schema = match T {
1597 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1598 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1599 _ => [source_l.schema().fields(), source_r.schema().fields()]
1600 .concat()
1601 .into_iter()
1602 .collect(),
1603 };
1604 let schema_len = schema.len();
1605 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1606
1607 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1608 ActorContext::for_test(123),
1609 info,
1610 source_l,
1611 source_r,
1612 params_l,
1613 params_r,
1614 vec![null_safe],
1615 (0..schema_len).collect_vec(),
1616 cond,
1617 inequality_pairs,
1618 state_l,
1619 degree_state_l,
1620 state_r,
1621 degree_state_r,
1622 Arc::new(AtomicU64::new(0)),
1623 false,
1624 Arc::new(StreamingMetrics::unused()),
1625 1024,
1626 2048,
1627 vec![(0, true)],
1628 );
1629 (tx_l, tx_r, executor.boxed().execute())
1630 }
1631
1632 async fn create_classical_executor<const T: JoinTypePrimitive>(
1633 with_condition: bool,
1634 null_safe: bool,
1635 condition_text: Option<String>,
1636 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1637 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1638 }
1639
1640 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1641 with_condition: bool,
1642 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1643 let schema = Schema {
1644 fields: vec![
1645 Field::unnamed(DataType::Int64),
1646 Field::unnamed(DataType::Int64),
1647 Field::unnamed(DataType::Int64),
1648 ],
1649 };
1650 let (tx_l, source_l) = MockSource::channel();
1651 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1652 let (tx_r, source_r) = MockSource::channel();
1653 let source_r = source_r.into_executor(schema, vec![0]);
1654 let params_l = JoinParams::new(vec![0, 1], vec![]);
1655 let params_r = JoinParams::new(vec![0, 1], vec![]);
1656 let cond = with_condition.then(|| create_cond(None));
1657
1658 let mem_state = MemoryStateStore::new();
1659
1660 let (state_l, degree_state_l) = create_in_memory_state_table(
1661 mem_state.clone(),
1662 &[DataType::Int64, DataType::Int64, DataType::Int64],
1663 &[
1664 OrderType::ascending(),
1665 OrderType::ascending(),
1666 OrderType::ascending(),
1667 ],
1668 &[0, 1, 0],
1669 0,
1670 )
1671 .await;
1672
1673 let (state_r, degree_state_r) = create_in_memory_state_table(
1674 mem_state,
1675 &[DataType::Int64, DataType::Int64, DataType::Int64],
1676 &[
1677 OrderType::ascending(),
1678 OrderType::ascending(),
1679 OrderType::ascending(),
1680 ],
1681 &[0, 1, 1],
1682 1,
1683 )
1684 .await;
1685
1686 let schema = match T {
1687 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1688 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1689 _ => [source_l.schema().fields(), source_r.schema().fields()]
1690 .concat()
1691 .into_iter()
1692 .collect(),
1693 };
1694 let schema_len = schema.len();
1695 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1696
1697 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1698 ActorContext::for_test(123),
1699 info,
1700 source_l,
1701 source_r,
1702 params_l,
1703 params_r,
1704 vec![false],
1705 (0..schema_len).collect_vec(),
1706 cond,
1707 vec![],
1708 state_l,
1709 degree_state_l,
1710 state_r,
1711 degree_state_r,
1712 Arc::new(AtomicU64::new(0)),
1713 true,
1714 Arc::new(StreamingMetrics::unused()),
1715 1024,
1716 2048,
1717 vec![(0, true)],
1718 );
1719 (tx_l, tx_r, executor.boxed().execute())
1720 }
1721
1722 #[tokio::test]
1723 async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
1724 let chunk_l1 = StreamChunk::from_pretty(
1728 " I I
1729 + 2 4
1730 + 2 7
1731 + 3 8",
1732 );
1733 let chunk_r1 = StreamChunk::from_pretty(
1734 " I I
1735 + 2 6",
1736 );
1737 let chunk_r2 = StreamChunk::from_pretty(
1738 " I I
1739 + 2 3",
1740 );
1741 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1743 true,
1744 false,
1745 Some(String::from(
1746 "(greater_than_or_equal:boolean $1:int8 $3:int8)",
1747 )),
1748 vec![InequalityPairInfo {
1749 left_idx: 1,
1750 right_idx: 1,
1751 clean_left_state: true, clean_right_state: false,
1753 op: InequalityType::GreaterThanOrEqual,
1754 }],
1755 )
1756 .await;
1757
1758 tx_l.push_barrier(test_epoch(1), false);
1760 tx_r.push_barrier(test_epoch(1), false);
1761 hash_join.next_unwrap_ready_barrier()?;
1762
1763 tx_l.push_chunk(chunk_l1);
1765 hash_join.next_unwrap_pending();
1766
1767 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1770 hash_join.next_unwrap_pending();
1771
1772 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1773 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1774 assert_eq!(
1775 output_watermark,
1776 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
1777 );
1778
1779 tx_r.push_chunk(chunk_r1);
1788 let chunk = hash_join.next_unwrap_ready_chunk()?;
1789 assert_eq!(
1790 chunk,
1791 StreamChunk::from_pretty(
1792 " I I I I
1793 + 2 7 2 6"
1794 )
1795 );
1796
1797 tx_r.push_chunk(chunk_r2);
1801 let chunk = hash_join.next_unwrap_ready_chunk()?;
1802 assert_eq!(
1803 chunk,
1804 StreamChunk::from_pretty(
1805 " I I I I
1806 + 2 7 2 3"
1807 )
1808 );
1809
1810 Ok(())
1811 }
1812
1813 #[tokio::test]
1814 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1815 let chunk_l1 = StreamChunk::from_pretty(
1816 " I I
1817 + 1 4
1818 + 2 5
1819 + 3 6",
1820 );
1821 let chunk_l2 = StreamChunk::from_pretty(
1822 " I I
1823 + 3 8
1824 - 3 8",
1825 );
1826 let chunk_r1 = StreamChunk::from_pretty(
1827 " I I
1828 + 2 7
1829 + 4 8
1830 + 6 9",
1831 );
1832 let chunk_r2 = StreamChunk::from_pretty(
1833 " I I
1834 + 3 10
1835 + 6 11",
1836 );
1837 let (mut tx_l, mut tx_r, mut hash_join) =
1838 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1839
1840 tx_l.push_barrier(test_epoch(1), false);
1842 tx_r.push_barrier(test_epoch(1), false);
1843 hash_join.next_unwrap_ready_barrier()?;
1844
1845 tx_l.push_chunk(chunk_l1);
1847 hash_join.next_unwrap_pending();
1848
1849 tx_l.push_barrier(test_epoch(2), false);
1851 tx_r.push_barrier(test_epoch(2), false);
1852 hash_join.next_unwrap_ready_barrier()?;
1853
1854 tx_l.push_chunk(chunk_l2);
1856 hash_join.next_unwrap_pending();
1857
1858 tx_r.push_chunk(chunk_r1);
1860 let chunk = hash_join.next_unwrap_ready_chunk()?;
1861 assert_eq!(
1862 chunk,
1863 StreamChunk::from_pretty(
1864 " I I I I
1865 + 2 5 2 7"
1866 )
1867 );
1868
1869 tx_r.push_chunk(chunk_r2);
1871 let chunk = hash_join.next_unwrap_ready_chunk()?;
1872 assert_eq!(
1873 chunk,
1874 StreamChunk::from_pretty(
1875 " I I I I
1876 + 3 6 3 10"
1877 )
1878 );
1879
1880 Ok(())
1881 }
1882
1883 #[tokio::test]
1884 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1885 let chunk_l1 = StreamChunk::from_pretty(
1886 " I I
1887 + 1 4
1888 + 2 5
1889 + . 6",
1890 );
1891 let chunk_l2 = StreamChunk::from_pretty(
1892 " I I
1893 + . 8
1894 - . 8",
1895 );
1896 let chunk_r1 = StreamChunk::from_pretty(
1897 " I I
1898 + 2 7
1899 + 4 8
1900 + 6 9",
1901 );
1902 let chunk_r2 = StreamChunk::from_pretty(
1903 " I I
1904 + . 10
1905 + 6 11",
1906 );
1907 let (mut tx_l, mut tx_r, mut hash_join) =
1908 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1909
1910 tx_l.push_barrier(test_epoch(1), false);
1912 tx_r.push_barrier(test_epoch(1), false);
1913 hash_join.next_unwrap_ready_barrier()?;
1914
1915 tx_l.push_chunk(chunk_l1);
1917 hash_join.next_unwrap_pending();
1918
1919 tx_l.push_barrier(test_epoch(2), false);
1921 tx_r.push_barrier(test_epoch(2), false);
1922 hash_join.next_unwrap_ready_barrier()?;
1923
1924 tx_l.push_chunk(chunk_l2);
1926 hash_join.next_unwrap_pending();
1927
1928 tx_r.push_chunk(chunk_r1);
1930 let chunk = hash_join.next_unwrap_ready_chunk()?;
1931 assert_eq!(
1932 chunk,
1933 StreamChunk::from_pretty(
1934 " I I I I
1935 + 2 5 2 7"
1936 )
1937 );
1938
1939 tx_r.push_chunk(chunk_r2);
1941 let chunk = hash_join.next_unwrap_ready_chunk()?;
1942 assert_eq!(
1943 chunk,
1944 StreamChunk::from_pretty(
1945 " I I I I
1946 + . 6 . 10"
1947 )
1948 );
1949
1950 Ok(())
1951 }
1952
1953 #[tokio::test]
1954 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1955 let chunk_l1 = StreamChunk::from_pretty(
1956 " I I
1957 + 1 4
1958 + 2 5
1959 + 3 6",
1960 );
1961 let chunk_l2 = StreamChunk::from_pretty(
1962 " I I
1963 + 3 8
1964 - 3 8",
1965 );
1966 let chunk_r1 = StreamChunk::from_pretty(
1967 " I I
1968 + 2 7
1969 + 4 8
1970 + 6 9",
1971 );
1972 let chunk_r2 = StreamChunk::from_pretty(
1973 " I I
1974 + 3 10
1975 + 6 11",
1976 );
1977 let chunk_l3 = StreamChunk::from_pretty(
1978 " I I
1979 + 6 10",
1980 );
1981 let chunk_r3 = StreamChunk::from_pretty(
1982 " I I
1983 - 6 11",
1984 );
1985 let chunk_r4 = StreamChunk::from_pretty(
1986 " I I
1987 - 6 9",
1988 );
1989 let (mut tx_l, mut tx_r, mut hash_join) =
1990 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1991
1992 tx_l.push_barrier(test_epoch(1), false);
1994 tx_r.push_barrier(test_epoch(1), false);
1995 hash_join.next_unwrap_ready_barrier()?;
1996
1997 tx_l.push_chunk(chunk_l1);
1999 hash_join.next_unwrap_pending();
2000
2001 tx_l.push_barrier(test_epoch(2), false);
2003 tx_r.push_barrier(test_epoch(2), false);
2004 hash_join.next_unwrap_ready_barrier()?;
2005
2006 tx_l.push_chunk(chunk_l2);
2008 hash_join.next_unwrap_pending();
2009
2010 tx_r.push_chunk(chunk_r1);
2012 let chunk = hash_join.next_unwrap_ready_chunk()?;
2013 assert_eq!(
2014 chunk,
2015 StreamChunk::from_pretty(
2016 " I I
2017 + 2 5"
2018 )
2019 );
2020
2021 tx_r.push_chunk(chunk_r2);
2023 let chunk = hash_join.next_unwrap_ready_chunk()?;
2024 assert_eq!(
2025 chunk,
2026 StreamChunk::from_pretty(
2027 " I I
2028 + 3 6"
2029 )
2030 );
2031
2032 tx_l.push_chunk(chunk_l3);
2034 let chunk = hash_join.next_unwrap_ready_chunk()?;
2035 assert_eq!(
2036 chunk,
2037 StreamChunk::from_pretty(
2038 " I I
2039 + 6 10"
2040 )
2041 );
2042
2043 tx_r.push_chunk(chunk_r3);
2046 hash_join.next_unwrap_pending();
2047
2048 tx_r.push_chunk(chunk_r4);
2051 let chunk = hash_join.next_unwrap_ready_chunk()?;
2052 assert_eq!(
2053 chunk,
2054 StreamChunk::from_pretty(
2055 " I I
2056 - 6 10"
2057 )
2058 );
2059
2060 Ok(())
2061 }
2062
2063 #[tokio::test]
2064 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
2065 let chunk_l1 = StreamChunk::from_pretty(
2066 " I I
2067 + 1 4
2068 + 2 5
2069 + . 6",
2070 );
2071 let chunk_l2 = StreamChunk::from_pretty(
2072 " I I
2073 + . 8
2074 - . 8",
2075 );
2076 let chunk_r1 = StreamChunk::from_pretty(
2077 " I I
2078 + 2 7
2079 + 4 8
2080 + 6 9",
2081 );
2082 let chunk_r2 = StreamChunk::from_pretty(
2083 " I I
2084 + . 10
2085 + 6 11",
2086 );
2087 let chunk_l3 = StreamChunk::from_pretty(
2088 " I I
2089 + 6 10",
2090 );
2091 let chunk_r3 = StreamChunk::from_pretty(
2092 " I I
2093 - 6 11",
2094 );
2095 let chunk_r4 = StreamChunk::from_pretty(
2096 " I I
2097 - 6 9",
2098 );
2099 let (mut tx_l, mut tx_r, mut hash_join) =
2100 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
2101
2102 tx_l.push_barrier(test_epoch(1), false);
2104 tx_r.push_barrier(test_epoch(1), false);
2105 hash_join.next_unwrap_ready_barrier()?;
2106
2107 tx_l.push_chunk(chunk_l1);
2109 hash_join.next_unwrap_pending();
2110
2111 tx_l.push_barrier(test_epoch(2), false);
2113 tx_r.push_barrier(test_epoch(2), false);
2114 hash_join.next_unwrap_ready_barrier()?;
2115
2116 tx_l.push_chunk(chunk_l2);
2118 hash_join.next_unwrap_pending();
2119
2120 tx_r.push_chunk(chunk_r1);
2122 let chunk = hash_join.next_unwrap_ready_chunk()?;
2123 assert_eq!(
2124 chunk,
2125 StreamChunk::from_pretty(
2126 " I I
2127 + 2 5"
2128 )
2129 );
2130
2131 tx_r.push_chunk(chunk_r2);
2133 let chunk = hash_join.next_unwrap_ready_chunk()?;
2134 assert_eq!(
2135 chunk,
2136 StreamChunk::from_pretty(
2137 " I I
2138 + . 6"
2139 )
2140 );
2141
2142 tx_l.push_chunk(chunk_l3);
2144 let chunk = hash_join.next_unwrap_ready_chunk()?;
2145 assert_eq!(
2146 chunk,
2147 StreamChunk::from_pretty(
2148 " I I
2149 + 6 10"
2150 )
2151 );
2152
2153 tx_r.push_chunk(chunk_r3);
2156 hash_join.next_unwrap_pending();
2157
2158 tx_r.push_chunk(chunk_r4);
2161 let chunk = hash_join.next_unwrap_ready_chunk()?;
2162 assert_eq!(
2163 chunk,
2164 StreamChunk::from_pretty(
2165 " I I
2166 - 6 10"
2167 )
2168 );
2169
2170 Ok(())
2171 }
2172
2173 #[tokio::test]
2174 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2175 let chunk_l1 = StreamChunk::from_pretty(
2176 " I I I
2177 + 1 4 1
2178 + 2 5 2
2179 + 3 6 3",
2180 );
2181 let chunk_l2 = StreamChunk::from_pretty(
2182 " I I I
2183 + 4 9 4
2184 + 5 10 5",
2185 );
2186 let chunk_r1 = StreamChunk::from_pretty(
2187 " I I I
2188 + 2 5 1
2189 + 4 9 2
2190 + 6 9 3",
2191 );
2192 let chunk_r2 = StreamChunk::from_pretty(
2193 " I I I
2194 + 1 4 4
2195 + 3 6 5",
2196 );
2197
2198 let (mut tx_l, mut tx_r, mut hash_join) =
2199 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2200
2201 tx_l.push_barrier(test_epoch(1), false);
2203 tx_r.push_barrier(test_epoch(1), false);
2204 hash_join.next_unwrap_ready_barrier()?;
2205
2206 tx_l.push_chunk(chunk_l1);
2208 hash_join.next_unwrap_pending();
2209
2210 tx_l.push_barrier(test_epoch(2), false);
2212 tx_r.push_barrier(test_epoch(2), false);
2213 hash_join.next_unwrap_ready_barrier()?;
2214
2215 tx_l.push_chunk(chunk_l2);
2217 hash_join.next_unwrap_pending();
2218
2219 tx_r.push_chunk(chunk_r1);
2221 let chunk = hash_join.next_unwrap_ready_chunk()?;
2222 assert_eq!(
2223 chunk,
2224 StreamChunk::from_pretty(
2225 " I I I I I I
2226 + 2 5 2 2 5 1
2227 + 4 9 4 4 9 2"
2228 )
2229 );
2230
2231 tx_r.push_chunk(chunk_r2);
2233 let chunk = hash_join.next_unwrap_ready_chunk()?;
2234 assert_eq!(
2235 chunk,
2236 StreamChunk::from_pretty(
2237 " I I I I I I
2238 + 1 4 1 1 4 4
2239 + 3 6 3 3 6 5"
2240 )
2241 );
2242
2243 Ok(())
2244 }
2245
2246 #[tokio::test]
2247 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2248 let chunk_l1 = StreamChunk::from_pretty(
2249 " I I I
2250 + 1 4 1
2251 + 2 5 2
2252 + 3 6 3",
2253 );
2254 let chunk_l2 = StreamChunk::from_pretty(
2255 " I I I
2256 + 4 9 4
2257 + 5 10 5",
2258 );
2259 let chunk_r1 = StreamChunk::from_pretty(
2260 " I I I
2261 + 2 5 1
2262 + 4 9 2
2263 + 6 9 3",
2264 );
2265 let chunk_r2 = StreamChunk::from_pretty(
2266 " I I I
2267 + 1 4 4
2268 + 3 6 5",
2269 );
2270
2271 let (mut tx_l, mut tx_r, mut hash_join) =
2272 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2273
2274 tx_l.push_barrier(test_epoch(1), false);
2276 tx_r.push_barrier(test_epoch(1), false);
2277 hash_join.next_unwrap_ready_barrier()?;
2278
2279 tx_l.push_chunk(chunk_l1);
2281 hash_join.next_unwrap_pending();
2282
2283 tx_l.push_barrier(test_epoch(2), false);
2285 tx_r.push_barrier(test_epoch(2), false);
2286 hash_join.next_unwrap_ready_barrier()?;
2287
2288 tx_l.push_chunk(chunk_l2);
2290 hash_join.next_unwrap_pending();
2291
2292 tx_r.push_chunk(chunk_r1);
2294 let chunk = hash_join.next_unwrap_ready_chunk()?;
2295 assert_eq!(
2296 chunk,
2297 StreamChunk::from_pretty(
2298 " I I I
2299 + 2 5 2
2300 + 4 9 4"
2301 )
2302 );
2303
2304 tx_r.push_chunk(chunk_r2);
2306 let chunk = hash_join.next_unwrap_ready_chunk()?;
2307 assert_eq!(
2308 chunk,
2309 StreamChunk::from_pretty(
2310 " I I I
2311 + 1 4 1
2312 + 3 6 3"
2313 )
2314 );
2315
2316 Ok(())
2317 }
2318
2319 #[tokio::test]
2320 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2321 let chunk_l1 = StreamChunk::from_pretty(
2322 " I I I
2323 + 1 4 1
2324 + 2 5 2
2325 + 3 6 3",
2326 );
2327 let chunk_l2 = StreamChunk::from_pretty(
2328 " I I I
2329 + 4 9 4
2330 + 5 10 5",
2331 );
2332 let chunk_r1 = StreamChunk::from_pretty(
2333 " I I I
2334 + 2 5 1
2335 + 4 9 2
2336 + 6 9 3",
2337 );
2338 let chunk_r2 = StreamChunk::from_pretty(
2339 " I I I
2340 + 1 4 4
2341 + 3 6 5",
2342 );
2343
2344 let (mut tx_l, mut tx_r, mut hash_join) =
2345 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2346
2347 tx_l.push_barrier(test_epoch(1), false);
2349 tx_r.push_barrier(test_epoch(1), false);
2350 hash_join.next_unwrap_ready_barrier()?;
2351
2352 tx_l.push_chunk(chunk_l1);
2354 hash_join.next_unwrap_pending();
2355
2356 tx_l.push_barrier(test_epoch(2), false);
2358 tx_r.push_barrier(test_epoch(2), false);
2359 hash_join.next_unwrap_ready_barrier()?;
2360
2361 tx_l.push_chunk(chunk_l2);
2363 hash_join.next_unwrap_pending();
2364
2365 tx_r.push_chunk(chunk_r1);
2367 let chunk = hash_join.next_unwrap_ready_chunk()?;
2368 assert_eq!(
2369 chunk,
2370 StreamChunk::from_pretty(
2371 " I I I
2372 + 2 5 1
2373 + 4 9 2"
2374 )
2375 );
2376
2377 tx_r.push_chunk(chunk_r2);
2379 let chunk = hash_join.next_unwrap_ready_chunk()?;
2380 assert_eq!(
2381 chunk,
2382 StreamChunk::from_pretty(
2383 " I I I
2384 + 1 4 4
2385 + 3 6 5"
2386 )
2387 );
2388
2389 Ok(())
2390 }
2391
2392 #[tokio::test]
2393 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2394 let chunk_r1 = StreamChunk::from_pretty(
2395 " I I
2396 + 1 4
2397 + 2 5
2398 + 3 6",
2399 );
2400 let chunk_r2 = StreamChunk::from_pretty(
2401 " I I
2402 + 3 8
2403 - 3 8",
2404 );
2405 let chunk_l1 = StreamChunk::from_pretty(
2406 " I I
2407 + 2 7
2408 + 4 8
2409 + 6 9",
2410 );
2411 let chunk_l2 = StreamChunk::from_pretty(
2412 " I I
2413 + 3 10
2414 + 6 11",
2415 );
2416 let chunk_r3 = StreamChunk::from_pretty(
2417 " I I
2418 + 6 10",
2419 );
2420 let chunk_l3 = StreamChunk::from_pretty(
2421 " I I
2422 - 6 11",
2423 );
2424 let chunk_l4 = StreamChunk::from_pretty(
2425 " I I
2426 - 6 9",
2427 );
2428 let (mut tx_l, mut tx_r, mut hash_join) =
2429 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2430
2431 tx_l.push_barrier(test_epoch(1), false);
2433 tx_r.push_barrier(test_epoch(1), false);
2434 hash_join.next_unwrap_ready_barrier()?;
2435
2436 tx_r.push_chunk(chunk_r1);
2438 hash_join.next_unwrap_pending();
2439
2440 tx_l.push_barrier(test_epoch(2), false);
2442 tx_r.push_barrier(test_epoch(2), false);
2443 hash_join.next_unwrap_ready_barrier()?;
2444
2445 tx_r.push_chunk(chunk_r2);
2447 hash_join.next_unwrap_pending();
2448
2449 tx_l.push_chunk(chunk_l1);
2451 let chunk = hash_join.next_unwrap_ready_chunk()?;
2452 assert_eq!(
2453 chunk,
2454 StreamChunk::from_pretty(
2455 " I I
2456 + 2 5"
2457 )
2458 );
2459
2460 tx_l.push_chunk(chunk_l2);
2462 let chunk = hash_join.next_unwrap_ready_chunk()?;
2463 assert_eq!(
2464 chunk,
2465 StreamChunk::from_pretty(
2466 " I I
2467 + 3 6"
2468 )
2469 );
2470
2471 tx_r.push_chunk(chunk_r3);
2473 let chunk = hash_join.next_unwrap_ready_chunk()?;
2474 assert_eq!(
2475 chunk,
2476 StreamChunk::from_pretty(
2477 " I I
2478 + 6 10"
2479 )
2480 );
2481
2482 tx_l.push_chunk(chunk_l3);
2485 hash_join.next_unwrap_pending();
2486
2487 tx_l.push_chunk(chunk_l4);
2490 let chunk = hash_join.next_unwrap_ready_chunk()?;
2491 assert_eq!(
2492 chunk,
2493 StreamChunk::from_pretty(
2494 " I I
2495 - 6 10"
2496 )
2497 );
2498
2499 Ok(())
2500 }
2501
2502 #[tokio::test]
2503 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2504 let chunk_l1 = StreamChunk::from_pretty(
2505 " I I
2506 + 1 4
2507 + 2 5
2508 + 3 6",
2509 );
2510 let chunk_l2 = StreamChunk::from_pretty(
2511 " I I
2512 + 3 8
2513 - 3 8",
2514 );
2515 let chunk_r1 = StreamChunk::from_pretty(
2516 " I I
2517 + 2 7
2518 + 4 8
2519 + 6 9",
2520 );
2521 let chunk_r2 = StreamChunk::from_pretty(
2522 " I I
2523 + 3 10
2524 + 6 11
2525 + 1 2
2526 + 1 3",
2527 );
2528 let chunk_l3 = StreamChunk::from_pretty(
2529 " I I
2530 + 9 10",
2531 );
2532 let chunk_r3 = StreamChunk::from_pretty(
2533 " I I
2534 - 1 2",
2535 );
2536 let chunk_r4 = StreamChunk::from_pretty(
2537 " I I
2538 - 1 3",
2539 );
2540 let (mut tx_l, mut tx_r, mut hash_join) =
2541 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2542
2543 tx_l.push_barrier(test_epoch(1), false);
2545 tx_r.push_barrier(test_epoch(1), false);
2546 hash_join.next_unwrap_ready_barrier()?;
2547
2548 tx_l.push_chunk(chunk_l1);
2550 let chunk = hash_join.next_unwrap_ready_chunk()?;
2551 assert_eq!(
2552 chunk,
2553 StreamChunk::from_pretty(
2554 " I I
2555 + 1 4
2556 + 2 5
2557 + 3 6",
2558 )
2559 );
2560
2561 tx_l.push_barrier(test_epoch(2), false);
2563 tx_r.push_barrier(test_epoch(2), false);
2564 hash_join.next_unwrap_ready_barrier()?;
2565
2566 tx_l.push_chunk(chunk_l2);
2568 let chunk = hash_join.next_unwrap_ready_chunk()?;
2569 assert_eq!(
2570 chunk,
2571 StreamChunk::from_pretty(
2572 " I I
2573 + 3 8 D
2574 - 3 8 D",
2575 )
2576 );
2577
2578 tx_r.push_chunk(chunk_r1);
2580 let chunk = hash_join.next_unwrap_ready_chunk()?;
2581 assert_eq!(
2582 chunk,
2583 StreamChunk::from_pretty(
2584 " I I
2585 - 2 5"
2586 )
2587 );
2588
2589 tx_r.push_chunk(chunk_r2);
2591 let chunk = hash_join.next_unwrap_ready_chunk()?;
2592 assert_eq!(
2593 chunk,
2594 StreamChunk::from_pretty(
2595 " I I
2596 - 3 6
2597 - 1 4"
2598 )
2599 );
2600
2601 tx_l.push_chunk(chunk_l3);
2603 let chunk = hash_join.next_unwrap_ready_chunk()?;
2604 assert_eq!(
2605 chunk,
2606 StreamChunk::from_pretty(
2607 " I I
2608 + 9 10"
2609 )
2610 );
2611
2612 tx_r.push_chunk(chunk_r3);
2615 hash_join.next_unwrap_pending();
2616
2617 tx_r.push_chunk(chunk_r4);
2620 let chunk = hash_join.next_unwrap_ready_chunk()?;
2621 assert_eq!(
2622 chunk,
2623 StreamChunk::from_pretty(
2624 " I I
2625 + 1 4"
2626 )
2627 );
2628
2629 Ok(())
2630 }
2631
2632 #[tokio::test]
2633 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2634 let chunk_r1 = StreamChunk::from_pretty(
2635 " I I
2636 + 1 4
2637 + 2 5
2638 + 3 6",
2639 );
2640 let chunk_r2 = StreamChunk::from_pretty(
2641 " I I
2642 + 3 8
2643 - 3 8",
2644 );
2645 let chunk_l1 = StreamChunk::from_pretty(
2646 " I I
2647 + 2 7
2648 + 4 8
2649 + 6 9",
2650 );
2651 let chunk_l2 = StreamChunk::from_pretty(
2652 " I I
2653 + 3 10
2654 + 6 11
2655 + 1 2
2656 + 1 3",
2657 );
2658 let chunk_r3 = StreamChunk::from_pretty(
2659 " I I
2660 + 9 10",
2661 );
2662 let chunk_l3 = StreamChunk::from_pretty(
2663 " I I
2664 - 1 2",
2665 );
2666 let chunk_l4 = StreamChunk::from_pretty(
2667 " I I
2668 - 1 3",
2669 );
2670 let (mut tx_r, mut tx_l, mut hash_join) =
2671 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2672
2673 tx_r.push_barrier(test_epoch(1), false);
2675 tx_l.push_barrier(test_epoch(1), false);
2676 hash_join.next_unwrap_ready_barrier()?;
2677
2678 tx_r.push_chunk(chunk_r1);
2680 let chunk = hash_join.next_unwrap_ready_chunk()?;
2681 assert_eq!(
2682 chunk,
2683 StreamChunk::from_pretty(
2684 " I I
2685 + 1 4
2686 + 2 5
2687 + 3 6",
2688 )
2689 );
2690
2691 tx_r.push_barrier(test_epoch(2), false);
2693 tx_l.push_barrier(test_epoch(2), false);
2694 hash_join.next_unwrap_ready_barrier()?;
2695
2696 tx_r.push_chunk(chunk_r2);
2698 let chunk = hash_join.next_unwrap_ready_chunk()?;
2699 assert_eq!(
2700 chunk,
2701 StreamChunk::from_pretty(
2702 " I I
2703 + 3 8 D
2704 - 3 8 D",
2705 )
2706 );
2707
2708 tx_l.push_chunk(chunk_l1);
2710 let chunk = hash_join.next_unwrap_ready_chunk()?;
2711 assert_eq!(
2712 chunk,
2713 StreamChunk::from_pretty(
2714 " I I
2715 - 2 5"
2716 )
2717 );
2718
2719 tx_l.push_chunk(chunk_l2);
2721 let chunk = hash_join.next_unwrap_ready_chunk()?;
2722 assert_eq!(
2723 chunk,
2724 StreamChunk::from_pretty(
2725 " I I
2726 - 3 6
2727 - 1 4"
2728 )
2729 );
2730
2731 tx_r.push_chunk(chunk_r3);
2733 let chunk = hash_join.next_unwrap_ready_chunk()?;
2734 assert_eq!(
2735 chunk,
2736 StreamChunk::from_pretty(
2737 " I I
2738 + 9 10"
2739 )
2740 );
2741
2742 tx_l.push_chunk(chunk_l3);
2745 hash_join.next_unwrap_pending();
2746
2747 tx_l.push_chunk(chunk_l4);
2750 let chunk = hash_join.next_unwrap_ready_chunk()?;
2751 assert_eq!(
2752 chunk,
2753 StreamChunk::from_pretty(
2754 " I I
2755 + 1 4"
2756 )
2757 );
2758
2759 Ok(())
2760 }
2761
2762 #[tokio::test]
2763 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2764 let chunk_l1 = StreamChunk::from_pretty(
2765 " I I
2766 + 1 4
2767 + 2 5
2768 + 3 6",
2769 );
2770 let chunk_l2 = StreamChunk::from_pretty(
2771 " I I
2772 + 6 8
2773 + 3 8",
2774 );
2775 let chunk_r1 = StreamChunk::from_pretty(
2776 " I I
2777 + 2 7
2778 + 4 8
2779 + 6 9",
2780 );
2781 let chunk_r2 = StreamChunk::from_pretty(
2782 " I I
2783 + 3 10
2784 + 6 11",
2785 );
2786 let (mut tx_l, mut tx_r, mut hash_join) =
2787 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2788
2789 tx_l.push_barrier(test_epoch(1), false);
2791 tx_r.push_barrier(test_epoch(1), false);
2792 hash_join.next_unwrap_ready_barrier()?;
2793
2794 tx_l.push_chunk(chunk_l1);
2796 hash_join.next_unwrap_pending();
2797
2798 tx_l.push_barrier(test_epoch(2), false);
2800
2801 tx_l.push_chunk(chunk_l2);
2803
2804 tx_r.push_chunk(chunk_r1);
2806
2807 let chunk = hash_join.next_unwrap_ready_chunk()?;
2809 assert_eq!(
2810 chunk,
2811 StreamChunk::from_pretty(
2812 " I I I I
2813 + 2 5 2 7"
2814 )
2815 );
2816
2817 tx_r.push_barrier(test_epoch(2), false);
2819
2820 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2822 assert!(matches!(
2823 hash_join.next_unwrap_ready_barrier()?,
2824 Barrier {
2825 epoch,
2826 mutation: None,
2827 ..
2828 } if epoch == expected_epoch
2829 ));
2830
2831 let chunk = hash_join.next_unwrap_ready_chunk()?;
2833 assert_eq!(
2834 chunk,
2835 StreamChunk::from_pretty(
2836 " I I I I
2837 + 6 8 6 9"
2838 )
2839 );
2840
2841 tx_r.push_chunk(chunk_r2);
2843 let chunk = hash_join.next_unwrap_ready_chunk()?;
2844 assert_eq!(
2845 chunk,
2846 StreamChunk::from_pretty(
2847 " I I I I
2848 + 3 6 3 10
2849 + 3 8 3 10
2850 + 6 8 6 11"
2851 )
2852 );
2853
2854 Ok(())
2855 }
2856
2857 #[tokio::test]
2858 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2859 let chunk_l1 = StreamChunk::from_pretty(
2860 " I I
2861 + 1 4
2862 + 2 .
2863 + 3 .",
2864 );
2865 let chunk_l2 = StreamChunk::from_pretty(
2866 " I I
2867 + 6 .
2868 + 3 8",
2869 );
2870 let chunk_r1 = StreamChunk::from_pretty(
2871 " I I
2872 + 2 7
2873 + 4 8
2874 + 6 9",
2875 );
2876 let chunk_r2 = StreamChunk::from_pretty(
2877 " I I
2878 + 3 10
2879 + 6 11",
2880 );
2881 let (mut tx_l, mut tx_r, mut hash_join) =
2882 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2883
2884 tx_l.push_barrier(test_epoch(1), false);
2886 tx_r.push_barrier(test_epoch(1), false);
2887 hash_join.next_unwrap_ready_barrier()?;
2888
2889 tx_l.push_chunk(chunk_l1);
2891 hash_join.next_unwrap_pending();
2892
2893 tx_l.push_barrier(test_epoch(2), false);
2895
2896 tx_l.push_chunk(chunk_l2);
2898
2899 tx_r.push_chunk(chunk_r1);
2901
2902 let chunk = hash_join.next_unwrap_ready_chunk()?;
2904 assert_eq!(
2905 chunk,
2906 StreamChunk::from_pretty(
2907 " I I I I
2908 + 2 . 2 7"
2909 )
2910 );
2911
2912 tx_r.push_barrier(test_epoch(2), false);
2914
2915 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2917 assert!(matches!(
2918 hash_join.next_unwrap_ready_barrier()?,
2919 Barrier {
2920 epoch,
2921 mutation: None,
2922 ..
2923 } if epoch == expected_epoch
2924 ));
2925
2926 let chunk = hash_join.next_unwrap_ready_chunk()?;
2928 assert_eq!(
2929 chunk,
2930 StreamChunk::from_pretty(
2931 " I I I I
2932 + 6 . 6 9"
2933 )
2934 );
2935
2936 tx_r.push_chunk(chunk_r2);
2938 let chunk = hash_join.next_unwrap_ready_chunk()?;
2939 assert_eq!(
2940 chunk,
2941 StreamChunk::from_pretty(
2942 " I I I I
2943 + 3 8 3 10
2944 + 3 . 3 10
2945 + 6 . 6 11"
2946 )
2947 );
2948
2949 Ok(())
2950 }
2951
2952 #[tokio::test]
2953 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2954 let chunk_l1 = StreamChunk::from_pretty(
2955 " I I
2956 + 1 4
2957 + 2 5
2958 + 3 6",
2959 );
2960 let chunk_l2 = StreamChunk::from_pretty(
2961 " I I
2962 + 3 8
2963 - 3 8",
2964 );
2965 let chunk_r1 = StreamChunk::from_pretty(
2966 " I I
2967 + 2 7
2968 + 4 8
2969 + 6 9",
2970 );
2971 let chunk_r2 = StreamChunk::from_pretty(
2972 " I I
2973 + 3 10
2974 + 6 11",
2975 );
2976 let (mut tx_l, mut tx_r, mut hash_join) =
2977 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2978
2979 tx_l.push_barrier(test_epoch(1), false);
2981 tx_r.push_barrier(test_epoch(1), false);
2982 hash_join.next_unwrap_ready_barrier()?;
2983
2984 tx_l.push_chunk(chunk_l1);
2986 let chunk = hash_join.next_unwrap_ready_chunk()?;
2987 assert_eq!(
2988 chunk,
2989 StreamChunk::from_pretty(
2990 " I I I I
2991 + 1 4 . .
2992 + 2 5 . .
2993 + 3 6 . ."
2994 )
2995 );
2996
2997 tx_l.push_chunk(chunk_l2);
2999 let chunk = hash_join.next_unwrap_ready_chunk()?;
3000 assert_eq!(
3001 chunk,
3002 StreamChunk::from_pretty(
3003 " I I I I
3004 + 3 8 . . D
3005 - 3 8 . . D"
3006 )
3007 );
3008
3009 tx_r.push_chunk(chunk_r1);
3011 let chunk = hash_join.next_unwrap_ready_chunk()?;
3012 assert_eq!(
3013 chunk,
3014 StreamChunk::from_pretty(
3015 " I I I I
3016 - 2 5 . .
3017 + 2 5 2 7"
3018 )
3019 );
3020
3021 tx_r.push_chunk(chunk_r2);
3023 let chunk = hash_join.next_unwrap_ready_chunk()?;
3024 assert_eq!(
3025 chunk,
3026 StreamChunk::from_pretty(
3027 " I I I I
3028 - 3 6 . .
3029 + 3 6 3 10"
3030 )
3031 );
3032
3033 Ok(())
3034 }
3035
3036 #[tokio::test]
3037 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
3038 let chunk_l1 = StreamChunk::from_pretty(
3039 " I I
3040 + 1 4
3041 + 2 5
3042 + . 6",
3043 );
3044 let chunk_l2 = StreamChunk::from_pretty(
3045 " I I
3046 + . 8
3047 - . 8",
3048 );
3049 let chunk_r1 = StreamChunk::from_pretty(
3050 " I I
3051 + 2 7
3052 + 4 8
3053 + 6 9",
3054 );
3055 let chunk_r2 = StreamChunk::from_pretty(
3056 " I I
3057 + . 10
3058 + 6 11",
3059 );
3060 let (mut tx_l, mut tx_r, mut hash_join) =
3061 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
3062
3063 tx_l.push_barrier(test_epoch(1), false);
3065 tx_r.push_barrier(test_epoch(1), false);
3066 hash_join.next_unwrap_ready_barrier()?;
3067
3068 tx_l.push_chunk(chunk_l1);
3070 let chunk = hash_join.next_unwrap_ready_chunk()?;
3071 assert_eq!(
3072 chunk,
3073 StreamChunk::from_pretty(
3074 " I I I I
3075 + 1 4 . .
3076 + 2 5 . .
3077 + . 6 . ."
3078 )
3079 );
3080
3081 tx_l.push_chunk(chunk_l2);
3083 let chunk = hash_join.next_unwrap_ready_chunk()?;
3084 assert_eq!(
3085 chunk,
3086 StreamChunk::from_pretty(
3087 " I I I I
3088 + . 8 . . D
3089 - . 8 . . D"
3090 )
3091 );
3092
3093 tx_r.push_chunk(chunk_r1);
3095 let chunk = hash_join.next_unwrap_ready_chunk()?;
3096 assert_eq!(
3097 chunk,
3098 StreamChunk::from_pretty(
3099 " I I I I
3100 - 2 5 . .
3101 + 2 5 2 7"
3102 )
3103 );
3104
3105 tx_r.push_chunk(chunk_r2);
3107 let chunk = hash_join.next_unwrap_ready_chunk()?;
3108 assert_eq!(
3109 chunk,
3110 StreamChunk::from_pretty(
3111 " I I I I
3112 - . 6 . .
3113 + . 6 . 10"
3114 )
3115 );
3116
3117 Ok(())
3118 }
3119
3120 #[tokio::test]
3121 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3122 let chunk_l1 = StreamChunk::from_pretty(
3123 " I I
3124 + 1 4
3125 + 2 5
3126 + 3 6",
3127 );
3128 let chunk_l2 = StreamChunk::from_pretty(
3129 " I I
3130 + 3 8
3131 - 3 8",
3132 );
3133 let chunk_r1 = StreamChunk::from_pretty(
3134 " I I
3135 + 2 7
3136 + 4 8
3137 + 6 9",
3138 );
3139 let chunk_r2 = StreamChunk::from_pretty(
3140 " I I
3141 + 5 10
3142 - 5 10",
3143 );
3144 let (mut tx_l, mut tx_r, mut hash_join) =
3145 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3146
3147 tx_l.push_barrier(test_epoch(1), false);
3149 tx_r.push_barrier(test_epoch(1), false);
3150 hash_join.next_unwrap_ready_barrier()?;
3151
3152 tx_l.push_chunk(chunk_l1);
3154 hash_join.next_unwrap_pending();
3155
3156 tx_l.push_chunk(chunk_l2);
3158 hash_join.next_unwrap_pending();
3159
3160 tx_r.push_chunk(chunk_r1);
3162 let chunk = hash_join.next_unwrap_ready_chunk()?;
3163 assert_eq!(
3164 chunk,
3165 StreamChunk::from_pretty(
3166 " I I I I
3167 + 2 5 2 7
3168 + . . 4 8
3169 + . . 6 9"
3170 )
3171 );
3172
3173 tx_r.push_chunk(chunk_r2);
3175 let chunk = hash_join.next_unwrap_ready_chunk()?;
3176 assert_eq!(
3177 chunk,
3178 StreamChunk::from_pretty(
3179 " I I I I
3180 + . . 5 10 D
3181 - . . 5 10 D"
3182 )
3183 );
3184
3185 Ok(())
3186 }
3187
3188 #[tokio::test]
3189 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3190 let chunk_l1 = StreamChunk::from_pretty(
3191 " I I I
3192 + 1 4 1
3193 + 2 5 2
3194 + 3 6 3",
3195 );
3196 let chunk_l2 = StreamChunk::from_pretty(
3197 " I I I
3198 + 4 9 4
3199 + 5 10 5",
3200 );
3201 let chunk_r1 = StreamChunk::from_pretty(
3202 " I I I
3203 + 2 5 1
3204 + 4 9 2
3205 + 6 9 3",
3206 );
3207 let chunk_r2 = StreamChunk::from_pretty(
3208 " I I I
3209 + 1 4 4
3210 + 3 6 5",
3211 );
3212
3213 let (mut tx_l, mut tx_r, mut hash_join) =
3214 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3215
3216 tx_l.push_barrier(test_epoch(1), false);
3218 tx_r.push_barrier(test_epoch(1), false);
3219 hash_join.next_unwrap_ready_barrier()?;
3220
3221 tx_l.push_chunk(chunk_l1);
3223 let chunk = hash_join.next_unwrap_ready_chunk()?;
3224 assert_eq!(
3225 chunk,
3226 StreamChunk::from_pretty(
3227 " I I I I I I
3228 + 1 4 1 . . .
3229 + 2 5 2 . . .
3230 + 3 6 3 . . ."
3231 )
3232 );
3233
3234 tx_l.push_chunk(chunk_l2);
3236 let chunk = hash_join.next_unwrap_ready_chunk()?;
3237 assert_eq!(
3238 chunk,
3239 StreamChunk::from_pretty(
3240 " I I I I I I
3241 + 4 9 4 . . .
3242 + 5 10 5 . . ."
3243 )
3244 );
3245
3246 tx_r.push_chunk(chunk_r1);
3248 let chunk = hash_join.next_unwrap_ready_chunk()?;
3249 assert_eq!(
3250 chunk,
3251 StreamChunk::from_pretty(
3252 " I I I I I I
3253 - 2 5 2 . . .
3254 + 2 5 2 2 5 1
3255 - 4 9 4 . . .
3256 + 4 9 4 4 9 2"
3257 )
3258 );
3259
3260 tx_r.push_chunk(chunk_r2);
3262 let chunk = hash_join.next_unwrap_ready_chunk()?;
3263 assert_eq!(
3264 chunk,
3265 StreamChunk::from_pretty(
3266 " I I I I I I
3267 - 1 4 1 . . .
3268 + 1 4 1 1 4 4
3269 - 3 6 3 . . .
3270 + 3 6 3 3 6 5"
3271 )
3272 );
3273
3274 Ok(())
3275 }
3276
3277 #[tokio::test]
3278 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3279 let chunk_l1 = StreamChunk::from_pretty(
3280 " I I I
3281 + 1 4 1
3282 + 2 5 2
3283 + 3 6 3",
3284 );
3285 let chunk_l2 = StreamChunk::from_pretty(
3286 " I I I
3287 + 4 9 4
3288 + 5 10 5",
3289 );
3290 let chunk_r1 = StreamChunk::from_pretty(
3291 " I I I
3292 + 2 5 1
3293 + 4 9 2
3294 + 6 9 3",
3295 );
3296 let chunk_r2 = StreamChunk::from_pretty(
3297 " I I I
3298 + 1 4 4
3299 + 3 6 5
3300 + 7 7 6",
3301 );
3302
3303 let (mut tx_l, mut tx_r, mut hash_join) =
3304 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3305
3306 tx_l.push_barrier(test_epoch(1), false);
3308 tx_r.push_barrier(test_epoch(1), false);
3309 hash_join.next_unwrap_ready_barrier()?;
3310
3311 tx_l.push_chunk(chunk_l1);
3313 hash_join.next_unwrap_pending();
3314
3315 tx_l.push_chunk(chunk_l2);
3317 hash_join.next_unwrap_pending();
3318
3319 tx_r.push_chunk(chunk_r1);
3321 let chunk = hash_join.next_unwrap_ready_chunk()?;
3322 assert_eq!(
3323 chunk,
3324 StreamChunk::from_pretty(
3325 " I I I I I I
3326 + 2 5 2 2 5 1
3327 + 4 9 4 4 9 2
3328 + . . . 6 9 3"
3329 )
3330 );
3331
3332 tx_r.push_chunk(chunk_r2);
3334 let chunk = hash_join.next_unwrap_ready_chunk()?;
3335 assert_eq!(
3336 chunk,
3337 StreamChunk::from_pretty(
3338 " I I I I I I
3339 + 1 4 1 1 4 4
3340 + 3 6 3 3 6 5
3341 + . . . 7 7 6"
3342 )
3343 );
3344
3345 Ok(())
3346 }
3347
3348 #[tokio::test]
3349 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3350 let chunk_l1 = StreamChunk::from_pretty(
3351 " I I
3352 + 1 4
3353 + 2 5
3354 + 3 6",
3355 );
3356 let chunk_l2 = StreamChunk::from_pretty(
3357 " I I
3358 + 3 8
3359 - 3 8",
3360 );
3361 let chunk_r1 = StreamChunk::from_pretty(
3362 " I I
3363 + 2 7
3364 + 4 8
3365 + 6 9",
3366 );
3367 let chunk_r2 = StreamChunk::from_pretty(
3368 " I I
3369 + 5 10
3370 - 5 10",
3371 );
3372 let (mut tx_l, mut tx_r, mut hash_join) =
3373 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3374
3375 tx_l.push_barrier(test_epoch(1), false);
3377 tx_r.push_barrier(test_epoch(1), false);
3378 hash_join.next_unwrap_ready_barrier()?;
3379
3380 tx_l.push_chunk(chunk_l1);
3382 let chunk = hash_join.next_unwrap_ready_chunk()?;
3383 assert_eq!(
3384 chunk,
3385 StreamChunk::from_pretty(
3386 " I I I I
3387 + 1 4 . .
3388 + 2 5 . .
3389 + 3 6 . ."
3390 )
3391 );
3392
3393 tx_l.push_chunk(chunk_l2);
3395 let chunk = hash_join.next_unwrap_ready_chunk()?;
3396 assert_eq!(
3397 chunk,
3398 StreamChunk::from_pretty(
3399 " I I I I
3400 + 3 8 . . D
3401 - 3 8 . . D"
3402 )
3403 );
3404
3405 tx_r.push_chunk(chunk_r1);
3407 let chunk = hash_join.next_unwrap_ready_chunk()?;
3408 assert_eq!(
3409 chunk,
3410 StreamChunk::from_pretty(
3411 " I I I I
3412 - 2 5 . .
3413 + 2 5 2 7
3414 + . . 4 8
3415 + . . 6 9"
3416 )
3417 );
3418
3419 tx_r.push_chunk(chunk_r2);
3421 let chunk = hash_join.next_unwrap_ready_chunk()?;
3422 assert_eq!(
3423 chunk,
3424 StreamChunk::from_pretty(
3425 " I I I I
3426 + . . 5 10 D
3427 - . . 5 10 D"
3428 )
3429 );
3430
3431 Ok(())
3432 }
3433
3434 #[tokio::test]
3435 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3436 let (mut tx_l, mut tx_r, mut hash_join) =
3437 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3438
3439 tx_l.push_barrier(test_epoch(1), false);
3441 tx_r.push_barrier(test_epoch(1), false);
3442 hash_join.next_unwrap_ready_barrier()?;
3443
3444 tx_l.push_chunk(StreamChunk::from_pretty(
3445 " I I
3446 + 1 1
3447 ",
3448 ));
3449 let chunk = hash_join.next_unwrap_ready_chunk()?;
3450 assert_eq!(
3451 chunk,
3452 StreamChunk::from_pretty(
3453 " I I I I
3454 + 1 1 . ."
3455 )
3456 );
3457
3458 tx_r.push_chunk(StreamChunk::from_pretty(
3459 " I I
3460 + 1 1
3461 ",
3462 ));
3463 let chunk = hash_join.next_unwrap_ready_chunk()?;
3464
3465 assert_eq!(
3466 chunk,
3467 StreamChunk::from_pretty(
3468 " I I I I
3469 - 1 1 . .
3470 + 1 1 1 1"
3471 )
3472 );
3473
3474 tx_l.push_chunk(StreamChunk::from_pretty(
3475 " I I
3476 - 1 1
3477 + 1 2
3478 ",
3479 ));
3480 let chunk = hash_join.next_unwrap_ready_chunk()?;
3481 let chunk = chunk.compact_vis();
3482 assert_eq!(
3483 chunk,
3484 StreamChunk::from_pretty(
3485 " I I I I
3486 - 1 1 1 1
3487 + 1 2 1 1
3488 "
3489 )
3490 );
3491
3492 Ok(())
3493 }
3494
3495 #[tokio::test]
3496 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3497 {
3498 let chunk_l1 = StreamChunk::from_pretty(
3499 " I I
3500 + 1 4
3501 + 2 5
3502 + 3 6
3503 + 3 7",
3504 );
3505 let chunk_l2 = StreamChunk::from_pretty(
3506 " I I
3507 + 3 8
3508 - 3 8
3509 - 1 4", );
3511 let chunk_r1 = StreamChunk::from_pretty(
3512 " I I
3513 + 2 6
3514 + 4 8
3515 + 3 4",
3516 );
3517 let chunk_r2 = StreamChunk::from_pretty(
3518 " I I
3519 + 5 10
3520 - 5 10
3521 + 1 2",
3522 );
3523 let (mut tx_l, mut tx_r, mut hash_join) =
3524 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3525
3526 tx_l.push_barrier(test_epoch(1), false);
3528 tx_r.push_barrier(test_epoch(1), false);
3529 hash_join.next_unwrap_ready_barrier()?;
3530
3531 tx_l.push_chunk(chunk_l1);
3533 let chunk = hash_join.next_unwrap_ready_chunk()?;
3534 assert_eq!(
3535 chunk,
3536 StreamChunk::from_pretty(
3537 " I I I I
3538 + 1 4 . .
3539 + 2 5 . .
3540 + 3 6 . .
3541 + 3 7 . ."
3542 )
3543 );
3544
3545 tx_l.push_chunk(chunk_l2);
3547 let chunk = hash_join.next_unwrap_ready_chunk()?;
3548 assert_eq!(
3549 chunk,
3550 StreamChunk::from_pretty(
3551 " I I I I
3552 + 3 8 . . D
3553 - 3 8 . . D
3554 - 1 4 . ."
3555 )
3556 );
3557
3558 tx_r.push_chunk(chunk_r1);
3560 let chunk = hash_join.next_unwrap_ready_chunk()?;
3561 assert_eq!(
3562 chunk,
3563 StreamChunk::from_pretty(
3564 " I I I I
3565 - 2 5 . .
3566 + 2 5 2 6
3567 + . . 4 8
3568 + . . 3 4" )
3572 );
3573
3574 tx_r.push_chunk(chunk_r2);
3576 let chunk = hash_join.next_unwrap_ready_chunk()?;
3577 assert_eq!(
3578 chunk,
3579 StreamChunk::from_pretty(
3580 " I I I I
3581 + . . 5 10 D
3582 - . . 5 10 D
3583 + . . 1 2" )
3586 );
3587
3588 Ok(())
3589 }
3590
3591 #[tokio::test]
3592 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3593 let chunk_l1 = StreamChunk::from_pretty(
3594 " I I
3595 + 1 4
3596 + 2 10
3597 + 3 6",
3598 );
3599 let chunk_l2 = StreamChunk::from_pretty(
3600 " I I
3601 + 3 8
3602 - 3 8",
3603 );
3604 let chunk_r1 = StreamChunk::from_pretty(
3605 " I I
3606 + 2 7
3607 + 4 8
3608 + 6 9",
3609 );
3610 let chunk_r2 = StreamChunk::from_pretty(
3611 " I I
3612 + 3 10
3613 + 6 11",
3614 );
3615 let (mut tx_l, mut tx_r, mut hash_join) =
3616 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3617
3618 tx_l.push_barrier(test_epoch(1), false);
3620 tx_r.push_barrier(test_epoch(1), false);
3621 hash_join.next_unwrap_ready_barrier()?;
3622
3623 tx_l.push_chunk(chunk_l1);
3625 hash_join.next_unwrap_pending();
3626
3627 tx_l.push_chunk(chunk_l2);
3629 hash_join.next_unwrap_pending();
3630
3631 tx_r.push_chunk(chunk_r1);
3633 hash_join.next_unwrap_pending();
3634
3635 tx_r.push_chunk(chunk_r2);
3637 let chunk = hash_join.next_unwrap_ready_chunk()?;
3638 assert_eq!(
3639 chunk,
3640 StreamChunk::from_pretty(
3641 " I I I I
3642 + 3 6 3 10"
3643 )
3644 );
3645
3646 Ok(())
3647 }
3648
3649 #[tokio::test]
3650 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3651 let (mut tx_l, mut tx_r, mut hash_join) =
3652 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3653
3654 tx_l.push_barrier(test_epoch(1), false);
3656 tx_r.push_barrier(test_epoch(1), false);
3657 hash_join.next_unwrap_ready_barrier()?;
3658
3659 tx_l.push_int64_watermark(0, 100);
3660
3661 tx_l.push_int64_watermark(0, 200);
3662
3663 tx_l.push_barrier(test_epoch(2), false);
3664 tx_r.push_barrier(test_epoch(2), false);
3665 hash_join.next_unwrap_ready_barrier()?;
3666
3667 tx_r.push_int64_watermark(0, 50);
3668
3669 let w1 = hash_join.next().await.unwrap().unwrap();
3670 let w1 = w1.as_watermark().unwrap();
3671
3672 let w2 = hash_join.next().await.unwrap().unwrap();
3673 let w2 = w2.as_watermark().unwrap();
3674
3675 tx_r.push_int64_watermark(0, 100);
3676
3677 let w3 = hash_join.next().await.unwrap().unwrap();
3678 let w3 = w3.as_watermark().unwrap();
3679
3680 let w4 = hash_join.next().await.unwrap().unwrap();
3681 let w4 = w4.as_watermark().unwrap();
3682
3683 assert_eq!(
3684 w1,
3685 &Watermark {
3686 col_idx: 2,
3687 data_type: DataType::Int64,
3688 val: ScalarImpl::Int64(50)
3689 }
3690 );
3691
3692 assert_eq!(
3693 w2,
3694 &Watermark {
3695 col_idx: 0,
3696 data_type: DataType::Int64,
3697 val: ScalarImpl::Int64(50)
3698 }
3699 );
3700
3701 assert_eq!(
3702 w3,
3703 &Watermark {
3704 col_idx: 2,
3705 data_type: DataType::Int64,
3706 val: ScalarImpl::Int64(100)
3707 }
3708 );
3709
3710 assert_eq!(
3711 w4,
3712 &Watermark {
3713 col_idx: 0,
3714 data_type: DataType::Int64,
3715 val: ScalarImpl::Int64(100)
3716 }
3717 );
3718
3719 Ok(())
3720 }
3721
3722 async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
3723 evict_interval: u32,
3724 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
3725 let schema = Schema {
3726 fields: vec![
3727 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
3729 ],
3730 };
3731 let (tx_l, source_l) = MockSource::channel();
3732 let source_l = source_l.into_executor(schema.clone(), vec![1]);
3733 let (tx_r, source_r) = MockSource::channel();
3734 let source_r = source_r.into_executor(schema, vec![1]);
3735 let params_l = JoinParams::new(vec![0], vec![1]);
3736 let params_r = JoinParams::new(vec![0], vec![1]);
3737
3738 let mem_state = MemoryStateStore::new();
3739
3740 let (state_l, degree_state_l) = create_in_memory_state_table(
3741 mem_state.clone(),
3742 &[DataType::Int64, DataType::Int64],
3743 &[OrderType::ascending(), OrderType::ascending()],
3744 &[0, 1],
3745 0,
3746 )
3747 .await;
3748
3749 let (state_r, degree_state_r) = create_in_memory_state_table(
3750 mem_state,
3751 &[DataType::Int64, DataType::Int64],
3752 &[OrderType::ascending(), OrderType::ascending()],
3753 &[0, 1],
3754 2,
3755 )
3756 .await;
3757
3758 let schema = match T {
3759 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
3760 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
3761 _ => [source_l.schema().fields(), source_r.schema().fields()]
3762 .concat()
3763 .into_iter()
3764 .collect(),
3765 };
3766 let schema_len = schema.len();
3767 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
3768
3769 let mut streaming_config = StreamingConfig::default();
3770 streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;
3771
3772 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
3773 ActorContext::for_test_with_config(123, streaming_config),
3774 info,
3775 source_l,
3776 source_r,
3777 params_l,
3778 params_r,
3779 vec![false],
3780 (0..schema_len).collect_vec(),
3781 None,
3782 vec![],
3783 state_l,
3784 degree_state_l,
3785 state_r,
3786 degree_state_r,
3787 Arc::new(AtomicU64::new(0)),
3788 false,
3789 Arc::new(StreamingMetrics::unused()),
3790 1024,
3791 2048,
3792 vec![(0, true)],
3793 );
3794 (tx_l, tx_r, executor.boxed().execute())
3795 }
3796
3797 #[tokio::test]
3800 async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3801 let chunk_l = StreamChunk::from_pretty(
3802 " I I
3803 + 1 4
3804 + 2 5
3805 + 3 6",
3806 );
3807 let chunk_r = StreamChunk::from_pretty(
3808 " I I
3809 + 2 7
3810 + 3 8",
3811 );
3812
3813 let (mut tx_l, mut tx_r, mut hash_join) =
3815 create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3816
3817 tx_l.push_barrier(test_epoch(1), false);
3818 tx_r.push_barrier(test_epoch(1), false);
3819 hash_join.next_unwrap_ready_barrier()?;
3820
3821 tx_l.push_chunk(chunk_l);
3822 hash_join.next_unwrap_pending();
3823
3824 tx_r.push_chunk(chunk_r);
3825 let chunk = hash_join.next_unwrap_ready_chunk()?;
3826 assert_eq!(
3827 chunk,
3828 StreamChunk::from_pretty(
3829 " I I I I
3830 + 2 5 2 7
3831 + 3 6 3 8"
3832 )
3833 );
3834
3835 Ok(())
3836 }
3837
3838 #[tokio::test]
3841 async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3842 let chunk_l = StreamChunk::from_pretty(
3843 " I I
3844 + 1 4
3845 + 2 5
3846 + 3 6",
3847 );
3848 let chunk_r = StreamChunk::from_pretty(
3849 " I I
3850 + 2 7
3851 + 3 8",
3852 );
3853
3854 let (mut tx_l, mut tx_r, mut hash_join) =
3856 create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3857
3858 tx_l.push_barrier(test_epoch(1), false);
3859 tx_r.push_barrier(test_epoch(1), false);
3860 hash_join.next_unwrap_ready_barrier()?;
3861
3862 tx_l.push_chunk(chunk_l);
3863 hash_join.next_unwrap_pending();
3864
3865 tx_r.push_chunk(chunk_r);
3866 let chunk = hash_join.next_unwrap_ready_chunk()?;
3867 assert_eq!(
3868 chunk,
3869 StreamChunk::from_pretty(
3870 " I I I I
3871 + 2 5 2 7
3872 + 3 6 3 8"
3873 )
3874 );
3875
3876 Ok(())
3877 }
3878
3879 #[tokio::test]
3882 async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3883 let chunk_l = StreamChunk::from_pretty(
3884 " I I
3885 + 1 4
3886 + 2 5
3887 + 3 6
3888 + 4 7
3889 + 5 8",
3890 );
3891 let chunk_r = StreamChunk::from_pretty(
3892 " I I
3893 + 1 9
3894 + 3 10
3895 + 5 11",
3896 );
3897
3898 let (mut tx_l, mut tx_r, mut hash_join) =
3900 create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3901
3902 tx_l.push_barrier(test_epoch(1), false);
3903 tx_r.push_barrier(test_epoch(1), false);
3904 hash_join.next_unwrap_ready_barrier()?;
3905
3906 tx_l.push_chunk(chunk_l);
3907 hash_join.next_unwrap_pending();
3908
3909 tx_r.push_chunk(chunk_r);
3910 let chunk = hash_join.next_unwrap_ready_chunk()?;
3911 assert_eq!(
3912 chunk,
3913 StreamChunk::from_pretty(
3914 " I I I I
3915 + 1 4 1 9
3916 + 3 6 3 10
3917 + 5 8 5 11"
3918 )
3919 );
3920
3921 Ok(())
3922 }
3923}