1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates are irrelevant on either side, and an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter()
        .all(|candidate| superset.contains(&candidate))
}
48
49#[derive(Debug, Clone)]
52pub struct InequalityPairInfo {
53 pub left_idx: usize,
55 pub right_idx: usize,
57 pub clean_left_state: bool,
59 pub clean_right_state: bool,
61 pub op: InequalityType,
63}
64
65impl InequalityPairInfo {
66 pub fn left_side_is_larger(&self) -> bool {
68 matches!(
69 self.op,
70 InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
71 )
72 }
73}
74
/// Column index lists describing one input side of the join.
pub struct JoinParams {
    /// Indices of the join key columns in the input.
    pub join_key_indices: Vec<usize>,
    /// Primary key indices handed to the side's hash map; the name suggests they are
    /// de-duplicated against the join key — NOTE(review): confirm against the caller.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the two index lists without modifying them.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
90
/// Everything the executor keeps for one input side of the join: the hash map with that side's
/// rows plus the column mappings used while probing and emitting.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash map holding this side's join state.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns in this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Column offset of this side: `0` for the left side, the left input's column count for the
    /// right side.
    start_pos: usize,
    /// `(input column, output column)` pairs for this side, as produced by
    /// `JoinStreamChunkBuilder::get_i2o_mapping`.
    i2o_mapping: Vec<(usize, usize)>,
    /// The pairs of `i2o_mapping`, indexed by input column.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column, the inequality pairs that reference it, as
    /// `(inequality index, flag)`. The flag is `true` when this side is NOT the larger side of
    /// that pair (see `InequalityPairInfo::left_side_is_larger`).
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that take part in at least one inequality pair; an incoming row with a NULL
    /// in any of them is treated as never matching. Emptied when the append-only optimization
    /// is enabled.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs registered by inequality pairs whose clean flag
    /// for this side is set. Emptied when the append-only optimization is enabled.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table was created for this side.
    need_degree_table: bool,
    /// Ties the otherwise unused encoding parameter `E` to the struct.
    _marker: std::marker::PhantomData<E>,
}
120
121impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("JoinSide")
124 .field("join_key_indices", &self.join_key_indices)
125 .field("col_types", &self.all_data_types)
126 .field("start_pos", &self.start_pos)
127 .field("i2o_mapping", &self.i2o_mapping)
128 .field("need_degree_table", &self.need_degree_table)
129 .finish()
130 }
131}
132
133impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
134 fn is_dirty(&self) -> bool {
136 unimplemented!()
137 }
138
139 #[expect(dead_code)]
140 fn clear_cache(&mut self) {
141 assert!(
142 !self.is_dirty(),
143 "cannot clear cache while states of hash join are dirty"
144 );
145
146 }
149
150 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
151 self.ht.init(epoch).await
152 }
153}
154
/// Streaming hash join executor.
///
/// `K` is the hash key type, `S` the state store, `T` the join type (as a const primitive) and
/// `E` the row encoding used by the join hash maps.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Context of the actor running this executor.
    ctx: ActorContextRef,
    /// Executor info; its `stream_key` and `schema` are printed by the `Debug` impl.
    info: ExecutorInfo,

    /// Left input. `Some` until `into_stream` takes it.
    input_l: Option<Executor>,
    /// Right input. `Some` until `into_stream` takes it.
    input_r: Option<Executor>,
    /// Data types of the output columns selected by `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State and mappings of the left side.
    side_l: JoinSide<K, S, E>,
    /// State and mappings of the right side.
    side_r: JoinSide<K, S, E>,
    /// Optional additional join condition, forwarded to the row matching code.
    cond: Option<NonStrictExpression>,
    /// One entry per inequality pair: the output column indices that receive the pair's
    /// watermark (taken from the larger side's columns), and the pair description itself.
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// Latest watermark selected for each inequality pair, indexed like `inequality_pairs`;
    /// `None` until one has been selected, and reset to `None` when the left side's post-commit
    /// step reports `Some(true)`.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// `(join key position, do_clean)` pairs: when a watermark is selected for such a position
    /// and `do_clean` is set, both sides' hash maps are given the watermark value.
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Set when the join is append-only and the stream key of each input is contained in its
    /// join key.
    append_only_optimize: bool,

    /// Streaming metrics handle.
    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Rows processed since the last cache eviction.
    cnt_rows_received: u32,

    /// Watermark buffers shared by both sides, keyed by join key position for join-key
    /// watermarks and by `join_key_indices.len() + inequality index` for inequality watermarks.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when a single input row matches more rows than this.
    high_join_amplification_threshold: usize,

    /// Limit on the number of rows buffered for a cache entry while refilling it on a miss.
    entry_state_max_rows: usize,
    /// Number of processed rows between two cache evictions (at least 1).
    join_cache_evict_interval_rows: u32,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207 for HashJoinExecutor<K, S, T, E>
208{
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("HashJoinExecutor")
211 .field("join_type", &T)
212 .field("input_left", &self.input_l.as_ref().unwrap().identity())
213 .field("input_right", &self.input_r.as_ref().unwrap().identity())
214 .field("side_l", &self.side_l)
215 .field("side_r", &self.side_r)
216 .field("stream_key", &self.info.stream_key)
217 .field("schema", &self.info.schema)
218 .field("actual_output_data_types", &self.actual_output_data_types)
219 .field(
220 "join_cache_evict_interval_rows",
221 &self.join_cache_evict_interval_rows,
222 )
223 .finish()
224 }
225}
226
227impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
228 for HashJoinExecutor<K, S, T, E>
229{
230 fn execute(self: Box<Self>) -> BoxedMessageStream {
231 self.into_stream().boxed()
232 }
233}
234
/// Borrowed view of the executor fields needed to process one input chunk, bundled so that
/// `eq_join_oneside` takes a single argument.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context, used for metric labels and log fields.
    ctx: &'a ActorContextRef,
    /// Left join side.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right join side.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional additional join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest selected watermark per inequality pair.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to process.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size for the output chunk builder.
    chunk_size: usize,
    /// Counter of rows processed since the last cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Match count above which a warning is logged for a row.
    high_join_amplification_threshold: usize,
    /// Limit on rows buffered per cache entry on a cache miss.
    entry_state_max_rows: usize,
    /// Number of processed rows between two cache evictions.
    join_cache_evict_interval_rows: u32,
}
250
251impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
252 HashJoinExecutor<K, S, T, E>
253{
    /// Creates a hash join executor whose per-entry cache row limit comes from the actor config.
    ///
    /// Thin wrapper around `new_with_cache_size`: every argument is forwarded unchanged and
    /// `entry_state_max_rows` is passed as `None`, which makes the callee read
    /// `ctx.config.developer.hash_join_entry_state_max_rows`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // No explicit limit: fall back to the configured `hash_join_entry_state_max_rows`.
            None,
            watermark_indices_in_jk,
        )
    }
301
    /// Creates a hash join executor, optionally overriding the per-entry cache row limit.
    ///
    /// Derives the output data types, the per-side column mappings, the inequality bookkeeping
    /// and the optional degree tables from the inputs, then builds both [`JoinSide`]s.
    ///
    /// `entry_state_max_rows` of `None` means "use
    /// `ctx.config.developer.hash_join_entry_state_max_rows`".
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if an index in
    /// `output_indices`, the join key indices or `inequality_pairs` is out of range for the
    /// corresponding schema.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        // An explicit limit wins; otherwise use the configured default.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // The eviction interval is clamped to at least one row.
        let join_cache_evict_interval_rows = ctx
            .config
            .developer
            .join_hash_map_evict_interval_rows
            .max(1);
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins expose only one side's columns; all other join types expose the left
        // columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Apply the output projection to obtain the types that are actually emitted.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree tables the join key columns come first ...
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        // ... immediately followed by the deduplicated primary key columns.
        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether each input's stream key is fully covered by its join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side gets a degree table only if the join type asks for its degree and the OTHER
        // side's stream key is not contained in that side's join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            // For semi/anti joins the side that is not output contributes zero columns.
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        // Input column -> output columns lookup tables.
        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                let left_is_larger = pair.left_side_is_larger();
                // Record, per input column, the inequality it belongs to; the flag is `true`
                // for the side that is not the larger one.
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Watermarks of this pair are emitted on the output columns of the larger side;
                // the list is empty when that column is not part of the output.
                let output_indices = if pair.left_side_is_larger() {
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns that take part in any inequality must be non-null for a row to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization neither the state-clean columns nor the non-null
        // requirements are kept.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Only the first state-clean column of each side is passed to its degree table.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // No inequality watermark has been observed yet.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                // Left columns start at offset 0.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns start after all left input columns.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
        }
    }
580
    /// Runs the join: aligns the two inputs on barriers and turns every aligned message into
    /// output messages.
    ///
    /// * Watermarks go through `handle_watermark`; the resulting watermarks are yielded.
    /// * Chunks are joined against the opposite side, the produced chunks are yielded, and
    ///   `try_flush_data` is called afterwards.
    /// * Barriers flush both sides, are yielded, and are followed by the post-commit steps.
    ///
    /// # Panics
    ///
    /// Panics if either input has already been taken.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        // The first barrier is forwarded before both sides are initialized with its epoch.
        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Resolve all labelled metric handles once, outside the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for input; reset after every handled message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // `left_time` accumulates only the time spent producing chunks: the clock is
                    // stopped before each yield and restarted after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same time accounting as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                        join_cache_evict_interval_rows: self.join_cache_evict_interval_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // The barrier duration is recorded before yielding, so the post-commit
                    // steps below are not part of it.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Only the left side's result decides whether the buffered watermarks are
                    // dropped. NOTE(review): `Some(true)` presumably signals that cached state
                    // may be stale after a vnode bitmap update — confirm in `JoinHashMapPostCommit`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report the number of cached entries of both sides.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
753
754 async fn flush_data(
755 &mut self,
756 epoch: EpochPair,
757 ) -> StreamExecutorResult<(
758 JoinHashMapPostCommit<'_, K, S, E>,
759 JoinHashMapPostCommit<'_, K, S, E>,
760 )> {
761 let left = self.side_l.ht.flush(epoch).await?;
764 let right = self.side_r.ht.flush(epoch).await?;
765 Ok((left, right))
766 }
767
768 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
769 self.side_l.ht.try_flush().await?;
772 self.side_r.ht.try_flush().await?;
773 Ok(())
774 }
775
776 fn evict_cache(
778 side_update: &mut JoinSide<K, S, E>,
779 side_match: &mut JoinSide<K, S, E>,
780 cnt_rows_received: &mut u32,
781 join_cache_evict_interval_rows: u32,
782 ) {
783 *cnt_rows_received += 1;
784 if *cnt_rows_received >= join_cache_evict_interval_rows {
785 side_update.ht.evict();
786 side_match.ht.evict();
787 *cnt_rows_received = 0;
788 }
789 }
790
    /// Handles a watermark arriving from `side` and returns the watermarks to emit downstream.
    ///
    /// The watermark is processed twice:
    ///
    /// 1. As a join-key watermark, for every join key position whose column equals
    ///    `watermark.col_idx`. When the shared buffer for that position yields a watermark, it
    ///    is emitted on the mapped output columns of both sides, and — if
    ///    `watermark_indices_in_jk` marks the position for cleaning — both hash maps receive
    ///    its value.
    /// 2. As an inequality watermark, for every inequality pair that references the column.
    ///    When the buffer for that pair yields a watermark, it is emitted on the pair's output
    ///    columns, stored in `inequality_watermarks`, and may be pushed into the hash map of the
    ///    larger side if that side's clean flag is set.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Positions within the join key (not input column indices) that this watermark covers.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Join-key buffers are keyed by the join key position.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Forward the value to both hash maps only for positions flagged for cleaning.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Emit on every output column mapped from this join key column, on both sides.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Values to push into the hash maps once all pairs were processed; if several pairs
        // qualify for the same side, the last one wins.
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Inequality buffers are keyed after the join key positions to avoid clashes.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // `output_indices` holds the larger side's output columns for this pair.
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Remember the watermark; `eq_join_oneside` reads it for state cleaning.
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Only the larger side is a cleaning candidate, and only if its flag is set.
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
892
893 fn row_concat(
894 row_update: impl Row,
895 update_start_pos: usize,
896 row_matched: impl Row,
897 matched_start_pos: usize,
898 ) -> OwnedRow {
899 let mut new_row = vec![None; row_update.len() + row_matched.len()];
900
901 for (i, datum_ref) in row_update.iter().enumerate() {
902 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
903 }
904 for (i, datum_ref) in row_matched.iter().enumerate() {
905 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
906 }
907 OwnedRow::new(new_row)
908 }
909
    /// Joins a chunk that arrived on the left input: `eq_join_oneside` with the left side as
    /// the updated side and the right side as the matched side.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
916
    /// Joins a chunk that arrived on the right input: `eq_join_oneside` with the right side as
    /// the updated side and the left side as the matched side.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
923
    /// Joins every visible row of `args.chunk`, which arrived on side `SIDE`, against the state
    /// of the opposite side and yields the resulting output chunks.
    ///
    /// For each row this counts the row for cache eviction, decides whether the row can match
    /// at all, delegates the matching to `handle_match_rows`, records the number of produced
    /// rows in the `join_matched_join_keys` metric and logs a warning when that number exceeds
    /// `high_join_amplification_threshold`. Whatever is left in the chunk builder is yielded at
    /// the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            join_cache_evict_interval_rows,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on; `side_match` is probed.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // State-clean columns of the matched side whose inequality already has a watermark,
        // paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Hash keys are built for the whole chunk up front and zipped with the rows below.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Holes (`None`) are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(
                side_update,
                side_match,
                cnt_rows_received,
                join_cache_evict_interval_rows,
            );

            let cache_lookup_result = {
                // All columns involved in inequalities must be non-null on the incoming row.
                // NOTE(review): the unchecked access relies on every index in `non_null_fields`
                // being within the row's width; they are positions in a vector sized to the
                // input schema — confirm rows always have that width.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // The key's null bitmap must be a subset of the matched side's `null_matched`
                // bitmap.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` call for the given `JoinOp` variant.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Update halves are handled like plain inserts and deletes.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            // `total_matches` counts only rows of chunks yielded while handling this row.
            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Flush the rows still buffered in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1061
    /// Matches one incoming `row` against the rows stored for `key` on the matched side, yields
    /// the chunks completed while doing so, and then updates cache and state.
    ///
    /// Depending on `cached_lookup_result`:
    ///
    /// * `NeverMatch`: the row is only forwarded through `forward_if_not_matched`; no state is
    ///   touched.
    /// * `Hit`: the cached rows are matched one by one.
    /// * `Miss`: the rows are fetched from the matched side's hash map, matched one by one, and
    ///   collected into a fresh cache entry while the row limit allows.
    ///
    /// After matching, the row is forwarded according to its degree, the cache entry is written
    /// back (always on a hit; on a miss only if the row limit was not exceeded), the rows
    /// collected in `matched_rows_to_clean` are deleted in memory, and finally the updated
    /// side's state is changed — unless the append-only path applies, which instead deletes the
    /// matched row from the matched side and returns.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Entry being rebuilt on a cache miss, and the number of rows put into it.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Out-parameters filled by `handle_match_row` across all matched rows.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to a `handle_match_row` call; only the arguments that differ between the
        // cache-hit and cache-miss paths are macro parameters.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        // Shadows the default entry above with the entry that is written back afterwards.
        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                // Nothing can match, so neither the cache nor the state is updated.
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry while the limit allows. Because the comparison is
                    // `<=`, the count can reach `entry_state_max_rows + 1`; in that case the
                    // write-back below is skipped.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row))
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // Forward the incoming row itself, depending on whether anything matched.
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Write the entry back into the cache: always after a hit, and after a miss only if the
        // entry is complete.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Remove the rows collected for cleaning during matching from the in-memory state.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // Append-only path: delete the matched row from the matched side and skip the update of
        // the updated side's state. Only valid for inserts.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Regular path: apply the operation to the updated side's state with the final degree.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1244
    /// Joins `update_row` against one candidate `matched_row` taken from the opposite side.
    ///
    /// The join condition `cond` is evaluated for the pair via [`Self::check_join_condition`].
    ///
    /// * When it holds, `update_row_degree` is incremented, the pair is handed to
    ///   `hashjoin_chunk_builder.with_match` (skipped when `forward_exactly_once(T, SIDE)` is
    ///   true) and, if `match_degree_table` is `Some`, the matched row's degree is updated via
    ///   `update_degree` and mirrored onto the cached encoded row.
    /// * When it does not hold, the matched row is compared against every watermark in
    ///   `useful_state_clean_columns`; a value strictly below a watermark flags it for cleaning.
    ///
    /// Afterwards the matched row (mapped through `map_output`) is stored in
    /// `append_only_matched_row` if `append_only_optimize` is set, or pushed onto
    /// `matched_rows_to_clean` if it was flagged.
    ///
    /// Returns the chunk produced by the chunk builder for this pair, if any.
    ///
    /// # Panics
    ///
    /// * If `append_only_optimize` is set while `JOIN_OP` is not `JoinOp::Insert`, or while
    ///   `append_only_matched_row` already holds a row.
    /// * If `MATCHED_ROWS_FROM_CACHE` is true, a degree table is present, the condition holds
    ///   and `matched_row_cache_ref` is `None`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // row type of the incoming `matched_row`
        RO: Row, // row type produced by `map_output`
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;
        // Evaluate the (optional) join condition on the concatenated pair.
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // One more match for the row being inserted/deleted.
            *update_row_degree += 1;
            // For inserts, build the output before the matched row's degree is touched.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            if let Some(degree_table) = match_degree_table {
                // Update the degree of the matched row in the degree table ...
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // ... and keep the cached copy of the matched row in sync.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // For deletes, the output is built only after the degree update above.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // Condition not satisfied: check whether any watermark already exceeds the
            // corresponding column of the matched row.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // At most one matched row may be recorded per update row in this mode.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // Always true in this branch; the `else` also guarantees `matched_row` is moved
            // at most once.
            debug_assert!(
                !append_only_optimize,
                "`append_only_optimize` and `need_state_clean` must not both be true"
            );
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1356
1357 #[inline]
1362 async fn check_join_condition(
1363 row: impl Row,
1364 side_update_start_pos: usize,
1365 matched_row: impl Row,
1366 side_match_start_pos: usize,
1367 join_condition: &Option<NonStrictExpression>,
1368 ) -> bool {
1369 if let Some(join_condition) = join_condition {
1370 let new_row = Self::row_concat(
1371 row,
1372 side_update_start_pos,
1373 matched_row,
1374 side_match_start_pos,
1375 );
1376 join_condition
1377 .eval_row_infallible(&new_row)
1378 .await
1379 .map(|s| *s.as_bool())
1380 .unwrap_or(false)
1381 } else {
1382 true
1383 }
1384 }
1385}
1386
1387#[cfg(test)]
1388mod tests {
1389 use std::sync::atomic::AtomicU64;
1390
1391 use pretty_assertions::assert_eq;
1392 use risingwave_common::array::*;
1393 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1394 use risingwave_common::config::StreamingConfig;
1395 use risingwave_common::hash::{Key64, Key128};
1396 use risingwave_common::util::epoch::test_epoch;
1397 use risingwave_common::util::sort_util::OrderType;
1398 use risingwave_storage::memory::MemoryStateStore;
1399
1400 use super::*;
1401 use crate::common::table::test_utils::gen_pbtable;
1402 use crate::executor::MemoryEncoding;
1403 use crate::executor::test_utils::expr::build_from_pretty;
1404 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1405
1406 async fn create_in_memory_state_table(
1407 mem_state: MemoryStateStore,
1408 data_types: &[DataType],
1409 order_types: &[OrderType],
1410 pk_indices: &[usize],
1411 table_id: u32,
1412 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1413 create_in_memory_state_table_with_inequality(
1414 mem_state,
1415 data_types,
1416 order_types,
1417 pk_indices,
1418 table_id,
1419 None,
1420 )
1421 .await
1422 }
1423
1424 async fn create_in_memory_state_table_with_inequality(
1425 mem_state: MemoryStateStore,
1426 data_types: &[DataType],
1427 order_types: &[OrderType],
1428 pk_indices: &[usize],
1429 table_id: u32,
1430 degree_inequality_type: Option<DataType>,
1431 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1432 create_in_memory_state_table_with_watermark(
1433 mem_state,
1434 data_types,
1435 order_types,
1436 pk_indices,
1437 table_id,
1438 degree_inequality_type,
1439 vec![],
1440 vec![],
1441 )
1442 .await
1443 }
1444
1445 #[allow(clippy::too_many_arguments)]
1446 async fn create_in_memory_state_table_with_watermark(
1447 mem_state: MemoryStateStore,
1448 data_types: &[DataType],
1449 order_types: &[OrderType],
1450 pk_indices: &[usize],
1451 table_id: u32,
1452 degree_inequality_type: Option<DataType>,
1453 state_clean_watermark_indices: Vec<usize>,
1454 degree_clean_watermark_indices: Vec<usize>,
1455 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1456 let column_descs = data_types
1457 .iter()
1458 .enumerate()
1459 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1460 .collect_vec();
1461 let mut state_table_catalog = gen_pbtable(
1462 TableId::new(table_id),
1463 column_descs,
1464 order_types.to_vec(),
1465 pk_indices.to_vec(),
1466 0,
1467 );
1468 state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1469 .into_iter()
1470 .map(|idx| idx as u32)
1471 .collect();
1472 let state_table =
1473 StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1474
1475 let mut degree_table_column_descs = vec![];
1477 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1478 degree_table_column_descs.push(ColumnDesc::unnamed(
1479 ColumnId::new(pk_id as i32),
1480 data_types[*idx].clone(),
1481 ))
1482 });
1483 degree_table_column_descs.push(ColumnDesc::unnamed(
1485 ColumnId::new(pk_indices.len() as i32),
1486 DataType::Int64,
1487 ));
1488 if let Some(ineq_type) = degree_inequality_type {
1490 degree_table_column_descs.push(ColumnDesc::unnamed(
1491 ColumnId::new((pk_indices.len() + 1) as i32),
1492 ineq_type,
1493 ));
1494 }
1495 let mut degree_table_catalog = gen_pbtable(
1496 TableId::new(table_id + 1),
1497 degree_table_column_descs,
1498 order_types.to_vec(),
1499 pk_indices.to_vec(),
1500 0,
1501 );
1502 degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1503 .into_iter()
1504 .map(|idx| idx as u32)
1505 .collect();
1506 let degree_state_table =
1507 StateTable::from_table_catalog(°ree_table_catalog, mem_state, None).await;
1508 (state_table, degree_state_table)
1509 }
1510
1511 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1512 build_from_pretty(
1513 condition_text
1514 .as_deref()
1515 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1516 )
1517 }
1518
1519 async fn create_executor<const T: JoinTypePrimitive>(
1520 with_condition: bool,
1521 null_safe: bool,
1522 condition_text: Option<String>,
1523 inequality_pairs: Vec<InequalityPairInfo>,
1524 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1525 let schema = Schema {
1526 fields: vec![
1527 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1529 ],
1530 };
1531 let (tx_l, source_l) = MockSource::channel();
1532 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1533 let (tx_r, source_r) = MockSource::channel();
1534 let source_r = source_r.into_executor(schema, vec![1]);
1535 let params_l = JoinParams::new(vec![0], vec![1]);
1536 let params_r = JoinParams::new(vec![0], vec![1]);
1537 let cond = with_condition.then(|| create_cond(condition_text));
1538
1539 let mem_state = MemoryStateStore::new();
1540
1541 let l_degree_ineq_type = inequality_pairs
1543 .iter()
1544 .find(|pair| pair.clean_left_state)
1545 .map(|_| DataType::Int64); let r_degree_ineq_type = inequality_pairs
1547 .iter()
1548 .find(|pair| pair.clean_right_state)
1549 .map(|_| DataType::Int64); let l_clean_watermark_indices = inequality_pairs
1551 .iter()
1552 .find(|pair| pair.clean_left_state)
1553 .map(|pair| vec![pair.left_idx])
1554 .unwrap_or_default();
1555 let r_clean_watermark_indices = inequality_pairs
1556 .iter()
1557 .find(|pair| pair.clean_right_state)
1558 .map(|pair| vec![pair.right_idx])
1559 .unwrap_or_default();
1560 let degree_inequality_column_idx = 3;
1561 let l_degree_clean_watermark_indices = l_degree_ineq_type
1562 .as_ref()
1563 .map(|_| vec![degree_inequality_column_idx])
1564 .unwrap_or_default();
1565 let r_degree_clean_watermark_indices = r_degree_ineq_type
1566 .as_ref()
1567 .map(|_| vec![degree_inequality_column_idx])
1568 .unwrap_or_default();
1569
1570 let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1571 mem_state.clone(),
1572 &[DataType::Int64, DataType::Int64],
1573 &[OrderType::ascending(), OrderType::ascending()],
1574 &[0, 1],
1575 0,
1576 l_degree_ineq_type,
1577 l_clean_watermark_indices,
1578 l_degree_clean_watermark_indices,
1579 )
1580 .await;
1581
1582 let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1583 mem_state,
1584 &[DataType::Int64, DataType::Int64],
1585 &[OrderType::ascending(), OrderType::ascending()],
1586 &[0, 1],
1587 2,
1588 r_degree_ineq_type,
1589 r_clean_watermark_indices,
1590 r_degree_clean_watermark_indices,
1591 )
1592 .await;
1593
1594 let schema = match T {
1595 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1596 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1597 _ => [source_l.schema().fields(), source_r.schema().fields()]
1598 .concat()
1599 .into_iter()
1600 .collect(),
1601 };
1602 let schema_len = schema.len();
1603 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1604
1605 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1606 ActorContext::for_test(123),
1607 info,
1608 source_l,
1609 source_r,
1610 params_l,
1611 params_r,
1612 vec![null_safe],
1613 (0..schema_len).collect_vec(),
1614 cond,
1615 inequality_pairs,
1616 state_l,
1617 degree_state_l,
1618 state_r,
1619 degree_state_r,
1620 Arc::new(AtomicU64::new(0)),
1621 false,
1622 Arc::new(StreamingMetrics::unused()),
1623 1024,
1624 2048,
1625 vec![(0, true)],
1626 );
1627 (tx_l, tx_r, executor.boxed().execute())
1628 }
1629
1630 async fn create_classical_executor<const T: JoinTypePrimitive>(
1631 with_condition: bool,
1632 null_safe: bool,
1633 condition_text: Option<String>,
1634 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1635 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1636 }
1637
1638 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1639 with_condition: bool,
1640 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1641 let schema = Schema {
1642 fields: vec![
1643 Field::unnamed(DataType::Int64),
1644 Field::unnamed(DataType::Int64),
1645 Field::unnamed(DataType::Int64),
1646 ],
1647 };
1648 let (tx_l, source_l) = MockSource::channel();
1649 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1650 let (tx_r, source_r) = MockSource::channel();
1651 let source_r = source_r.into_executor(schema, vec![0]);
1652 let params_l = JoinParams::new(vec![0, 1], vec![]);
1653 let params_r = JoinParams::new(vec![0, 1], vec![]);
1654 let cond = with_condition.then(|| create_cond(None));
1655
1656 let mem_state = MemoryStateStore::new();
1657
1658 let (state_l, degree_state_l) = create_in_memory_state_table(
1659 mem_state.clone(),
1660 &[DataType::Int64, DataType::Int64, DataType::Int64],
1661 &[
1662 OrderType::ascending(),
1663 OrderType::ascending(),
1664 OrderType::ascending(),
1665 ],
1666 &[0, 1, 0],
1667 0,
1668 )
1669 .await;
1670
1671 let (state_r, degree_state_r) = create_in_memory_state_table(
1672 mem_state,
1673 &[DataType::Int64, DataType::Int64, DataType::Int64],
1674 &[
1675 OrderType::ascending(),
1676 OrderType::ascending(),
1677 OrderType::ascending(),
1678 ],
1679 &[0, 1, 1],
1680 1,
1681 )
1682 .await;
1683
1684 let schema = match T {
1685 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1686 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1687 _ => [source_l.schema().fields(), source_r.schema().fields()]
1688 .concat()
1689 .into_iter()
1690 .collect(),
1691 };
1692 let schema_len = schema.len();
1693 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1694
1695 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1696 ActorContext::for_test(123),
1697 info,
1698 source_l,
1699 source_r,
1700 params_l,
1701 params_r,
1702 vec![false],
1703 (0..schema_len).collect_vec(),
1704 cond,
1705 vec![],
1706 state_l,
1707 degree_state_l,
1708 state_r,
1709 degree_state_r,
1710 Arc::new(AtomicU64::new(0)),
1711 true,
1712 Arc::new(StreamingMetrics::unused()),
1713 1024,
1714 2048,
1715 vec![(0, true)],
1716 );
1717 (tx_l, tx_r, executor.boxed().execute())
1718 }
1719
    /// Inner join on column 0 with the extra condition `$1 >= $3` (column 1 of the left row
    /// against column 1 of the right row) and an inequality pair with `clean_left_state: true`.
    ///
    /// After the right watermark of 6, the test expects the left row `(2, 4)` to no longer
    /// join: the last right row `(2, 3)` would satisfy `4 >= 3`, yet only `(2, 7)` is expected
    /// in the output.
    #[tokio::test]
    async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 2 4
             + 2 7
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 2 3",
        );
        // Join key is column 0; the inequality compares column 1 of both sides.
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from(
                "(greater_than_or_equal:boolean $1:int8 $3:int8)",
            )),
            vec![InequalityPairInfo {
                left_idx: 1,
                right_idx: 1,
                clean_left_state: true,
                clean_right_state: false,
                op: InequalityType::GreaterThanOrEqual,
            }],
        )
        .await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the left chunk; nothing on the right yet, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push a watermark of 10 on left column 1; no output is expected yet
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        // push a watermark of 6 on right column 1; a watermark of 6 on output column 1 follows
        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
        );

        // push the 1st right chunk (2, 6): of the left rows with key 2, only (2, 7) has 7 >= 6
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + 2 7 2 6"
            )
        );

        // push the 2nd right chunk (2, 3): only (2, 7) is expected, i.e. (2, 4) must be gone
        // (presumably cleaned from the left state because 4 < 6)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + 2 7 2 3"
            )
        );

        Ok(())
    }
1810
    /// Plain inner join on column 0 without an extra condition: left rows are buffered first,
    /// then each right chunk is expected to emit exactly the pairs with equal keys.
    #[tokio::test]
    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert then delete of the same row); still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; only key 2 has a left counterpart
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; only key 3 has a left counterpart, and (3, 8) was deleted
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + 3 6 3 10"
            )
        );

        Ok(())
    }
1880
    /// Inner join on column 0 with the null-safe flag set: the rows whose key is `.` in the
    /// pretty chunks are expected to join with each other.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + . 10
             + 6 11",
        );
        // `null_safe` is `true` here, unlike in `test_streaming_hash_inner_join`.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert then delete of the same row); still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; only key 2 has a left counterpart
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + 2 5 2 7"
            )
        );

        // push the 2nd right chunk; the `.` key on the right pairs with the `.` key on the left
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                 + . 6 . 10"
            )
        );

        Ok(())
    }
1950
    /// Left semi join on column 0: a left row is expected in the output once it has at least
    /// one right match, and is expected to be retracted only when its last match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert then delete of the same row); still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gets its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 2 5"
            )
        );

        // push the 2nd right chunk; left row (3, 6) gets its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 3 6"
            )
        );

        // push the 3rd left chunk; key 6 already has two right rows, so it is emitted once
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 6 10"
            )
        );

        // delete one of the two right rows with key 6; (6, 10) still has a match, so no output
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // delete the last right row with key 6; (6, 10) loses its last match and is retracted
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 - 6 10"
            )
        );

        Ok(())
    }
2060
    /// Left semi join on column 0 with the null-safe flag set: same scenario as
    /// `test_streaming_hash_left_semi_join`, but the `.`-keyed left row is expected to be
    /// emitted once a `.`-keyed right row arrives.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        // `null_safe` is `true` here.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert then delete of the same row); still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left row (2, 5) gets its first match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 2 5"
            )
        );

        // push the 2nd right chunk; the `.`-keyed left row (., 6) gets its first match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + . 6"
            )
        );

        // push the 3rd left chunk; key 6 already has two right rows, so it is emitted once
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 6 10"
            )
        );

        // delete one of the two right rows with key 6; (6, 10) still has a match, so no output
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // delete the last right row with key 6; (6, 10) loses its last match and is retracted
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 - 6 10"
            )
        );

        Ok(())
    }
2170
    /// Inner join through the append-only executor: rows have three columns and are joined on
    /// columns 0 and 1, so each right row is expected to pair with the single left row that
    /// has the same first two values.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; (2, 5) and (4, 9) have left counterparts, (6, 9) has none
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 2 5 2 2 5 1
                 + 4 9 4 4 9 2"
            )
        );

        // push the 2nd right chunk; (1, 4) and (3, 6) have left counterparts
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                 + 1 4 1 1 4 4
                 + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2243
    /// Left semi join through the append-only executor (join on columns 0 and 1): each right
    /// chunk is expected to emit the left rows that gain a match, with left columns only.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; left rows (2, 5, 2) and (4, 9, 4) gain a match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                 + 2 5 2
                 + 4 9 4"
            )
        );

        // push the 2nd right chunk; left rows (1, 4, 1) and (3, 6, 3) gain a match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                 + 1 4 1
                 + 3 6 3"
            )
        );

        Ok(())
    }
2316
    /// Right semi join through the append-only executor (join on columns 0 and 1): each right
    /// chunk is expected to emit its own rows that have a left match, with right columns only.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so no output is expected
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk; still no output
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // push the 1st right chunk; right rows (2, 5, 1) and (4, 9, 2) have a left match
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                 + 2 5 1
                 + 4 9 2"
            )
        );

        // push the 2nd right chunk; both right rows have a left match
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                 + 1 4 4
                 + 3 6 5"
            )
        );

        Ok(())
    }
2389
    /// Right semi join on column 0: mirror image of `test_streaming_hash_left_semi_join`, with
    /// the roles of the two inputs swapped (right rows are buffered first, left rows provide
    /// the matches).
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st right chunk; the left side is empty, so no output is expected
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd right chunk (insert then delete of the same row); still no output
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // push the 1st left chunk; right row (2, 5) gets its first match
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 2 5"
            )
        );

        // push the 2nd left chunk; right row (3, 6) gets its first match
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 3 6"
            )
        );

        // push the 3rd right chunk; key 6 already has two left rows, so it is emitted once
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 6 10"
            )
        );

        // delete one of the two left rows with key 6; (6, 10) still has a match, so no output
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // delete the last left row with key 6; (6, 10) loses its last match and is retracted
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 - 6 10"
            )
        );

        Ok(())
    }
2499
    /// Left anti join on column 0: a left row is expected in the output while it has no right
    /// match, is retracted when its first match arrives and is re-emitted when its last match
    /// is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // push the init barrier for left and right
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 1st left chunk; the right side is empty, so every left row is emitted
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 1 4
                 + 2 5
                 + 3 6",
            )
        );

        // push the next barrier for left and right
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // push the 2nd left chunk (insert then delete of the same row); both rows carry the
        // `D` marker in the expected chunk (presumably "not visible" in the pretty format;
        // confirm against `StreamChunk::from_pretty`)
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // push the 1st right chunk; left row (2, 5) gets its first match and is retracted
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 - 2 5"
            )
        );

        // push the 2nd right chunk; (3, 6) and (1, 4) get their first match and are retracted
        // ((1, 4) only once, although two right rows with key 1 arrive)
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 - 3 6
                 - 1 4"
            )
        );

        // push the 3rd left chunk; key 9 has no right row, so the row is emitted
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 9 10"
            )
        );

        // delete one of the two right rows with key 1; (1, 4) still has a match, so no output
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // delete the last right row with key 1; (1, 4) has no match any more and is re-emitted
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                 + 1 4"
            )
        );

        Ok(())
    }
2629
2630 #[tokio::test]
2631 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2632 let chunk_r1 = StreamChunk::from_pretty(
2633 " I I
2634 + 1 4
2635 + 2 5
2636 + 3 6",
2637 );
2638 let chunk_r2 = StreamChunk::from_pretty(
2639 " I I
2640 + 3 8
2641 - 3 8",
2642 );
2643 let chunk_l1 = StreamChunk::from_pretty(
2644 " I I
2645 + 2 7
2646 + 4 8
2647 + 6 9",
2648 );
2649 let chunk_l2 = StreamChunk::from_pretty(
2650 " I I
2651 + 3 10
2652 + 6 11
2653 + 1 2
2654 + 1 3",
2655 );
2656 let chunk_r3 = StreamChunk::from_pretty(
2657 " I I
2658 + 9 10",
2659 );
2660 let chunk_l3 = StreamChunk::from_pretty(
2661 " I I
2662 - 1 2",
2663 );
2664 let chunk_l4 = StreamChunk::from_pretty(
2665 " I I
2666 - 1 3",
2667 );
2668 let (mut tx_r, mut tx_l, mut hash_join) =
2669 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2670
2671 tx_r.push_barrier(test_epoch(1), false);
2673 tx_l.push_barrier(test_epoch(1), false);
2674 hash_join.next_unwrap_ready_barrier()?;
2675
2676 tx_r.push_chunk(chunk_r1);
2678 let chunk = hash_join.next_unwrap_ready_chunk()?;
2679 assert_eq!(
2680 chunk,
2681 StreamChunk::from_pretty(
2682 " I I
2683 + 1 4
2684 + 2 5
2685 + 3 6",
2686 )
2687 );
2688
2689 tx_r.push_barrier(test_epoch(2), false);
2691 tx_l.push_barrier(test_epoch(2), false);
2692 hash_join.next_unwrap_ready_barrier()?;
2693
2694 tx_r.push_chunk(chunk_r2);
2696 let chunk = hash_join.next_unwrap_ready_chunk()?;
2697 assert_eq!(
2698 chunk,
2699 StreamChunk::from_pretty(
2700 " I I
2701 + 3 8 D
2702 - 3 8 D",
2703 )
2704 );
2705
2706 tx_l.push_chunk(chunk_l1);
2708 let chunk = hash_join.next_unwrap_ready_chunk()?;
2709 assert_eq!(
2710 chunk,
2711 StreamChunk::from_pretty(
2712 " I I
2713 - 2 5"
2714 )
2715 );
2716
2717 tx_l.push_chunk(chunk_l2);
2719 let chunk = hash_join.next_unwrap_ready_chunk()?;
2720 assert_eq!(
2721 chunk,
2722 StreamChunk::from_pretty(
2723 " I I
2724 - 3 6
2725 - 1 4"
2726 )
2727 );
2728
2729 tx_r.push_chunk(chunk_r3);
2731 let chunk = hash_join.next_unwrap_ready_chunk()?;
2732 assert_eq!(
2733 chunk,
2734 StreamChunk::from_pretty(
2735 " I I
2736 + 9 10"
2737 )
2738 );
2739
2740 tx_l.push_chunk(chunk_l3);
2743 hash_join.next_unwrap_pending();
2744
2745 tx_l.push_chunk(chunk_l4);
2748 let chunk = hash_join.next_unwrap_ready_chunk()?;
2749 assert_eq!(
2750 chunk,
2751 StreamChunk::from_pretty(
2752 " I I
2753 + 1 4"
2754 )
2755 );
2756
2757 Ok(())
2758 }
2759
2760 #[tokio::test]
2761 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2762 let chunk_l1 = StreamChunk::from_pretty(
2763 " I I
2764 + 1 4
2765 + 2 5
2766 + 3 6",
2767 );
2768 let chunk_l2 = StreamChunk::from_pretty(
2769 " I I
2770 + 6 8
2771 + 3 8",
2772 );
2773 let chunk_r1 = StreamChunk::from_pretty(
2774 " I I
2775 + 2 7
2776 + 4 8
2777 + 6 9",
2778 );
2779 let chunk_r2 = StreamChunk::from_pretty(
2780 " I I
2781 + 3 10
2782 + 6 11",
2783 );
2784 let (mut tx_l, mut tx_r, mut hash_join) =
2785 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2786
2787 tx_l.push_barrier(test_epoch(1), false);
2789 tx_r.push_barrier(test_epoch(1), false);
2790 hash_join.next_unwrap_ready_barrier()?;
2791
2792 tx_l.push_chunk(chunk_l1);
2794 hash_join.next_unwrap_pending();
2795
2796 tx_l.push_barrier(test_epoch(2), false);
2798
2799 tx_l.push_chunk(chunk_l2);
2801
2802 tx_r.push_chunk(chunk_r1);
2804
2805 let chunk = hash_join.next_unwrap_ready_chunk()?;
2807 assert_eq!(
2808 chunk,
2809 StreamChunk::from_pretty(
2810 " I I I I
2811 + 2 5 2 7"
2812 )
2813 );
2814
2815 tx_r.push_barrier(test_epoch(2), false);
2817
2818 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2820 assert!(matches!(
2821 hash_join.next_unwrap_ready_barrier()?,
2822 Barrier {
2823 epoch,
2824 mutation: None,
2825 ..
2826 } if epoch == expected_epoch
2827 ));
2828
2829 let chunk = hash_join.next_unwrap_ready_chunk()?;
2831 assert_eq!(
2832 chunk,
2833 StreamChunk::from_pretty(
2834 " I I I I
2835 + 6 8 6 9"
2836 )
2837 );
2838
2839 tx_r.push_chunk(chunk_r2);
2841 let chunk = hash_join.next_unwrap_ready_chunk()?;
2842 assert_eq!(
2843 chunk,
2844 StreamChunk::from_pretty(
2845 " I I I I
2846 + 3 6 3 10
2847 + 3 8 3 10
2848 + 6 8 6 11"
2849 )
2850 );
2851
2852 Ok(())
2853 }
2854
2855 #[tokio::test]
2856 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2857 let chunk_l1 = StreamChunk::from_pretty(
2858 " I I
2859 + 1 4
2860 + 2 .
2861 + 3 .",
2862 );
2863 let chunk_l2 = StreamChunk::from_pretty(
2864 " I I
2865 + 6 .
2866 + 3 8",
2867 );
2868 let chunk_r1 = StreamChunk::from_pretty(
2869 " I I
2870 + 2 7
2871 + 4 8
2872 + 6 9",
2873 );
2874 let chunk_r2 = StreamChunk::from_pretty(
2875 " I I
2876 + 3 10
2877 + 6 11",
2878 );
2879 let (mut tx_l, mut tx_r, mut hash_join) =
2880 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2881
2882 tx_l.push_barrier(test_epoch(1), false);
2884 tx_r.push_barrier(test_epoch(1), false);
2885 hash_join.next_unwrap_ready_barrier()?;
2886
2887 tx_l.push_chunk(chunk_l1);
2889 hash_join.next_unwrap_pending();
2890
2891 tx_l.push_barrier(test_epoch(2), false);
2893
2894 tx_l.push_chunk(chunk_l2);
2896
2897 tx_r.push_chunk(chunk_r1);
2899
2900 let chunk = hash_join.next_unwrap_ready_chunk()?;
2902 assert_eq!(
2903 chunk,
2904 StreamChunk::from_pretty(
2905 " I I I I
2906 + 2 . 2 7"
2907 )
2908 );
2909
2910 tx_r.push_barrier(test_epoch(2), false);
2912
2913 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2915 assert!(matches!(
2916 hash_join.next_unwrap_ready_barrier()?,
2917 Barrier {
2918 epoch,
2919 mutation: None,
2920 ..
2921 } if epoch == expected_epoch
2922 ));
2923
2924 let chunk = hash_join.next_unwrap_ready_chunk()?;
2926 assert_eq!(
2927 chunk,
2928 StreamChunk::from_pretty(
2929 " I I I I
2930 + 6 . 6 9"
2931 )
2932 );
2933
2934 tx_r.push_chunk(chunk_r2);
2936 let chunk = hash_join.next_unwrap_ready_chunk()?;
2937 assert_eq!(
2938 chunk,
2939 StreamChunk::from_pretty(
2940 " I I I I
2941 + 3 8 3 10
2942 + 3 . 3 10
2943 + 6 . 6 11"
2944 )
2945 );
2946
2947 Ok(())
2948 }
2949
2950 #[tokio::test]
2951 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2952 let chunk_l1 = StreamChunk::from_pretty(
2953 " I I
2954 + 1 4
2955 + 2 5
2956 + 3 6",
2957 );
2958 let chunk_l2 = StreamChunk::from_pretty(
2959 " I I
2960 + 3 8
2961 - 3 8",
2962 );
2963 let chunk_r1 = StreamChunk::from_pretty(
2964 " I I
2965 + 2 7
2966 + 4 8
2967 + 6 9",
2968 );
2969 let chunk_r2 = StreamChunk::from_pretty(
2970 " I I
2971 + 3 10
2972 + 6 11",
2973 );
2974 let (mut tx_l, mut tx_r, mut hash_join) =
2975 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2976
2977 tx_l.push_barrier(test_epoch(1), false);
2979 tx_r.push_barrier(test_epoch(1), false);
2980 hash_join.next_unwrap_ready_barrier()?;
2981
2982 tx_l.push_chunk(chunk_l1);
2984 let chunk = hash_join.next_unwrap_ready_chunk()?;
2985 assert_eq!(
2986 chunk,
2987 StreamChunk::from_pretty(
2988 " I I I I
2989 + 1 4 . .
2990 + 2 5 . .
2991 + 3 6 . ."
2992 )
2993 );
2994
2995 tx_l.push_chunk(chunk_l2);
2997 let chunk = hash_join.next_unwrap_ready_chunk()?;
2998 assert_eq!(
2999 chunk,
3000 StreamChunk::from_pretty(
3001 " I I I I
3002 + 3 8 . . D
3003 - 3 8 . . D"
3004 )
3005 );
3006
3007 tx_r.push_chunk(chunk_r1);
3009 let chunk = hash_join.next_unwrap_ready_chunk()?;
3010 assert_eq!(
3011 chunk,
3012 StreamChunk::from_pretty(
3013 " I I I I
3014 - 2 5 . .
3015 + 2 5 2 7"
3016 )
3017 );
3018
3019 tx_r.push_chunk(chunk_r2);
3021 let chunk = hash_join.next_unwrap_ready_chunk()?;
3022 assert_eq!(
3023 chunk,
3024 StreamChunk::from_pretty(
3025 " I I I I
3026 - 3 6 . .
3027 + 3 6 3 10"
3028 )
3029 );
3030
3031 Ok(())
3032 }
3033
3034 #[tokio::test]
3035 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
3036 let chunk_l1 = StreamChunk::from_pretty(
3037 " I I
3038 + 1 4
3039 + 2 5
3040 + . 6",
3041 );
3042 let chunk_l2 = StreamChunk::from_pretty(
3043 " I I
3044 + . 8
3045 - . 8",
3046 );
3047 let chunk_r1 = StreamChunk::from_pretty(
3048 " I I
3049 + 2 7
3050 + 4 8
3051 + 6 9",
3052 );
3053 let chunk_r2 = StreamChunk::from_pretty(
3054 " I I
3055 + . 10
3056 + 6 11",
3057 );
3058 let (mut tx_l, mut tx_r, mut hash_join) =
3059 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
3060
3061 tx_l.push_barrier(test_epoch(1), false);
3063 tx_r.push_barrier(test_epoch(1), false);
3064 hash_join.next_unwrap_ready_barrier()?;
3065
3066 tx_l.push_chunk(chunk_l1);
3068 let chunk = hash_join.next_unwrap_ready_chunk()?;
3069 assert_eq!(
3070 chunk,
3071 StreamChunk::from_pretty(
3072 " I I I I
3073 + 1 4 . .
3074 + 2 5 . .
3075 + . 6 . ."
3076 )
3077 );
3078
3079 tx_l.push_chunk(chunk_l2);
3081 let chunk = hash_join.next_unwrap_ready_chunk()?;
3082 assert_eq!(
3083 chunk,
3084 StreamChunk::from_pretty(
3085 " I I I I
3086 + . 8 . . D
3087 - . 8 . . D"
3088 )
3089 );
3090
3091 tx_r.push_chunk(chunk_r1);
3093 let chunk = hash_join.next_unwrap_ready_chunk()?;
3094 assert_eq!(
3095 chunk,
3096 StreamChunk::from_pretty(
3097 " I I I I
3098 - 2 5 . .
3099 + 2 5 2 7"
3100 )
3101 );
3102
3103 tx_r.push_chunk(chunk_r2);
3105 let chunk = hash_join.next_unwrap_ready_chunk()?;
3106 assert_eq!(
3107 chunk,
3108 StreamChunk::from_pretty(
3109 " I I I I
3110 - . 6 . .
3111 + . 6 . 10"
3112 )
3113 );
3114
3115 Ok(())
3116 }
3117
3118 #[tokio::test]
3119 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3120 let chunk_l1 = StreamChunk::from_pretty(
3121 " I I
3122 + 1 4
3123 + 2 5
3124 + 3 6",
3125 );
3126 let chunk_l2 = StreamChunk::from_pretty(
3127 " I I
3128 + 3 8
3129 - 3 8",
3130 );
3131 let chunk_r1 = StreamChunk::from_pretty(
3132 " I I
3133 + 2 7
3134 + 4 8
3135 + 6 9",
3136 );
3137 let chunk_r2 = StreamChunk::from_pretty(
3138 " I I
3139 + 5 10
3140 - 5 10",
3141 );
3142 let (mut tx_l, mut tx_r, mut hash_join) =
3143 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3144
3145 tx_l.push_barrier(test_epoch(1), false);
3147 tx_r.push_barrier(test_epoch(1), false);
3148 hash_join.next_unwrap_ready_barrier()?;
3149
3150 tx_l.push_chunk(chunk_l1);
3152 hash_join.next_unwrap_pending();
3153
3154 tx_l.push_chunk(chunk_l2);
3156 hash_join.next_unwrap_pending();
3157
3158 tx_r.push_chunk(chunk_r1);
3160 let chunk = hash_join.next_unwrap_ready_chunk()?;
3161 assert_eq!(
3162 chunk,
3163 StreamChunk::from_pretty(
3164 " I I I I
3165 + 2 5 2 7
3166 + . . 4 8
3167 + . . 6 9"
3168 )
3169 );
3170
3171 tx_r.push_chunk(chunk_r2);
3173 let chunk = hash_join.next_unwrap_ready_chunk()?;
3174 assert_eq!(
3175 chunk,
3176 StreamChunk::from_pretty(
3177 " I I I I
3178 + . . 5 10 D
3179 - . . 5 10 D"
3180 )
3181 );
3182
3183 Ok(())
3184 }
3185
3186 #[tokio::test]
3187 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3188 let chunk_l1 = StreamChunk::from_pretty(
3189 " I I I
3190 + 1 4 1
3191 + 2 5 2
3192 + 3 6 3",
3193 );
3194 let chunk_l2 = StreamChunk::from_pretty(
3195 " I I I
3196 + 4 9 4
3197 + 5 10 5",
3198 );
3199 let chunk_r1 = StreamChunk::from_pretty(
3200 " I I I
3201 + 2 5 1
3202 + 4 9 2
3203 + 6 9 3",
3204 );
3205 let chunk_r2 = StreamChunk::from_pretty(
3206 " I I I
3207 + 1 4 4
3208 + 3 6 5",
3209 );
3210
3211 let (mut tx_l, mut tx_r, mut hash_join) =
3212 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3213
3214 tx_l.push_barrier(test_epoch(1), false);
3216 tx_r.push_barrier(test_epoch(1), false);
3217 hash_join.next_unwrap_ready_barrier()?;
3218
3219 tx_l.push_chunk(chunk_l1);
3221 let chunk = hash_join.next_unwrap_ready_chunk()?;
3222 assert_eq!(
3223 chunk,
3224 StreamChunk::from_pretty(
3225 " I I I I I I
3226 + 1 4 1 . . .
3227 + 2 5 2 . . .
3228 + 3 6 3 . . ."
3229 )
3230 );
3231
3232 tx_l.push_chunk(chunk_l2);
3234 let chunk = hash_join.next_unwrap_ready_chunk()?;
3235 assert_eq!(
3236 chunk,
3237 StreamChunk::from_pretty(
3238 " I I I I I I
3239 + 4 9 4 . . .
3240 + 5 10 5 . . ."
3241 )
3242 );
3243
3244 tx_r.push_chunk(chunk_r1);
3246 let chunk = hash_join.next_unwrap_ready_chunk()?;
3247 assert_eq!(
3248 chunk,
3249 StreamChunk::from_pretty(
3250 " I I I I I I
3251 - 2 5 2 . . .
3252 + 2 5 2 2 5 1
3253 - 4 9 4 . . .
3254 + 4 9 4 4 9 2"
3255 )
3256 );
3257
3258 tx_r.push_chunk(chunk_r2);
3260 let chunk = hash_join.next_unwrap_ready_chunk()?;
3261 assert_eq!(
3262 chunk,
3263 StreamChunk::from_pretty(
3264 " I I I I I I
3265 - 1 4 1 . . .
3266 + 1 4 1 1 4 4
3267 - 3 6 3 . . .
3268 + 3 6 3 3 6 5"
3269 )
3270 );
3271
3272 Ok(())
3273 }
3274
3275 #[tokio::test]
3276 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3277 let chunk_l1 = StreamChunk::from_pretty(
3278 " I I I
3279 + 1 4 1
3280 + 2 5 2
3281 + 3 6 3",
3282 );
3283 let chunk_l2 = StreamChunk::from_pretty(
3284 " I I I
3285 + 4 9 4
3286 + 5 10 5",
3287 );
3288 let chunk_r1 = StreamChunk::from_pretty(
3289 " I I I
3290 + 2 5 1
3291 + 4 9 2
3292 + 6 9 3",
3293 );
3294 let chunk_r2 = StreamChunk::from_pretty(
3295 " I I I
3296 + 1 4 4
3297 + 3 6 5
3298 + 7 7 6",
3299 );
3300
3301 let (mut tx_l, mut tx_r, mut hash_join) =
3302 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3303
3304 tx_l.push_barrier(test_epoch(1), false);
3306 tx_r.push_barrier(test_epoch(1), false);
3307 hash_join.next_unwrap_ready_barrier()?;
3308
3309 tx_l.push_chunk(chunk_l1);
3311 hash_join.next_unwrap_pending();
3312
3313 tx_l.push_chunk(chunk_l2);
3315 hash_join.next_unwrap_pending();
3316
3317 tx_r.push_chunk(chunk_r1);
3319 let chunk = hash_join.next_unwrap_ready_chunk()?;
3320 assert_eq!(
3321 chunk,
3322 StreamChunk::from_pretty(
3323 " I I I I I I
3324 + 2 5 2 2 5 1
3325 + 4 9 4 4 9 2
3326 + . . . 6 9 3"
3327 )
3328 );
3329
3330 tx_r.push_chunk(chunk_r2);
3332 let chunk = hash_join.next_unwrap_ready_chunk()?;
3333 assert_eq!(
3334 chunk,
3335 StreamChunk::from_pretty(
3336 " I I I I I I
3337 + 1 4 1 1 4 4
3338 + 3 6 3 3 6 5
3339 + . . . 7 7 6"
3340 )
3341 );
3342
3343 Ok(())
3344 }
3345
3346 #[tokio::test]
3347 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3348 let chunk_l1 = StreamChunk::from_pretty(
3349 " I I
3350 + 1 4
3351 + 2 5
3352 + 3 6",
3353 );
3354 let chunk_l2 = StreamChunk::from_pretty(
3355 " I I
3356 + 3 8
3357 - 3 8",
3358 );
3359 let chunk_r1 = StreamChunk::from_pretty(
3360 " I I
3361 + 2 7
3362 + 4 8
3363 + 6 9",
3364 );
3365 let chunk_r2 = StreamChunk::from_pretty(
3366 " I I
3367 + 5 10
3368 - 5 10",
3369 );
3370 let (mut tx_l, mut tx_r, mut hash_join) =
3371 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3372
3373 tx_l.push_barrier(test_epoch(1), false);
3375 tx_r.push_barrier(test_epoch(1), false);
3376 hash_join.next_unwrap_ready_barrier()?;
3377
3378 tx_l.push_chunk(chunk_l1);
3380 let chunk = hash_join.next_unwrap_ready_chunk()?;
3381 assert_eq!(
3382 chunk,
3383 StreamChunk::from_pretty(
3384 " I I I I
3385 + 1 4 . .
3386 + 2 5 . .
3387 + 3 6 . ."
3388 )
3389 );
3390
3391 tx_l.push_chunk(chunk_l2);
3393 let chunk = hash_join.next_unwrap_ready_chunk()?;
3394 assert_eq!(
3395 chunk,
3396 StreamChunk::from_pretty(
3397 " I I I I
3398 + 3 8 . . D
3399 - 3 8 . . D"
3400 )
3401 );
3402
3403 tx_r.push_chunk(chunk_r1);
3405 let chunk = hash_join.next_unwrap_ready_chunk()?;
3406 assert_eq!(
3407 chunk,
3408 StreamChunk::from_pretty(
3409 " I I I I
3410 - 2 5 . .
3411 + 2 5 2 7
3412 + . . 4 8
3413 + . . 6 9"
3414 )
3415 );
3416
3417 tx_r.push_chunk(chunk_r2);
3419 let chunk = hash_join.next_unwrap_ready_chunk()?;
3420 assert_eq!(
3421 chunk,
3422 StreamChunk::from_pretty(
3423 " I I I I
3424 + . . 5 10 D
3425 - . . 5 10 D"
3426 )
3427 );
3428
3429 Ok(())
3430 }
3431
3432 #[tokio::test]
3433 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3434 let (mut tx_l, mut tx_r, mut hash_join) =
3435 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3436
3437 tx_l.push_barrier(test_epoch(1), false);
3439 tx_r.push_barrier(test_epoch(1), false);
3440 hash_join.next_unwrap_ready_barrier()?;
3441
3442 tx_l.push_chunk(StreamChunk::from_pretty(
3443 " I I
3444 + 1 1
3445 ",
3446 ));
3447 let chunk = hash_join.next_unwrap_ready_chunk()?;
3448 assert_eq!(
3449 chunk,
3450 StreamChunk::from_pretty(
3451 " I I I I
3452 + 1 1 . ."
3453 )
3454 );
3455
3456 tx_r.push_chunk(StreamChunk::from_pretty(
3457 " I I
3458 + 1 1
3459 ",
3460 ));
3461 let chunk = hash_join.next_unwrap_ready_chunk()?;
3462
3463 assert_eq!(
3464 chunk,
3465 StreamChunk::from_pretty(
3466 " I I I I
3467 - 1 1 . .
3468 + 1 1 1 1"
3469 )
3470 );
3471
3472 tx_l.push_chunk(StreamChunk::from_pretty(
3473 " I I
3474 - 1 1
3475 + 1 2
3476 ",
3477 ));
3478 let chunk = hash_join.next_unwrap_ready_chunk()?;
3479 let chunk = chunk.compact_vis();
3480 assert_eq!(
3481 chunk,
3482 StreamChunk::from_pretty(
3483 " I I I I
3484 - 1 1 1 1
3485 + 1 2 1 1
3486 "
3487 )
3488 );
3489
3490 Ok(())
3491 }
3492
3493 #[tokio::test]
3494 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3495 {
3496 let chunk_l1 = StreamChunk::from_pretty(
3497 " I I
3498 + 1 4
3499 + 2 5
3500 + 3 6
3501 + 3 7",
3502 );
3503 let chunk_l2 = StreamChunk::from_pretty(
3504 " I I
3505 + 3 8
3506 - 3 8
3507 - 1 4", );
3509 let chunk_r1 = StreamChunk::from_pretty(
3510 " I I
3511 + 2 6
3512 + 4 8
3513 + 3 4",
3514 );
3515 let chunk_r2 = StreamChunk::from_pretty(
3516 " I I
3517 + 5 10
3518 - 5 10
3519 + 1 2",
3520 );
3521 let (mut tx_l, mut tx_r, mut hash_join) =
3522 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3523
3524 tx_l.push_barrier(test_epoch(1), false);
3526 tx_r.push_barrier(test_epoch(1), false);
3527 hash_join.next_unwrap_ready_barrier()?;
3528
3529 tx_l.push_chunk(chunk_l1);
3531 let chunk = hash_join.next_unwrap_ready_chunk()?;
3532 assert_eq!(
3533 chunk,
3534 StreamChunk::from_pretty(
3535 " I I I I
3536 + 1 4 . .
3537 + 2 5 . .
3538 + 3 6 . .
3539 + 3 7 . ."
3540 )
3541 );
3542
3543 tx_l.push_chunk(chunk_l2);
3545 let chunk = hash_join.next_unwrap_ready_chunk()?;
3546 assert_eq!(
3547 chunk,
3548 StreamChunk::from_pretty(
3549 " I I I I
3550 + 3 8 . . D
3551 - 3 8 . . D
3552 - 1 4 . ."
3553 )
3554 );
3555
3556 tx_r.push_chunk(chunk_r1);
3558 let chunk = hash_join.next_unwrap_ready_chunk()?;
3559 assert_eq!(
3560 chunk,
3561 StreamChunk::from_pretty(
3562 " I I I I
3563 - 2 5 . .
3564 + 2 5 2 6
3565 + . . 4 8
3566 + . . 3 4" )
3570 );
3571
3572 tx_r.push_chunk(chunk_r2);
3574 let chunk = hash_join.next_unwrap_ready_chunk()?;
3575 assert_eq!(
3576 chunk,
3577 StreamChunk::from_pretty(
3578 " I I I I
3579 + . . 5 10 D
3580 - . . 5 10 D
3581 + . . 1 2" )
3584 );
3585
3586 Ok(())
3587 }
3588
3589 #[tokio::test]
3590 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3591 let chunk_l1 = StreamChunk::from_pretty(
3592 " I I
3593 + 1 4
3594 + 2 10
3595 + 3 6",
3596 );
3597 let chunk_l2 = StreamChunk::from_pretty(
3598 " I I
3599 + 3 8
3600 - 3 8",
3601 );
3602 let chunk_r1 = StreamChunk::from_pretty(
3603 " I I
3604 + 2 7
3605 + 4 8
3606 + 6 9",
3607 );
3608 let chunk_r2 = StreamChunk::from_pretty(
3609 " I I
3610 + 3 10
3611 + 6 11",
3612 );
3613 let (mut tx_l, mut tx_r, mut hash_join) =
3614 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3615
3616 tx_l.push_barrier(test_epoch(1), false);
3618 tx_r.push_barrier(test_epoch(1), false);
3619 hash_join.next_unwrap_ready_barrier()?;
3620
3621 tx_l.push_chunk(chunk_l1);
3623 hash_join.next_unwrap_pending();
3624
3625 tx_l.push_chunk(chunk_l2);
3627 hash_join.next_unwrap_pending();
3628
3629 tx_r.push_chunk(chunk_r1);
3631 hash_join.next_unwrap_pending();
3632
3633 tx_r.push_chunk(chunk_r2);
3635 let chunk = hash_join.next_unwrap_ready_chunk()?;
3636 assert_eq!(
3637 chunk,
3638 StreamChunk::from_pretty(
3639 " I I I I
3640 + 3 6 3 10"
3641 )
3642 );
3643
3644 Ok(())
3645 }
3646
3647 #[tokio::test]
3648 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3649 let (mut tx_l, mut tx_r, mut hash_join) =
3650 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3651
3652 tx_l.push_barrier(test_epoch(1), false);
3654 tx_r.push_barrier(test_epoch(1), false);
3655 hash_join.next_unwrap_ready_barrier()?;
3656
3657 tx_l.push_int64_watermark(0, 100);
3658
3659 tx_l.push_int64_watermark(0, 200);
3660
3661 tx_l.push_barrier(test_epoch(2), false);
3662 tx_r.push_barrier(test_epoch(2), false);
3663 hash_join.next_unwrap_ready_barrier()?;
3664
3665 tx_r.push_int64_watermark(0, 50);
3666
3667 let w1 = hash_join.next().await.unwrap().unwrap();
3668 let w1 = w1.as_watermark().unwrap();
3669
3670 let w2 = hash_join.next().await.unwrap().unwrap();
3671 let w2 = w2.as_watermark().unwrap();
3672
3673 tx_r.push_int64_watermark(0, 100);
3674
3675 let w3 = hash_join.next().await.unwrap().unwrap();
3676 let w3 = w3.as_watermark().unwrap();
3677
3678 let w4 = hash_join.next().await.unwrap().unwrap();
3679 let w4 = w4.as_watermark().unwrap();
3680
3681 assert_eq!(
3682 w1,
3683 &Watermark {
3684 col_idx: 2,
3685 data_type: DataType::Int64,
3686 val: ScalarImpl::Int64(50)
3687 }
3688 );
3689
3690 assert_eq!(
3691 w2,
3692 &Watermark {
3693 col_idx: 0,
3694 data_type: DataType::Int64,
3695 val: ScalarImpl::Int64(50)
3696 }
3697 );
3698
3699 assert_eq!(
3700 w3,
3701 &Watermark {
3702 col_idx: 2,
3703 data_type: DataType::Int64,
3704 val: ScalarImpl::Int64(100)
3705 }
3706 );
3707
3708 assert_eq!(
3709 w4,
3710 &Watermark {
3711 col_idx: 0,
3712 data_type: DataType::Int64,
3713 val: ScalarImpl::Int64(100)
3714 }
3715 );
3716
3717 Ok(())
3718 }
3719
3720 async fn create_executor_with_evict_interval<const T: JoinTypePrimitive>(
3721 evict_interval: u32,
3722 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
3723 let schema = Schema {
3724 fields: vec![
3725 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
3727 ],
3728 };
3729 let (tx_l, source_l) = MockSource::channel();
3730 let source_l = source_l.into_executor(schema.clone(), vec![1]);
3731 let (tx_r, source_r) = MockSource::channel();
3732 let source_r = source_r.into_executor(schema, vec![1]);
3733 let params_l = JoinParams::new(vec![0], vec![1]);
3734 let params_r = JoinParams::new(vec![0], vec![1]);
3735
3736 let mem_state = MemoryStateStore::new();
3737
3738 let (state_l, degree_state_l) = create_in_memory_state_table(
3739 mem_state.clone(),
3740 &[DataType::Int64, DataType::Int64],
3741 &[OrderType::ascending(), OrderType::ascending()],
3742 &[0, 1],
3743 0,
3744 )
3745 .await;
3746
3747 let (state_r, degree_state_r) = create_in_memory_state_table(
3748 mem_state,
3749 &[DataType::Int64, DataType::Int64],
3750 &[OrderType::ascending(), OrderType::ascending()],
3751 &[0, 1],
3752 2,
3753 )
3754 .await;
3755
3756 let schema = match T {
3757 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
3758 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
3759 _ => [source_l.schema().fields(), source_r.schema().fields()]
3760 .concat()
3761 .into_iter()
3762 .collect(),
3763 };
3764 let schema_len = schema.len();
3765 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
3766
3767 let mut streaming_config = StreamingConfig::default();
3768 streaming_config.developer.join_hash_map_evict_interval_rows = evict_interval;
3769
3770 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
3771 ActorContext::for_test_with_config(123, streaming_config),
3772 info,
3773 source_l,
3774 source_r,
3775 params_l,
3776 params_r,
3777 vec![false],
3778 (0..schema_len).collect_vec(),
3779 None,
3780 vec![],
3781 state_l,
3782 degree_state_l,
3783 state_r,
3784 degree_state_r,
3785 Arc::new(AtomicU64::new(0)),
3786 false,
3787 Arc::new(StreamingMetrics::unused()),
3788 1024,
3789 2048,
3790 vec![(0, true)],
3791 );
3792 (tx_l, tx_r, executor.boxed().execute())
3793 }
3794
3795 #[tokio::test]
3798 async fn test_hash_join_evict_interval_disabled() -> StreamExecutorResult<()> {
3799 let chunk_l = StreamChunk::from_pretty(
3800 " I I
3801 + 1 4
3802 + 2 5
3803 + 3 6",
3804 );
3805 let chunk_r = StreamChunk::from_pretty(
3806 " I I
3807 + 2 7
3808 + 3 8",
3809 );
3810
3811 let (mut tx_l, mut tx_r, mut hash_join) =
3813 create_executor_with_evict_interval::<{ JoinType::Inner }>(0).await;
3814
3815 tx_l.push_barrier(test_epoch(1), false);
3816 tx_r.push_barrier(test_epoch(1), false);
3817 hash_join.next_unwrap_ready_barrier()?;
3818
3819 tx_l.push_chunk(chunk_l);
3820 hash_join.next_unwrap_pending();
3821
3822 tx_r.push_chunk(chunk_r);
3823 let chunk = hash_join.next_unwrap_ready_chunk()?;
3824 assert_eq!(
3825 chunk,
3826 StreamChunk::from_pretty(
3827 " I I I I
3828 + 2 5 2 7
3829 + 3 6 3 8"
3830 )
3831 );
3832
3833 Ok(())
3834 }
3835
3836 #[tokio::test]
3839 async fn test_hash_join_evict_interval_one() -> StreamExecutorResult<()> {
3840 let chunk_l = StreamChunk::from_pretty(
3841 " I I
3842 + 1 4
3843 + 2 5
3844 + 3 6",
3845 );
3846 let chunk_r = StreamChunk::from_pretty(
3847 " I I
3848 + 2 7
3849 + 3 8",
3850 );
3851
3852 let (mut tx_l, mut tx_r, mut hash_join) =
3854 create_executor_with_evict_interval::<{ JoinType::Inner }>(1).await;
3855
3856 tx_l.push_barrier(test_epoch(1), false);
3857 tx_r.push_barrier(test_epoch(1), false);
3858 hash_join.next_unwrap_ready_barrier()?;
3859
3860 tx_l.push_chunk(chunk_l);
3861 hash_join.next_unwrap_pending();
3862
3863 tx_r.push_chunk(chunk_r);
3864 let chunk = hash_join.next_unwrap_ready_chunk()?;
3865 assert_eq!(
3866 chunk,
3867 StreamChunk::from_pretty(
3868 " I I I I
3869 + 2 5 2 7
3870 + 3 6 3 8"
3871 )
3872 );
3873
3874 Ok(())
3875 }
3876
3877 #[tokio::test]
3880 async fn test_hash_join_evict_interval_custom() -> StreamExecutorResult<()> {
3881 let chunk_l = StreamChunk::from_pretty(
3882 " I I
3883 + 1 4
3884 + 2 5
3885 + 3 6
3886 + 4 7
3887 + 5 8",
3888 );
3889 let chunk_r = StreamChunk::from_pretty(
3890 " I I
3891 + 1 9
3892 + 3 10
3893 + 5 11",
3894 );
3895
3896 let (mut tx_l, mut tx_r, mut hash_join) =
3898 create_executor_with_evict_interval::<{ JoinType::Inner }>(2).await;
3899
3900 tx_l.push_barrier(test_epoch(1), false);
3901 tx_r.push_barrier(test_epoch(1), false);
3902 hash_join.next_unwrap_ready_barrier()?;
3903
3904 tx_l.push_chunk(chunk_l);
3905 hash_join.next_unwrap_pending();
3906
3907 tx_r.push_chunk(chunk_r);
3908 let chunk = hash_join.next_unwrap_ready_chunk()?;
3909 assert_eq!(
3910 chunk,
3911 StreamChunk::from_pretty(
3912 " I I I I
3913 + 1 4 1 9
3914 + 3 6 3 10
3915 + 5 8 5 11"
3916 )
3917 );
3918
3919 Ok(())
3920 }
3921}