1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Number of received rows after which `evict_cache` evicts the hash maps of both join sides
/// and resets its row counter.
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates are irrelevant, and an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|item| superset.contains(&item))
}
51
/// Describes one inequality condition of the join, relating a left column to a right column
/// through a comparison operator.
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Index of the column on the left side, relative to the left input's schema.
    pub left_idx: usize,
    /// Index of the column on the right side, relative to the right input's own schema
    /// (i.e. not offset by the number of left columns).
    pub right_idx: usize,
    /// Whether this condition registers `left_idx` as a state-clean column of the left side.
    pub clean_left_state: bool,
    /// Whether this condition registers `right_idx` as a state-clean column of the right side.
    pub clean_right_state: bool,
    /// Comparison operator of the condition.
    pub op: InequalityType,
}
67
68impl InequalityPairInfo {
69 pub fn left_side_is_larger(&self) -> bool {
71 matches!(
72 self.op,
73 InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
74 )
75 }
76}
77
/// Per-side parameters handed to the `HashJoinExecutor` constructors.
pub struct JoinParams {
    /// Indices of the join key columns in this side's input schema.
    pub join_key_indices: Vec<usize>,
    /// Pk indices handed to the side's `JoinHashMap`; their count also sizes the pk column
    /// range of the degree table.
    /// NOTE(review): presumably the stream key with join key columns removed — confirm.
    pub deduped_pk_indices: Vec<usize>,
}
84
85impl JoinParams {
86 pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
87 Self {
88 join_key_indices,
89 deduped_pk_indices,
90 }
91 }
92}
93
/// State and column-mapping metadata of one input side of the join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash map holding this side's join state.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Start position of this side's columns: 0 for the left side, the number of left input
    /// columns for the right side.
    start_pos: usize,
    /// Input-to-output column mapping computed by `JoinStreamChunkBuilder::get_i2o_mapping`.
    i2o_mapping: Vec<(usize, usize)>,
    /// The pairs of `i2o_mapping`, indexed by their first element (the input column).
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column, the inequality pairs it takes part in, as
    /// `(inequality index, flag)`. The flag is `true` when this side is NOT the larger side of
    /// that inequality.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that must be non-null in an incoming row of this side; otherwise the row is
    /// treated as never matching.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs registered for cleaning this side's state.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table was created for this side.
    need_degree_table: bool,
    /// Ties the otherwise unused encoding parameter `E` to the struct.
    _marker: std::marker::PhantomData<E>,
}
123
124impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("JoinSide")
127 .field("join_key_indices", &self.join_key_indices)
128 .field("col_types", &self.all_data_types)
129 .field("start_pos", &self.start_pos)
130 .field("i2o_mapping", &self.i2o_mapping)
131 .field("need_degree_table", &self.need_degree_table)
132 .finish()
133 }
134}
135
136impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
137 fn is_dirty(&self) -> bool {
139 unimplemented!()
140 }
141
142 #[expect(dead_code)]
143 fn clear_cache(&mut self) {
144 assert!(
145 !self.is_dirty(),
146 "cannot clear cache while states of hash join are dirty"
147 );
148
149 }
152
153 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
154 self.ht.init(epoch).await
155 }
156}
157
/// Streaming hash join executor, parameterized by hash key type `K`, state store `S`,
/// join type `T` and join row encoding `E`.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor; moved out (leaving `None`) when the stream is started.
    input_l: Option<Executor>,
    /// Right input executor; moved out (leaving `None`) when the stream is started.
    input_r: Option<Executor>,
    /// Data types of the output columns, after applying the output indices.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left side.
    side_l: JoinSide<K, S, E>,
    /// State and metadata of the right side.
    side_r: JoinSide<K, S, E>,
    /// Optional extra join condition, handed to the row matching logic.
    cond: Option<NonStrictExpression>,
    /// For each inequality pair: the output column indices on which its watermark is emitted
    /// (those of the larger side's column), together with the pair description.
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// Latest selected watermark of each inequality pair, `None` until one was selected.
    inequality_watermarks: Vec<Option<Watermark>>,
    /// `(join key position, do_clean)` pairs; a join-key watermark only updates the hash map
    /// watermarks when its position is listed with `do_clean == true`.
    watermark_indices_in_jk: Vec<(usize, bool)>,

    /// Set when the join is append-only and the pk is contained in the join key on both sides.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Row counter driving the periodic cache eviction.
    cnt_rows_received: u32,

    /// Buffered watermarks of both sides, keyed by join key position, or by
    /// `join key count + inequality index` for inequality watermarks.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when one input row matches more rows than this threshold.
    high_join_amplification_threshold: usize,

    /// Bound on the number of matched rows that are refilled into the cache after a miss.
    entry_state_max_rows: usize,
}
206
207impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
208 for HashJoinExecutor<K, S, T, E>
209{
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("HashJoinExecutor")
212 .field("join_type", &T)
213 .field("input_left", &self.input_l.as_ref().unwrap().identity())
214 .field("input_right", &self.input_r.as_ref().unwrap().identity())
215 .field("side_l", &self.side_l)
216 .field("side_r", &self.side_r)
217 .field("stream_key", &self.info.stream_key)
218 .field("schema", &self.info.schema)
219 .field("actual_output_data_types", &self.actual_output_data_types)
220 .finish()
221 }
222}
223
224impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
225 for HashJoinExecutor<K, S, T, E>
226{
227 fn execute(self: Box<Self>) -> BoxedMessageStream {
228 self.into_stream().boxed()
229 }
230}
231
/// Arguments for processing one input chunk of either side. Borrows the relevant executor
/// fields individually so that both sides can be mutated at the same time.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S, E>,
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest selected watermark per inequality pair.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    /// Row counter driving the periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
246
247impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
248 HashJoinExecutor<K, S, T, E>
249{
    /// Creates a hash join executor whose `entry_state_max_rows` is taken from the actor
    /// configuration.
    ///
    /// Every argument is forwarded unchanged to `new_with_cache_size`, with `None` passed for
    /// `entry_state_max_rows`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // `entry_state_max_rows`: `None` selects the configured default.
            None,
            watermark_indices_in_jk,
        )
    }
297
    /// Creates a hash join executor with an optional explicit `entry_state_max_rows`.
    ///
    /// Derives the output data types, the per-side join key / degree table layouts, the
    /// input-to-output mappings and the inequality metadata, then builds both `JoinSide`s.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of both sides differ, or if an index in
    /// `output_indices` or in an inequality pair is out of range.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
        watermark_indices_in_jk: Vec<(usize, bool)>,
    ) -> Self {
        // Fall back to the configured value when no explicit limit is given.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins only expose the columns of one side; all other join types expose
        // the left columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project the full output schema through `output_indices`.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of all columns of each input.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree tables the join key occupies the leading columns ...
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        // ... directly followed by the deduplicated pk columns.
        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether the pk (stream key) of a side is contained in its join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        // The append-only optimization requires the pk to be contained in the join key on
        // both sides.
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side's degree table is skipped when the OTHER side's pk is contained in the join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let (left_to_output, right_to_output) = {
            // For semi/anti joins the side that is not output contributes zero columns.
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        // Index the input-to-output mappings by input column for lookups.
        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_inequal_state_clean_columns = vec![];
        let mut r_inequal_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(|(index, pair)| {
                // Register the pair on both columns. The flag is `true` for the side that is
                // not the larger one in this inequality.
                let left_is_larger = pair.left_side_is_larger();
                l2inequality_index[pair.left_idx].push((index, !left_is_larger));
                r2inequality_index[pair.right_idx].push((index, left_is_larger));

                // Record the columns that are allowed to drive state cleaning.
                if pair.clean_left_state {
                    l_inequal_state_clean_columns.push((pair.left_idx, index));
                }
                if pair.clean_right_state {
                    r_inequal_state_clean_columns.push((pair.right_idx, index));
                }

                // Watermarks of this pair are emitted on the output columns of the larger
                // side's column only.
                let output_indices = if pair.left_side_is_larger() {
                    l2o_indexed
                        .get_vec(&pair.left_idx)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    r2o_indexed
                        .get_vec(&pair.right_idx)
                        .cloned()
                        .unwrap_or_default()
                };

                (output_indices, pair)
            })
            .collect_vec();

        // Columns that take part in at least one inequality must be non-null to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization neither inequality-based state cleaning nor the
        // non-null requirement is applied.
        if append_only_optimize {
            l_inequal_state_clean_columns.clear();
            r_inequal_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        // Input column of the first state-clean inequality of each side (if any), handed to
        // the degree table.
        let l_inequality_idx = l_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);
        let r_inequality_idx = r_inequal_state_clean_columns
            .first()
            .map(|(col_idx, _)| *col_idx);

        // Degree tables are only built for the sides that need them.
        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
                l_inequality_idx,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
                r_inequality_idx,
            )
        });

        // One watermark slot per inequality pair, initially empty.
        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();
        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_inequal_state_clean_columns,
                // Left columns start at position 0.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns start after all left columns.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_inequal_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            watermark_indices_in_jk,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
572
    /// Runs the join: aligns both inputs on barriers and turns the aligned messages into
    /// output chunks, watermarks and barriers.
    ///
    /// The first aligned message is expected to be a barrier; it is yielded before the state
    /// of both sides is initialized with its epoch.
    ///
    /// # Panics
    ///
    /// Panics if either input has already been taken out of the executor.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // Move both inputs out of `self`; the fields are `None` from here on.
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // Propagate the first barrier before initializing the state of both sides.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Resolve all labelled metrics once, outside of the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for the next aligned message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Accumulate only the time spent producing chunks; the timer is restarted
                    // after every `yield` so time spent suspended there is not counted.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same time accounting as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // The metric is recorded before yielding, so the post-yield work below is
                    // not part of the measured barrier duration.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Finish the commit of both sides, passing on the vnode bitmap update (if any).
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // NOTE(review): a `Some(true)` result presumably signals that cached
                    // state may be stale after a vnode change — confirm. In that case all
                    // buffered watermarks are dropped.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report the number of cached entries of both sides.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            // Restart the input waiting timer for the next message.
            start_time = Instant::now();
        }
    }
743
744 async fn flush_data(
745 &mut self,
746 epoch: EpochPair,
747 ) -> StreamExecutorResult<(
748 JoinHashMapPostCommit<'_, K, S, E>,
749 JoinHashMapPostCommit<'_, K, S, E>,
750 )> {
751 let left = self.side_l.ht.flush(epoch).await?;
754 let right = self.side_r.ht.flush(epoch).await?;
755 Ok((left, right))
756 }
757
758 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
759 self.side_l.ht.try_flush().await?;
762 self.side_r.ht.try_flush().await?;
763 Ok(())
764 }
765
766 fn evict_cache(
768 side_update: &mut JoinSide<K, S, E>,
769 side_match: &mut JoinSide<K, S, E>,
770 cnt_rows_received: &mut u32,
771 ) {
772 *cnt_rows_received += 1;
773 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
774 side_update.ht.evict();
775 side_match.ht.evict();
776 *cnt_rows_received = 0;
777 }
778 }
779
    /// Processes a watermark received from `side` and returns the watermarks to emit
    /// downstream.
    ///
    /// The watermark is considered twice: once for every join key position its column
    /// occupies, and once for every inequality pair its column takes part in. In both cases
    /// it is first fed into a buffer shared by both sides, and only a watermark selected by
    /// that buffer is emitted or used to update the hash map watermarks.
    fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Join key positions at which the watermark column appears.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Buffers for join key watermarks are keyed by the join key position.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                // Only positions flagged with `do_clean` update the hash map watermarks.
                if self
                    .watermark_indices_in_jk
                    .iter()
                    .any(|(jk_pos, do_clean)| *jk_pos == idx && *do_clean)
                {
                    side_match
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                    side_update
                        .ht
                        .update_watermark(selected_watermark.val.clone());
                }

                // Emit on every output column that the join key column of either side maps to.
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }

        // Inequality watermarks: collect the values with which the left / right hash map
        // watermark has to be updated afterwards.
        let mut update_left_watermark = None;
        let mut update_right_watermark = None;
        if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
            for (inequality_index, _) in watermark_indices {
                // Buffers for inequality watermarks are keyed after the join key positions.
                let buffers = self
                    .watermark_buffers
                    .entry(side_update.join_key_indices.len() + inequality_index)
                    .or_insert_with(|| {
                        BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
                    });
                if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
                {
                    let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
                    let left_is_larger = pair_info.left_side_is_larger();

                    // The stored output indices belong to the larger side's column.
                    for output_idx in output_indices {
                        watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                    }
                    // Remember the watermark for state cleaning while matching rows.
                    self.inequality_watermarks[*inequality_index] =
                        Some(selected_watermark.clone());

                    // Only the larger side is cleaned, and only if its clean flag is set.
                    if left_is_larger && pair_info.clean_left_state {
                        update_left_watermark = Some(selected_watermark.val.clone());
                    } else if !left_is_larger && pair_info.clean_right_state {
                        update_right_watermark = Some(selected_watermark.val.clone());
                    }
                }
            }
            // The hash map watermark is updated even if no watermark was emitted because the
            // column has no output index.
            if let Some(val) = update_left_watermark {
                self.side_l.ht.update_watermark(val);
            }
            if let Some(val) = update_right_watermark {
                self.side_r.ht.update_watermark(val);
            }
        }
        Ok(watermarks_to_emit)
    }
881
882 fn row_concat(
883 row_update: impl Row,
884 update_start_pos: usize,
885 row_matched: impl Row,
886 matched_start_pos: usize,
887 ) -> OwnedRow {
888 let mut new_row = vec![None; row_update.len() + row_matched.len()];
889
890 for (i, datum_ref) in row_update.iter().enumerate() {
891 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
892 }
893 for (i, datum_ref) in row_matched.iter().enumerate() {
894 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
895 }
896 OwnedRow::new(new_row)
897 }
898
    /// Joins a chunk received from the left input: `eq_join_oneside` instantiated with
    /// `SideType::Left` as the updating side.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
905
    /// Joins a chunk received from the right input: `eq_join_oneside` instantiated with
    /// `SideType::Right` as the updating side.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
912
    /// Joins every visible row of `args.chunk`, received from side `SIDE`, against the state
    /// of the other side and yields the resulting output chunks.
    ///
    /// Per row: the cache eviction counter is advanced, the other side's state for the row's
    /// join key is looked up, the row is matched via `handle_match_rows`, and the number of
    /// matched output rows is recorded (with a warning above the amplification threshold).
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side the chunk came from, `side_match` the side it probes.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // State-clean columns of the probed side for which a watermark is already known.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the hash keys of all rows up front; they are zipped with the rows below.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Holes (`None`) are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // All columns that take part in an inequality must be non-null.
                // SAFETY (review): relies on every index in `non_null_fields` being a valid
                // column of `row`; the indices are positions in a vector sized by this
                // side's input schema — confirm the chunk always has that schema.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // Nulls in the key are only acceptable at positions marked as null-matched.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` call for the given join operation.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Both insert ops are handled as `Insert`, both delete ops as `Delete`.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            // Warn about join keys that produce an unusually large number of output rows.
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Emit whatever is still buffered in the chunk builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1044
    /// Matches one incoming `row` of side `SIDE` against the rows of the other side that share
    /// its join `key`, yields the completed output chunks, and applies the resulting state
    /// changes to both sides.
    ///
    /// Depending on `cached_lookup_result`:
    /// - `NeverMatch`: the row is only forwarded as unmatched and no state is touched.
    /// - `Hit`: the cached rows are matched in place.
    /// - `Miss`: the matched rows are fetched from the hash map's backing state and, up to
    ///   `entry_state_max_rows`, collected into a fresh cache entry.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Cache entry that is filled on a cache miss.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Accumulators handed to `handle_match_row` for every matched row.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to a `handle_match_row` call; only the arguments that differ between the
        // cache-hit and cache-miss paths are macro parameters.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                // Nothing can match: forward the row as unmatched and leave all state as is.
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Match against every cached row, with mutable access to the cached entry.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry. The `<=` lets the counter exceed
                    // `entry_state_max_rows` by one, which is how an oversized entry is
                    // detected when deciding below whether to store it.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // Forward the incoming row itself, depending on whether anything matched.
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the entry (back) into the cache, unless a freshly built entry grew past the limit.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Drop the matched rows that were collected for cleaning from the in-memory state.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_row_in_mem(key, &matched_row.row)?;
        }

        // Append-only optimization: delete the recorded matched row from the other side and
        // skip storing the incoming row.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Otherwise apply the incoming row, together with its degree, to its own side.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1227
1228 #[allow(clippy::too_many_arguments)]
1229 #[inline]
1230 async fn handle_match_row<
1231 'a,
1232 R: Row, RO: Row, const SIDE: SideTypePrimitive,
1235 const JOIN_OP: JoinOpPrimitive,
1236 const MATCHED_ROWS_FROM_CACHE: bool,
1237 >(
1238 update_row: RowRef<'a>,
1239 mut matched_row: JoinRow<R>,
1240 mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
1241 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1242 match_order_key_indices: &[usize],
1243 match_degree_table: &mut Option<TableInner<S>>,
1244 side_update_start_pos: usize,
1245 side_match_start_pos: usize,
1246 cond: &Option<NonStrictExpression>,
1247 update_row_degree: &mut u64,
1248 useful_state_clean_columns: &[(usize, &'a Watermark)],
1249 append_only_optimize: bool,
1250 append_only_matched_row: &mut Option<JoinRow<RO>>,
1251 matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
1252 map_output: impl Fn(R) -> RO,
1253 ) -> Option<StreamChunk> {
1254 let mut need_state_clean = false;
1255 let mut chunk_opt = None;
1256 let join_condition_satisfied = Self::check_join_condition(
1260 update_row,
1261 side_update_start_pos,
1262 &matched_row.row,
1263 side_match_start_pos,
1264 cond,
1265 )
1266 .await;
1267
1268 if join_condition_satisfied {
1269 *update_row_degree += 1;
1271 if matches!(JOIN_OP, JoinOp::Insert)
1276 && !forward_exactly_once(T, SIDE)
1277 && let Some(chunk) =
1278 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1279 {
1280 chunk_opt = Some(chunk);
1281 }
1282 if let Some(degree_table) = match_degree_table {
1284 update_degree::<S, { JOIN_OP }>(
1285 match_order_key_indices,
1286 degree_table,
1287 &mut matched_row,
1288 );
1289 if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1290 match JOIN_OP {
1292 JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
1293 JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
1294 }
1295 }
1296 }
1297
1298 if matches!(JOIN_OP, JoinOp::Delete)
1301 && !forward_exactly_once(T, SIDE)
1302 && let Some(chunk) =
1303 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1304 {
1305 chunk_opt = Some(chunk);
1306 }
1307 } else {
1308 for (column_idx, watermark) in useful_state_clean_columns {
1310 if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1311 scalar
1312 .default_cmp(&watermark.val.as_scalar_ref_impl())
1313 .is_lt()
1314 }) {
1315 need_state_clean = true;
1316 break;
1317 }
1318 }
1319 }
1320 if append_only_optimize {
1324 assert_matches!(JOIN_OP, JoinOp::Insert);
1325 assert!(append_only_matched_row.is_none());
1328 *append_only_matched_row = Some(matched_row.map(map_output));
1329 } else if need_state_clean {
1330 debug_assert!(
1331 !append_only_optimize,
1332 "`append_only_optimize` and `need_state_clean` must not both be true"
1333 );
1334 matched_rows_to_clean.push(matched_row.map(map_output));
1335 }
1336
1337 chunk_opt
1338 }
1339
1340 #[inline]
1345 async fn check_join_condition(
1346 row: impl Row,
1347 side_update_start_pos: usize,
1348 matched_row: impl Row,
1349 side_match_start_pos: usize,
1350 join_condition: &Option<NonStrictExpression>,
1351 ) -> bool {
1352 if let Some(join_condition) = join_condition {
1353 let new_row = Self::row_concat(
1354 row,
1355 side_update_start_pos,
1356 matched_row,
1357 side_match_start_pos,
1358 );
1359 join_condition
1360 .eval_row_infallible(&new_row)
1361 .await
1362 .map(|s| *s.as_bool())
1363 .unwrap_or(false)
1364 } else {
1365 true
1366 }
1367 }
1368}
1369
1370#[cfg(test)]
1371mod tests {
1372 use std::sync::atomic::AtomicU64;
1373
1374 use pretty_assertions::assert_eq;
1375 use risingwave_common::array::*;
1376 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1377 use risingwave_common::hash::{Key64, Key128};
1378 use risingwave_common::util::epoch::test_epoch;
1379 use risingwave_common::util::sort_util::OrderType;
1380 use risingwave_storage::memory::MemoryStateStore;
1381
1382 use super::*;
1383 use crate::common::table::test_utils::gen_pbtable;
1384 use crate::executor::MemoryEncoding;
1385 use crate::executor::test_utils::expr::build_from_pretty;
1386 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1387
1388 async fn create_in_memory_state_table(
1389 mem_state: MemoryStateStore,
1390 data_types: &[DataType],
1391 order_types: &[OrderType],
1392 pk_indices: &[usize],
1393 table_id: u32,
1394 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1395 create_in_memory_state_table_with_inequality(
1396 mem_state,
1397 data_types,
1398 order_types,
1399 pk_indices,
1400 table_id,
1401 None,
1402 )
1403 .await
1404 }
1405
1406 async fn create_in_memory_state_table_with_inequality(
1407 mem_state: MemoryStateStore,
1408 data_types: &[DataType],
1409 order_types: &[OrderType],
1410 pk_indices: &[usize],
1411 table_id: u32,
1412 degree_inequality_type: Option<DataType>,
1413 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1414 create_in_memory_state_table_with_watermark(
1415 mem_state,
1416 data_types,
1417 order_types,
1418 pk_indices,
1419 table_id,
1420 degree_inequality_type,
1421 vec![],
1422 vec![],
1423 )
1424 .await
1425 }
1426
1427 #[allow(clippy::too_many_arguments)]
1428 async fn create_in_memory_state_table_with_watermark(
1429 mem_state: MemoryStateStore,
1430 data_types: &[DataType],
1431 order_types: &[OrderType],
1432 pk_indices: &[usize],
1433 table_id: u32,
1434 degree_inequality_type: Option<DataType>,
1435 state_clean_watermark_indices: Vec<usize>,
1436 degree_clean_watermark_indices: Vec<usize>,
1437 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1438 let column_descs = data_types
1439 .iter()
1440 .enumerate()
1441 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1442 .collect_vec();
1443 let mut state_table_catalog = gen_pbtable(
1444 TableId::new(table_id),
1445 column_descs,
1446 order_types.to_vec(),
1447 pk_indices.to_vec(),
1448 0,
1449 );
1450 state_table_catalog.clean_watermark_indices = state_clean_watermark_indices
1451 .into_iter()
1452 .map(|idx| idx as u32)
1453 .collect();
1454 let state_table =
1455 StateTable::from_table_catalog(&state_table_catalog, mem_state.clone(), None).await;
1456
1457 let mut degree_table_column_descs = vec![];
1459 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1460 degree_table_column_descs.push(ColumnDesc::unnamed(
1461 ColumnId::new(pk_id as i32),
1462 data_types[*idx].clone(),
1463 ))
1464 });
1465 degree_table_column_descs.push(ColumnDesc::unnamed(
1467 ColumnId::new(pk_indices.len() as i32),
1468 DataType::Int64,
1469 ));
1470 if let Some(ineq_type) = degree_inequality_type {
1472 degree_table_column_descs.push(ColumnDesc::unnamed(
1473 ColumnId::new((pk_indices.len() + 1) as i32),
1474 ineq_type,
1475 ));
1476 }
1477 let mut degree_table_catalog = gen_pbtable(
1478 TableId::new(table_id + 1),
1479 degree_table_column_descs,
1480 order_types.to_vec(),
1481 pk_indices.to_vec(),
1482 0,
1483 );
1484 degree_table_catalog.clean_watermark_indices = degree_clean_watermark_indices
1485 .into_iter()
1486 .map(|idx| idx as u32)
1487 .collect();
1488 let degree_state_table =
1489 StateTable::from_table_catalog(°ree_table_catalog, mem_state, None).await;
1490 (state_table, degree_state_table)
1491 }
1492
1493 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1494 build_from_pretty(
1495 condition_text
1496 .as_deref()
1497 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1498 )
1499 }
1500
1501 async fn create_executor<const T: JoinTypePrimitive>(
1502 with_condition: bool,
1503 null_safe: bool,
1504 condition_text: Option<String>,
1505 inequality_pairs: Vec<InequalityPairInfo>,
1506 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1507 let schema = Schema {
1508 fields: vec![
1509 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1511 ],
1512 };
1513 let (tx_l, source_l) = MockSource::channel();
1514 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1515 let (tx_r, source_r) = MockSource::channel();
1516 let source_r = source_r.into_executor(schema, vec![1]);
1517 let params_l = JoinParams::new(vec![0], vec![1]);
1518 let params_r = JoinParams::new(vec![0], vec![1]);
1519 let cond = with_condition.then(|| create_cond(condition_text));
1520
1521 let mem_state = MemoryStateStore::new();
1522
1523 let l_degree_ineq_type = inequality_pairs
1525 .iter()
1526 .find(|pair| pair.clean_left_state)
1527 .map(|_| DataType::Int64); let r_degree_ineq_type = inequality_pairs
1529 .iter()
1530 .find(|pair| pair.clean_right_state)
1531 .map(|_| DataType::Int64); let l_clean_watermark_indices = inequality_pairs
1533 .iter()
1534 .find(|pair| pair.clean_left_state)
1535 .map(|pair| vec![pair.left_idx])
1536 .unwrap_or_default();
1537 let r_clean_watermark_indices = inequality_pairs
1538 .iter()
1539 .find(|pair| pair.clean_right_state)
1540 .map(|pair| vec![pair.right_idx])
1541 .unwrap_or_default();
1542 let degree_inequality_column_idx = 3;
1543 let l_degree_clean_watermark_indices = l_degree_ineq_type
1544 .as_ref()
1545 .map(|_| vec![degree_inequality_column_idx])
1546 .unwrap_or_default();
1547 let r_degree_clean_watermark_indices = r_degree_ineq_type
1548 .as_ref()
1549 .map(|_| vec![degree_inequality_column_idx])
1550 .unwrap_or_default();
1551
1552 let (state_l, degree_state_l) = create_in_memory_state_table_with_watermark(
1553 mem_state.clone(),
1554 &[DataType::Int64, DataType::Int64],
1555 &[OrderType::ascending(), OrderType::ascending()],
1556 &[0, 1],
1557 0,
1558 l_degree_ineq_type,
1559 l_clean_watermark_indices,
1560 l_degree_clean_watermark_indices,
1561 )
1562 .await;
1563
1564 let (state_r, degree_state_r) = create_in_memory_state_table_with_watermark(
1565 mem_state,
1566 &[DataType::Int64, DataType::Int64],
1567 &[OrderType::ascending(), OrderType::ascending()],
1568 &[0, 1],
1569 2,
1570 r_degree_ineq_type,
1571 r_clean_watermark_indices,
1572 r_degree_clean_watermark_indices,
1573 )
1574 .await;
1575
1576 let schema = match T {
1577 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1578 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1579 _ => [source_l.schema().fields(), source_r.schema().fields()]
1580 .concat()
1581 .into_iter()
1582 .collect(),
1583 };
1584 let schema_len = schema.len();
1585 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1586
1587 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1588 ActorContext::for_test(123),
1589 info,
1590 source_l,
1591 source_r,
1592 params_l,
1593 params_r,
1594 vec![null_safe],
1595 (0..schema_len).collect_vec(),
1596 cond,
1597 inequality_pairs,
1598 state_l,
1599 degree_state_l,
1600 state_r,
1601 degree_state_r,
1602 Arc::new(AtomicU64::new(0)),
1603 false,
1604 Arc::new(StreamingMetrics::unused()),
1605 1024,
1606 2048,
1607 vec![(0, true)],
1608 );
1609 (tx_l, tx_r, executor.boxed().execute())
1610 }
1611
1612 async fn create_classical_executor<const T: JoinTypePrimitive>(
1613 with_condition: bool,
1614 null_safe: bool,
1615 condition_text: Option<String>,
1616 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1617 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1618 }
1619
1620 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1621 with_condition: bool,
1622 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1623 let schema = Schema {
1624 fields: vec![
1625 Field::unnamed(DataType::Int64),
1626 Field::unnamed(DataType::Int64),
1627 Field::unnamed(DataType::Int64),
1628 ],
1629 };
1630 let (tx_l, source_l) = MockSource::channel();
1631 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1632 let (tx_r, source_r) = MockSource::channel();
1633 let source_r = source_r.into_executor(schema, vec![0]);
1634 let params_l = JoinParams::new(vec![0, 1], vec![]);
1635 let params_r = JoinParams::new(vec![0, 1], vec![]);
1636 let cond = with_condition.then(|| create_cond(None));
1637
1638 let mem_state = MemoryStateStore::new();
1639
1640 let (state_l, degree_state_l) = create_in_memory_state_table(
1641 mem_state.clone(),
1642 &[DataType::Int64, DataType::Int64, DataType::Int64],
1643 &[
1644 OrderType::ascending(),
1645 OrderType::ascending(),
1646 OrderType::ascending(),
1647 ],
1648 &[0, 1, 0],
1649 0,
1650 )
1651 .await;
1652
1653 let (state_r, degree_state_r) = create_in_memory_state_table(
1654 mem_state,
1655 &[DataType::Int64, DataType::Int64, DataType::Int64],
1656 &[
1657 OrderType::ascending(),
1658 OrderType::ascending(),
1659 OrderType::ascending(),
1660 ],
1661 &[0, 1, 1],
1662 1,
1663 )
1664 .await;
1665
1666 let schema = match T {
1667 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1668 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1669 _ => [source_l.schema().fields(), source_r.schema().fields()]
1670 .concat()
1671 .into_iter()
1672 .collect(),
1673 };
1674 let schema_len = schema.len();
1675 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1676
1677 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1678 ActorContext::for_test(123),
1679 info,
1680 source_l,
1681 source_r,
1682 params_l,
1683 params_r,
1684 vec![false],
1685 (0..schema_len).collect_vec(),
1686 cond,
1687 vec![],
1688 state_l,
1689 degree_state_l,
1690 state_r,
1691 degree_state_r,
1692 Arc::new(AtomicU64::new(0)),
1693 true,
1694 Arc::new(StreamingMetrics::unused()),
1695 1024,
1696 2048,
1697 vec![(0, true)],
1698 );
1699 (tx_l, tx_r, executor.boxed().execute())
1700 }
1701
1702 #[tokio::test]
1703 async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
1704 let chunk_l1 = StreamChunk::from_pretty(
1708 " I I
1709 + 2 4
1710 + 2 7
1711 + 3 8",
1712 );
1713 let chunk_r1 = StreamChunk::from_pretty(
1714 " I I
1715 + 2 6",
1716 );
1717 let chunk_r2 = StreamChunk::from_pretty(
1718 " I I
1719 + 2 3",
1720 );
1721 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1723 true,
1724 false,
1725 Some(String::from(
1726 "(greater_than_or_equal:boolean $1:int8 $3:int8)",
1727 )),
1728 vec![InequalityPairInfo {
1729 left_idx: 1,
1730 right_idx: 1,
1731 clean_left_state: true, clean_right_state: false,
1733 op: InequalityType::GreaterThanOrEqual,
1734 }],
1735 )
1736 .await;
1737
1738 tx_l.push_barrier(test_epoch(1), false);
1740 tx_r.push_barrier(test_epoch(1), false);
1741 hash_join.next_unwrap_ready_barrier()?;
1742
1743 tx_l.push_chunk(chunk_l1);
1745 hash_join.next_unwrap_pending();
1746
1747 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1750 hash_join.next_unwrap_pending();
1751
1752 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1753 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1754 assert_eq!(
1755 output_watermark,
1756 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
1757 );
1758
1759 tx_r.push_chunk(chunk_r1);
1768 let chunk = hash_join.next_unwrap_ready_chunk()?;
1769 assert_eq!(
1770 chunk,
1771 StreamChunk::from_pretty(
1772 " I I I I
1773 + 2 7 2 6"
1774 )
1775 );
1776
1777 tx_r.push_chunk(chunk_r2);
1781 let chunk = hash_join.next_unwrap_ready_chunk()?;
1782 assert_eq!(
1783 chunk,
1784 StreamChunk::from_pretty(
1785 " I I I I
1786 + 2 7 2 3"
1787 )
1788 );
1789
1790 Ok(())
1791 }
1792
1793 #[tokio::test]
1794 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1795 let chunk_l1 = StreamChunk::from_pretty(
1796 " I I
1797 + 1 4
1798 + 2 5
1799 + 3 6",
1800 );
1801 let chunk_l2 = StreamChunk::from_pretty(
1802 " I I
1803 + 3 8
1804 - 3 8",
1805 );
1806 let chunk_r1 = StreamChunk::from_pretty(
1807 " I I
1808 + 2 7
1809 + 4 8
1810 + 6 9",
1811 );
1812 let chunk_r2 = StreamChunk::from_pretty(
1813 " I I
1814 + 3 10
1815 + 6 11",
1816 );
1817 let (mut tx_l, mut tx_r, mut hash_join) =
1818 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1819
1820 tx_l.push_barrier(test_epoch(1), false);
1822 tx_r.push_barrier(test_epoch(1), false);
1823 hash_join.next_unwrap_ready_barrier()?;
1824
1825 tx_l.push_chunk(chunk_l1);
1827 hash_join.next_unwrap_pending();
1828
1829 tx_l.push_barrier(test_epoch(2), false);
1831 tx_r.push_barrier(test_epoch(2), false);
1832 hash_join.next_unwrap_ready_barrier()?;
1833
1834 tx_l.push_chunk(chunk_l2);
1836 hash_join.next_unwrap_pending();
1837
1838 tx_r.push_chunk(chunk_r1);
1840 let chunk = hash_join.next_unwrap_ready_chunk()?;
1841 assert_eq!(
1842 chunk,
1843 StreamChunk::from_pretty(
1844 " I I I I
1845 + 2 5 2 7"
1846 )
1847 );
1848
1849 tx_r.push_chunk(chunk_r2);
1851 let chunk = hash_join.next_unwrap_ready_chunk()?;
1852 assert_eq!(
1853 chunk,
1854 StreamChunk::from_pretty(
1855 " I I I I
1856 + 3 6 3 10"
1857 )
1858 );
1859
1860 Ok(())
1861 }
1862
1863 #[tokio::test]
1864 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1865 let chunk_l1 = StreamChunk::from_pretty(
1866 " I I
1867 + 1 4
1868 + 2 5
1869 + . 6",
1870 );
1871 let chunk_l2 = StreamChunk::from_pretty(
1872 " I I
1873 + . 8
1874 - . 8",
1875 );
1876 let chunk_r1 = StreamChunk::from_pretty(
1877 " I I
1878 + 2 7
1879 + 4 8
1880 + 6 9",
1881 );
1882 let chunk_r2 = StreamChunk::from_pretty(
1883 " I I
1884 + . 10
1885 + 6 11",
1886 );
1887 let (mut tx_l, mut tx_r, mut hash_join) =
1888 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1889
1890 tx_l.push_barrier(test_epoch(1), false);
1892 tx_r.push_barrier(test_epoch(1), false);
1893 hash_join.next_unwrap_ready_barrier()?;
1894
1895 tx_l.push_chunk(chunk_l1);
1897 hash_join.next_unwrap_pending();
1898
1899 tx_l.push_barrier(test_epoch(2), false);
1901 tx_r.push_barrier(test_epoch(2), false);
1902 hash_join.next_unwrap_ready_barrier()?;
1903
1904 tx_l.push_chunk(chunk_l2);
1906 hash_join.next_unwrap_pending();
1907
1908 tx_r.push_chunk(chunk_r1);
1910 let chunk = hash_join.next_unwrap_ready_chunk()?;
1911 assert_eq!(
1912 chunk,
1913 StreamChunk::from_pretty(
1914 " I I I I
1915 + 2 5 2 7"
1916 )
1917 );
1918
1919 tx_r.push_chunk(chunk_r2);
1921 let chunk = hash_join.next_unwrap_ready_chunk()?;
1922 assert_eq!(
1923 chunk,
1924 StreamChunk::from_pretty(
1925 " I I I I
1926 + . 6 . 10"
1927 )
1928 );
1929
1930 Ok(())
1931 }
1932
1933 #[tokio::test]
1934 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1935 let chunk_l1 = StreamChunk::from_pretty(
1936 " I I
1937 + 1 4
1938 + 2 5
1939 + 3 6",
1940 );
1941 let chunk_l2 = StreamChunk::from_pretty(
1942 " I I
1943 + 3 8
1944 - 3 8",
1945 );
1946 let chunk_r1 = StreamChunk::from_pretty(
1947 " I I
1948 + 2 7
1949 + 4 8
1950 + 6 9",
1951 );
1952 let chunk_r2 = StreamChunk::from_pretty(
1953 " I I
1954 + 3 10
1955 + 6 11",
1956 );
1957 let chunk_l3 = StreamChunk::from_pretty(
1958 " I I
1959 + 6 10",
1960 );
1961 let chunk_r3 = StreamChunk::from_pretty(
1962 " I I
1963 - 6 11",
1964 );
1965 let chunk_r4 = StreamChunk::from_pretty(
1966 " I I
1967 - 6 9",
1968 );
1969 let (mut tx_l, mut tx_r, mut hash_join) =
1970 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1971
1972 tx_l.push_barrier(test_epoch(1), false);
1974 tx_r.push_barrier(test_epoch(1), false);
1975 hash_join.next_unwrap_ready_barrier()?;
1976
1977 tx_l.push_chunk(chunk_l1);
1979 hash_join.next_unwrap_pending();
1980
1981 tx_l.push_barrier(test_epoch(2), false);
1983 tx_r.push_barrier(test_epoch(2), false);
1984 hash_join.next_unwrap_ready_barrier()?;
1985
1986 tx_l.push_chunk(chunk_l2);
1988 hash_join.next_unwrap_pending();
1989
1990 tx_r.push_chunk(chunk_r1);
1992 let chunk = hash_join.next_unwrap_ready_chunk()?;
1993 assert_eq!(
1994 chunk,
1995 StreamChunk::from_pretty(
1996 " I I
1997 + 2 5"
1998 )
1999 );
2000
2001 tx_r.push_chunk(chunk_r2);
2003 let chunk = hash_join.next_unwrap_ready_chunk()?;
2004 assert_eq!(
2005 chunk,
2006 StreamChunk::from_pretty(
2007 " I I
2008 + 3 6"
2009 )
2010 );
2011
2012 tx_l.push_chunk(chunk_l3);
2014 let chunk = hash_join.next_unwrap_ready_chunk()?;
2015 assert_eq!(
2016 chunk,
2017 StreamChunk::from_pretty(
2018 " I I
2019 + 6 10"
2020 )
2021 );
2022
2023 tx_r.push_chunk(chunk_r3);
2026 hash_join.next_unwrap_pending();
2027
2028 tx_r.push_chunk(chunk_r4);
2031 let chunk = hash_join.next_unwrap_ready_chunk()?;
2032 assert_eq!(
2033 chunk,
2034 StreamChunk::from_pretty(
2035 " I I
2036 - 6 10"
2037 )
2038 );
2039
2040 Ok(())
2041 }
2042
2043 #[tokio::test]
2044 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
2045 let chunk_l1 = StreamChunk::from_pretty(
2046 " I I
2047 + 1 4
2048 + 2 5
2049 + . 6",
2050 );
2051 let chunk_l2 = StreamChunk::from_pretty(
2052 " I I
2053 + . 8
2054 - . 8",
2055 );
2056 let chunk_r1 = StreamChunk::from_pretty(
2057 " I I
2058 + 2 7
2059 + 4 8
2060 + 6 9",
2061 );
2062 let chunk_r2 = StreamChunk::from_pretty(
2063 " I I
2064 + . 10
2065 + 6 11",
2066 );
2067 let chunk_l3 = StreamChunk::from_pretty(
2068 " I I
2069 + 6 10",
2070 );
2071 let chunk_r3 = StreamChunk::from_pretty(
2072 " I I
2073 - 6 11",
2074 );
2075 let chunk_r4 = StreamChunk::from_pretty(
2076 " I I
2077 - 6 9",
2078 );
2079 let (mut tx_l, mut tx_r, mut hash_join) =
2080 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
2081
2082 tx_l.push_barrier(test_epoch(1), false);
2084 tx_r.push_barrier(test_epoch(1), false);
2085 hash_join.next_unwrap_ready_barrier()?;
2086
2087 tx_l.push_chunk(chunk_l1);
2089 hash_join.next_unwrap_pending();
2090
2091 tx_l.push_barrier(test_epoch(2), false);
2093 tx_r.push_barrier(test_epoch(2), false);
2094 hash_join.next_unwrap_ready_barrier()?;
2095
2096 tx_l.push_chunk(chunk_l2);
2098 hash_join.next_unwrap_pending();
2099
2100 tx_r.push_chunk(chunk_r1);
2102 let chunk = hash_join.next_unwrap_ready_chunk()?;
2103 assert_eq!(
2104 chunk,
2105 StreamChunk::from_pretty(
2106 " I I
2107 + 2 5"
2108 )
2109 );
2110
2111 tx_r.push_chunk(chunk_r2);
2113 let chunk = hash_join.next_unwrap_ready_chunk()?;
2114 assert_eq!(
2115 chunk,
2116 StreamChunk::from_pretty(
2117 " I I
2118 + . 6"
2119 )
2120 );
2121
2122 tx_l.push_chunk(chunk_l3);
2124 let chunk = hash_join.next_unwrap_ready_chunk()?;
2125 assert_eq!(
2126 chunk,
2127 StreamChunk::from_pretty(
2128 " I I
2129 + 6 10"
2130 )
2131 );
2132
2133 tx_r.push_chunk(chunk_r3);
2136 hash_join.next_unwrap_pending();
2137
2138 tx_r.push_chunk(chunk_r4);
2141 let chunk = hash_join.next_unwrap_ready_chunk()?;
2142 assert_eq!(
2143 chunk,
2144 StreamChunk::from_pretty(
2145 " I I
2146 - 6 10"
2147 )
2148 );
2149
2150 Ok(())
2151 }
2152
2153 #[tokio::test]
2154 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2155 let chunk_l1 = StreamChunk::from_pretty(
2156 " I I I
2157 + 1 4 1
2158 + 2 5 2
2159 + 3 6 3",
2160 );
2161 let chunk_l2 = StreamChunk::from_pretty(
2162 " I I I
2163 + 4 9 4
2164 + 5 10 5",
2165 );
2166 let chunk_r1 = StreamChunk::from_pretty(
2167 " I I I
2168 + 2 5 1
2169 + 4 9 2
2170 + 6 9 3",
2171 );
2172 let chunk_r2 = StreamChunk::from_pretty(
2173 " I I I
2174 + 1 4 4
2175 + 3 6 5",
2176 );
2177
2178 let (mut tx_l, mut tx_r, mut hash_join) =
2179 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2180
2181 tx_l.push_barrier(test_epoch(1), false);
2183 tx_r.push_barrier(test_epoch(1), false);
2184 hash_join.next_unwrap_ready_barrier()?;
2185
2186 tx_l.push_chunk(chunk_l1);
2188 hash_join.next_unwrap_pending();
2189
2190 tx_l.push_barrier(test_epoch(2), false);
2192 tx_r.push_barrier(test_epoch(2), false);
2193 hash_join.next_unwrap_ready_barrier()?;
2194
2195 tx_l.push_chunk(chunk_l2);
2197 hash_join.next_unwrap_pending();
2198
2199 tx_r.push_chunk(chunk_r1);
2201 let chunk = hash_join.next_unwrap_ready_chunk()?;
2202 assert_eq!(
2203 chunk,
2204 StreamChunk::from_pretty(
2205 " I I I I I I
2206 + 2 5 2 2 5 1
2207 + 4 9 4 4 9 2"
2208 )
2209 );
2210
2211 tx_r.push_chunk(chunk_r2);
2213 let chunk = hash_join.next_unwrap_ready_chunk()?;
2214 assert_eq!(
2215 chunk,
2216 StreamChunk::from_pretty(
2217 " I I I I I I
2218 + 1 4 1 1 4 4
2219 + 3 6 3 3 6 5"
2220 )
2221 );
2222
2223 Ok(())
2224 }
2225
2226 #[tokio::test]
2227 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2228 let chunk_l1 = StreamChunk::from_pretty(
2229 " I I I
2230 + 1 4 1
2231 + 2 5 2
2232 + 3 6 3",
2233 );
2234 let chunk_l2 = StreamChunk::from_pretty(
2235 " I I I
2236 + 4 9 4
2237 + 5 10 5",
2238 );
2239 let chunk_r1 = StreamChunk::from_pretty(
2240 " I I I
2241 + 2 5 1
2242 + 4 9 2
2243 + 6 9 3",
2244 );
2245 let chunk_r2 = StreamChunk::from_pretty(
2246 " I I I
2247 + 1 4 4
2248 + 3 6 5",
2249 );
2250
2251 let (mut tx_l, mut tx_r, mut hash_join) =
2252 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2253
2254 tx_l.push_barrier(test_epoch(1), false);
2256 tx_r.push_barrier(test_epoch(1), false);
2257 hash_join.next_unwrap_ready_barrier()?;
2258
2259 tx_l.push_chunk(chunk_l1);
2261 hash_join.next_unwrap_pending();
2262
2263 tx_l.push_barrier(test_epoch(2), false);
2265 tx_r.push_barrier(test_epoch(2), false);
2266 hash_join.next_unwrap_ready_barrier()?;
2267
2268 tx_l.push_chunk(chunk_l2);
2270 hash_join.next_unwrap_pending();
2271
2272 tx_r.push_chunk(chunk_r1);
2274 let chunk = hash_join.next_unwrap_ready_chunk()?;
2275 assert_eq!(
2276 chunk,
2277 StreamChunk::from_pretty(
2278 " I I I
2279 + 2 5 2
2280 + 4 9 4"
2281 )
2282 );
2283
2284 tx_r.push_chunk(chunk_r2);
2286 let chunk = hash_join.next_unwrap_ready_chunk()?;
2287 assert_eq!(
2288 chunk,
2289 StreamChunk::from_pretty(
2290 " I I I
2291 + 1 4 1
2292 + 3 6 3"
2293 )
2294 );
2295
2296 Ok(())
2297 }
2298
2299 #[tokio::test]
2300 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2301 let chunk_l1 = StreamChunk::from_pretty(
2302 " I I I
2303 + 1 4 1
2304 + 2 5 2
2305 + 3 6 3",
2306 );
2307 let chunk_l2 = StreamChunk::from_pretty(
2308 " I I I
2309 + 4 9 4
2310 + 5 10 5",
2311 );
2312 let chunk_r1 = StreamChunk::from_pretty(
2313 " I I I
2314 + 2 5 1
2315 + 4 9 2
2316 + 6 9 3",
2317 );
2318 let chunk_r2 = StreamChunk::from_pretty(
2319 " I I I
2320 + 1 4 4
2321 + 3 6 5",
2322 );
2323
2324 let (mut tx_l, mut tx_r, mut hash_join) =
2325 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2326
2327 tx_l.push_barrier(test_epoch(1), false);
2329 tx_r.push_barrier(test_epoch(1), false);
2330 hash_join.next_unwrap_ready_barrier()?;
2331
2332 tx_l.push_chunk(chunk_l1);
2334 hash_join.next_unwrap_pending();
2335
2336 tx_l.push_barrier(test_epoch(2), false);
2338 tx_r.push_barrier(test_epoch(2), false);
2339 hash_join.next_unwrap_ready_barrier()?;
2340
2341 tx_l.push_chunk(chunk_l2);
2343 hash_join.next_unwrap_pending();
2344
2345 tx_r.push_chunk(chunk_r1);
2347 let chunk = hash_join.next_unwrap_ready_chunk()?;
2348 assert_eq!(
2349 chunk,
2350 StreamChunk::from_pretty(
2351 " I I I
2352 + 2 5 1
2353 + 4 9 2"
2354 )
2355 );
2356
2357 tx_r.push_chunk(chunk_r2);
2359 let chunk = hash_join.next_unwrap_ready_chunk()?;
2360 assert_eq!(
2361 chunk,
2362 StreamChunk::from_pretty(
2363 " I I I
2364 + 1 4 4
2365 + 3 6 5"
2366 )
2367 );
2368
2369 Ok(())
2370 }
2371
2372 #[tokio::test]
2373 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2374 let chunk_r1 = StreamChunk::from_pretty(
2375 " I I
2376 + 1 4
2377 + 2 5
2378 + 3 6",
2379 );
2380 let chunk_r2 = StreamChunk::from_pretty(
2381 " I I
2382 + 3 8
2383 - 3 8",
2384 );
2385 let chunk_l1 = StreamChunk::from_pretty(
2386 " I I
2387 + 2 7
2388 + 4 8
2389 + 6 9",
2390 );
2391 let chunk_l2 = StreamChunk::from_pretty(
2392 " I I
2393 + 3 10
2394 + 6 11",
2395 );
2396 let chunk_r3 = StreamChunk::from_pretty(
2397 " I I
2398 + 6 10",
2399 );
2400 let chunk_l3 = StreamChunk::from_pretty(
2401 " I I
2402 - 6 11",
2403 );
2404 let chunk_l4 = StreamChunk::from_pretty(
2405 " I I
2406 - 6 9",
2407 );
2408 let (mut tx_l, mut tx_r, mut hash_join) =
2409 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2410
2411 tx_l.push_barrier(test_epoch(1), false);
2413 tx_r.push_barrier(test_epoch(1), false);
2414 hash_join.next_unwrap_ready_barrier()?;
2415
2416 tx_r.push_chunk(chunk_r1);
2418 hash_join.next_unwrap_pending();
2419
2420 tx_l.push_barrier(test_epoch(2), false);
2422 tx_r.push_barrier(test_epoch(2), false);
2423 hash_join.next_unwrap_ready_barrier()?;
2424
2425 tx_r.push_chunk(chunk_r2);
2427 hash_join.next_unwrap_pending();
2428
2429 tx_l.push_chunk(chunk_l1);
2431 let chunk = hash_join.next_unwrap_ready_chunk()?;
2432 assert_eq!(
2433 chunk,
2434 StreamChunk::from_pretty(
2435 " I I
2436 + 2 5"
2437 )
2438 );
2439
2440 tx_l.push_chunk(chunk_l2);
2442 let chunk = hash_join.next_unwrap_ready_chunk()?;
2443 assert_eq!(
2444 chunk,
2445 StreamChunk::from_pretty(
2446 " I I
2447 + 3 6"
2448 )
2449 );
2450
2451 tx_r.push_chunk(chunk_r3);
2453 let chunk = hash_join.next_unwrap_ready_chunk()?;
2454 assert_eq!(
2455 chunk,
2456 StreamChunk::from_pretty(
2457 " I I
2458 + 6 10"
2459 )
2460 );
2461
2462 tx_l.push_chunk(chunk_l3);
2465 hash_join.next_unwrap_pending();
2466
2467 tx_l.push_chunk(chunk_l4);
2470 let chunk = hash_join.next_unwrap_ready_chunk()?;
2471 assert_eq!(
2472 chunk,
2473 StreamChunk::from_pretty(
2474 " I I
2475 - 6 10"
2476 )
2477 );
2478
2479 Ok(())
2480 }
2481
2482 #[tokio::test]
2483 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2484 let chunk_l1 = StreamChunk::from_pretty(
2485 " I I
2486 + 1 4
2487 + 2 5
2488 + 3 6",
2489 );
2490 let chunk_l2 = StreamChunk::from_pretty(
2491 " I I
2492 + 3 8
2493 - 3 8",
2494 );
2495 let chunk_r1 = StreamChunk::from_pretty(
2496 " I I
2497 + 2 7
2498 + 4 8
2499 + 6 9",
2500 );
2501 let chunk_r2 = StreamChunk::from_pretty(
2502 " I I
2503 + 3 10
2504 + 6 11
2505 + 1 2
2506 + 1 3",
2507 );
2508 let chunk_l3 = StreamChunk::from_pretty(
2509 " I I
2510 + 9 10",
2511 );
2512 let chunk_r3 = StreamChunk::from_pretty(
2513 " I I
2514 - 1 2",
2515 );
2516 let chunk_r4 = StreamChunk::from_pretty(
2517 " I I
2518 - 1 3",
2519 );
2520 let (mut tx_l, mut tx_r, mut hash_join) =
2521 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2522
2523 tx_l.push_barrier(test_epoch(1), false);
2525 tx_r.push_barrier(test_epoch(1), false);
2526 hash_join.next_unwrap_ready_barrier()?;
2527
2528 tx_l.push_chunk(chunk_l1);
2530 let chunk = hash_join.next_unwrap_ready_chunk()?;
2531 assert_eq!(
2532 chunk,
2533 StreamChunk::from_pretty(
2534 " I I
2535 + 1 4
2536 + 2 5
2537 + 3 6",
2538 )
2539 );
2540
2541 tx_l.push_barrier(test_epoch(2), false);
2543 tx_r.push_barrier(test_epoch(2), false);
2544 hash_join.next_unwrap_ready_barrier()?;
2545
2546 tx_l.push_chunk(chunk_l2);
2548 let chunk = hash_join.next_unwrap_ready_chunk()?;
2549 assert_eq!(
2550 chunk,
2551 StreamChunk::from_pretty(
2552 " I I
2553 + 3 8 D
2554 - 3 8 D",
2555 )
2556 );
2557
2558 tx_r.push_chunk(chunk_r1);
2560 let chunk = hash_join.next_unwrap_ready_chunk()?;
2561 assert_eq!(
2562 chunk,
2563 StreamChunk::from_pretty(
2564 " I I
2565 - 2 5"
2566 )
2567 );
2568
2569 tx_r.push_chunk(chunk_r2);
2571 let chunk = hash_join.next_unwrap_ready_chunk()?;
2572 assert_eq!(
2573 chunk,
2574 StreamChunk::from_pretty(
2575 " I I
2576 - 3 6
2577 - 1 4"
2578 )
2579 );
2580
2581 tx_l.push_chunk(chunk_l3);
2583 let chunk = hash_join.next_unwrap_ready_chunk()?;
2584 assert_eq!(
2585 chunk,
2586 StreamChunk::from_pretty(
2587 " I I
2588 + 9 10"
2589 )
2590 );
2591
2592 tx_r.push_chunk(chunk_r3);
2595 hash_join.next_unwrap_pending();
2596
2597 tx_r.push_chunk(chunk_r4);
2600 let chunk = hash_join.next_unwrap_ready_chunk()?;
2601 assert_eq!(
2602 chunk,
2603 StreamChunk::from_pretty(
2604 " I I
2605 + 1 4"
2606 )
2607 );
2608
2609 Ok(())
2610 }
2611
2612 #[tokio::test]
2613 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2614 let chunk_r1 = StreamChunk::from_pretty(
2615 " I I
2616 + 1 4
2617 + 2 5
2618 + 3 6",
2619 );
2620 let chunk_r2 = StreamChunk::from_pretty(
2621 " I I
2622 + 3 8
2623 - 3 8",
2624 );
2625 let chunk_l1 = StreamChunk::from_pretty(
2626 " I I
2627 + 2 7
2628 + 4 8
2629 + 6 9",
2630 );
2631 let chunk_l2 = StreamChunk::from_pretty(
2632 " I I
2633 + 3 10
2634 + 6 11
2635 + 1 2
2636 + 1 3",
2637 );
2638 let chunk_r3 = StreamChunk::from_pretty(
2639 " I I
2640 + 9 10",
2641 );
2642 let chunk_l3 = StreamChunk::from_pretty(
2643 " I I
2644 - 1 2",
2645 );
2646 let chunk_l4 = StreamChunk::from_pretty(
2647 " I I
2648 - 1 3",
2649 );
2650 let (mut tx_r, mut tx_l, mut hash_join) =
2651 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2652
2653 tx_r.push_barrier(test_epoch(1), false);
2655 tx_l.push_barrier(test_epoch(1), false);
2656 hash_join.next_unwrap_ready_barrier()?;
2657
2658 tx_r.push_chunk(chunk_r1);
2660 let chunk = hash_join.next_unwrap_ready_chunk()?;
2661 assert_eq!(
2662 chunk,
2663 StreamChunk::from_pretty(
2664 " I I
2665 + 1 4
2666 + 2 5
2667 + 3 6",
2668 )
2669 );
2670
2671 tx_r.push_barrier(test_epoch(2), false);
2673 tx_l.push_barrier(test_epoch(2), false);
2674 hash_join.next_unwrap_ready_barrier()?;
2675
2676 tx_r.push_chunk(chunk_r2);
2678 let chunk = hash_join.next_unwrap_ready_chunk()?;
2679 assert_eq!(
2680 chunk,
2681 StreamChunk::from_pretty(
2682 " I I
2683 + 3 8 D
2684 - 3 8 D",
2685 )
2686 );
2687
2688 tx_l.push_chunk(chunk_l1);
2690 let chunk = hash_join.next_unwrap_ready_chunk()?;
2691 assert_eq!(
2692 chunk,
2693 StreamChunk::from_pretty(
2694 " I I
2695 - 2 5"
2696 )
2697 );
2698
2699 tx_l.push_chunk(chunk_l2);
2701 let chunk = hash_join.next_unwrap_ready_chunk()?;
2702 assert_eq!(
2703 chunk,
2704 StreamChunk::from_pretty(
2705 " I I
2706 - 3 6
2707 - 1 4"
2708 )
2709 );
2710
2711 tx_r.push_chunk(chunk_r3);
2713 let chunk = hash_join.next_unwrap_ready_chunk()?;
2714 assert_eq!(
2715 chunk,
2716 StreamChunk::from_pretty(
2717 " I I
2718 + 9 10"
2719 )
2720 );
2721
2722 tx_l.push_chunk(chunk_l3);
2725 hash_join.next_unwrap_pending();
2726
2727 tx_l.push_chunk(chunk_l4);
2730 let chunk = hash_join.next_unwrap_ready_chunk()?;
2731 assert_eq!(
2732 chunk,
2733 StreamChunk::from_pretty(
2734 " I I
2735 + 1 4"
2736 )
2737 );
2738
2739 Ok(())
2740 }
2741
2742 #[tokio::test]
2743 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2744 let chunk_l1 = StreamChunk::from_pretty(
2745 " I I
2746 + 1 4
2747 + 2 5
2748 + 3 6",
2749 );
2750 let chunk_l2 = StreamChunk::from_pretty(
2751 " I I
2752 + 6 8
2753 + 3 8",
2754 );
2755 let chunk_r1 = StreamChunk::from_pretty(
2756 " I I
2757 + 2 7
2758 + 4 8
2759 + 6 9",
2760 );
2761 let chunk_r2 = StreamChunk::from_pretty(
2762 " I I
2763 + 3 10
2764 + 6 11",
2765 );
2766 let (mut tx_l, mut tx_r, mut hash_join) =
2767 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2768
2769 tx_l.push_barrier(test_epoch(1), false);
2771 tx_r.push_barrier(test_epoch(1), false);
2772 hash_join.next_unwrap_ready_barrier()?;
2773
2774 tx_l.push_chunk(chunk_l1);
2776 hash_join.next_unwrap_pending();
2777
2778 tx_l.push_barrier(test_epoch(2), false);
2780
2781 tx_l.push_chunk(chunk_l2);
2783
2784 tx_r.push_chunk(chunk_r1);
2786
2787 let chunk = hash_join.next_unwrap_ready_chunk()?;
2789 assert_eq!(
2790 chunk,
2791 StreamChunk::from_pretty(
2792 " I I I I
2793 + 2 5 2 7"
2794 )
2795 );
2796
2797 tx_r.push_barrier(test_epoch(2), false);
2799
2800 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2802 assert!(matches!(
2803 hash_join.next_unwrap_ready_barrier()?,
2804 Barrier {
2805 epoch,
2806 mutation: None,
2807 ..
2808 } if epoch == expected_epoch
2809 ));
2810
2811 let chunk = hash_join.next_unwrap_ready_chunk()?;
2813 assert_eq!(
2814 chunk,
2815 StreamChunk::from_pretty(
2816 " I I I I
2817 + 6 8 6 9"
2818 )
2819 );
2820
2821 tx_r.push_chunk(chunk_r2);
2823 let chunk = hash_join.next_unwrap_ready_chunk()?;
2824 assert_eq!(
2825 chunk,
2826 StreamChunk::from_pretty(
2827 " I I I I
2828 + 3 6 3 10
2829 + 3 8 3 10
2830 + 6 8 6 11"
2831 )
2832 );
2833
2834 Ok(())
2835 }
2836
2837 #[tokio::test]
2838 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2839 let chunk_l1 = StreamChunk::from_pretty(
2840 " I I
2841 + 1 4
2842 + 2 .
2843 + 3 .",
2844 );
2845 let chunk_l2 = StreamChunk::from_pretty(
2846 " I I
2847 + 6 .
2848 + 3 8",
2849 );
2850 let chunk_r1 = StreamChunk::from_pretty(
2851 " I I
2852 + 2 7
2853 + 4 8
2854 + 6 9",
2855 );
2856 let chunk_r2 = StreamChunk::from_pretty(
2857 " I I
2858 + 3 10
2859 + 6 11",
2860 );
2861 let (mut tx_l, mut tx_r, mut hash_join) =
2862 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2863
2864 tx_l.push_barrier(test_epoch(1), false);
2866 tx_r.push_barrier(test_epoch(1), false);
2867 hash_join.next_unwrap_ready_barrier()?;
2868
2869 tx_l.push_chunk(chunk_l1);
2871 hash_join.next_unwrap_pending();
2872
2873 tx_l.push_barrier(test_epoch(2), false);
2875
2876 tx_l.push_chunk(chunk_l2);
2878
2879 tx_r.push_chunk(chunk_r1);
2881
2882 let chunk = hash_join.next_unwrap_ready_chunk()?;
2884 assert_eq!(
2885 chunk,
2886 StreamChunk::from_pretty(
2887 " I I I I
2888 + 2 . 2 7"
2889 )
2890 );
2891
2892 tx_r.push_barrier(test_epoch(2), false);
2894
2895 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2897 assert!(matches!(
2898 hash_join.next_unwrap_ready_barrier()?,
2899 Barrier {
2900 epoch,
2901 mutation: None,
2902 ..
2903 } if epoch == expected_epoch
2904 ));
2905
2906 let chunk = hash_join.next_unwrap_ready_chunk()?;
2908 assert_eq!(
2909 chunk,
2910 StreamChunk::from_pretty(
2911 " I I I I
2912 + 6 . 6 9"
2913 )
2914 );
2915
2916 tx_r.push_chunk(chunk_r2);
2918 let chunk = hash_join.next_unwrap_ready_chunk()?;
2919 assert_eq!(
2920 chunk,
2921 StreamChunk::from_pretty(
2922 " I I I I
2923 + 3 8 3 10
2924 + 3 . 3 10
2925 + 6 . 6 11"
2926 )
2927 );
2928
2929 Ok(())
2930 }
2931
2932 #[tokio::test]
2933 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2934 let chunk_l1 = StreamChunk::from_pretty(
2935 " I I
2936 + 1 4
2937 + 2 5
2938 + 3 6",
2939 );
2940 let chunk_l2 = StreamChunk::from_pretty(
2941 " I I
2942 + 3 8
2943 - 3 8",
2944 );
2945 let chunk_r1 = StreamChunk::from_pretty(
2946 " I I
2947 + 2 7
2948 + 4 8
2949 + 6 9",
2950 );
2951 let chunk_r2 = StreamChunk::from_pretty(
2952 " I I
2953 + 3 10
2954 + 6 11",
2955 );
2956 let (mut tx_l, mut tx_r, mut hash_join) =
2957 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2958
2959 tx_l.push_barrier(test_epoch(1), false);
2961 tx_r.push_barrier(test_epoch(1), false);
2962 hash_join.next_unwrap_ready_barrier()?;
2963
2964 tx_l.push_chunk(chunk_l1);
2966 let chunk = hash_join.next_unwrap_ready_chunk()?;
2967 assert_eq!(
2968 chunk,
2969 StreamChunk::from_pretty(
2970 " I I I I
2971 + 1 4 . .
2972 + 2 5 . .
2973 + 3 6 . ."
2974 )
2975 );
2976
2977 tx_l.push_chunk(chunk_l2);
2979 let chunk = hash_join.next_unwrap_ready_chunk()?;
2980 assert_eq!(
2981 chunk,
2982 StreamChunk::from_pretty(
2983 " I I I I
2984 + 3 8 . . D
2985 - 3 8 . . D"
2986 )
2987 );
2988
2989 tx_r.push_chunk(chunk_r1);
2991 let chunk = hash_join.next_unwrap_ready_chunk()?;
2992 assert_eq!(
2993 chunk,
2994 StreamChunk::from_pretty(
2995 " I I I I
2996 - 2 5 . .
2997 + 2 5 2 7"
2998 )
2999 );
3000
3001 tx_r.push_chunk(chunk_r2);
3003 let chunk = hash_join.next_unwrap_ready_chunk()?;
3004 assert_eq!(
3005 chunk,
3006 StreamChunk::from_pretty(
3007 " I I I I
3008 - 3 6 . .
3009 + 3 6 3 10"
3010 )
3011 );
3012
3013 Ok(())
3014 }
3015
3016 #[tokio::test]
3017 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
3018 let chunk_l1 = StreamChunk::from_pretty(
3019 " I I
3020 + 1 4
3021 + 2 5
3022 + . 6",
3023 );
3024 let chunk_l2 = StreamChunk::from_pretty(
3025 " I I
3026 + . 8
3027 - . 8",
3028 );
3029 let chunk_r1 = StreamChunk::from_pretty(
3030 " I I
3031 + 2 7
3032 + 4 8
3033 + 6 9",
3034 );
3035 let chunk_r2 = StreamChunk::from_pretty(
3036 " I I
3037 + . 10
3038 + 6 11",
3039 );
3040 let (mut tx_l, mut tx_r, mut hash_join) =
3041 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
3042
3043 tx_l.push_barrier(test_epoch(1), false);
3045 tx_r.push_barrier(test_epoch(1), false);
3046 hash_join.next_unwrap_ready_barrier()?;
3047
3048 tx_l.push_chunk(chunk_l1);
3050 let chunk = hash_join.next_unwrap_ready_chunk()?;
3051 assert_eq!(
3052 chunk,
3053 StreamChunk::from_pretty(
3054 " I I I I
3055 + 1 4 . .
3056 + 2 5 . .
3057 + . 6 . ."
3058 )
3059 );
3060
3061 tx_l.push_chunk(chunk_l2);
3063 let chunk = hash_join.next_unwrap_ready_chunk()?;
3064 assert_eq!(
3065 chunk,
3066 StreamChunk::from_pretty(
3067 " I I I I
3068 + . 8 . . D
3069 - . 8 . . D"
3070 )
3071 );
3072
3073 tx_r.push_chunk(chunk_r1);
3075 let chunk = hash_join.next_unwrap_ready_chunk()?;
3076 assert_eq!(
3077 chunk,
3078 StreamChunk::from_pretty(
3079 " I I I I
3080 - 2 5 . .
3081 + 2 5 2 7"
3082 )
3083 );
3084
3085 tx_r.push_chunk(chunk_r2);
3087 let chunk = hash_join.next_unwrap_ready_chunk()?;
3088 assert_eq!(
3089 chunk,
3090 StreamChunk::from_pretty(
3091 " I I I I
3092 - . 6 . .
3093 + . 6 . 10"
3094 )
3095 );
3096
3097 Ok(())
3098 }
3099
3100 #[tokio::test]
3101 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
3102 let chunk_l1 = StreamChunk::from_pretty(
3103 " I I
3104 + 1 4
3105 + 2 5
3106 + 3 6",
3107 );
3108 let chunk_l2 = StreamChunk::from_pretty(
3109 " I I
3110 + 3 8
3111 - 3 8",
3112 );
3113 let chunk_r1 = StreamChunk::from_pretty(
3114 " I I
3115 + 2 7
3116 + 4 8
3117 + 6 9",
3118 );
3119 let chunk_r2 = StreamChunk::from_pretty(
3120 " I I
3121 + 5 10
3122 - 5 10",
3123 );
3124 let (mut tx_l, mut tx_r, mut hash_join) =
3125 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
3126
3127 tx_l.push_barrier(test_epoch(1), false);
3129 tx_r.push_barrier(test_epoch(1), false);
3130 hash_join.next_unwrap_ready_barrier()?;
3131
3132 tx_l.push_chunk(chunk_l1);
3134 hash_join.next_unwrap_pending();
3135
3136 tx_l.push_chunk(chunk_l2);
3138 hash_join.next_unwrap_pending();
3139
3140 tx_r.push_chunk(chunk_r1);
3142 let chunk = hash_join.next_unwrap_ready_chunk()?;
3143 assert_eq!(
3144 chunk,
3145 StreamChunk::from_pretty(
3146 " I I I I
3147 + 2 5 2 7
3148 + . . 4 8
3149 + . . 6 9"
3150 )
3151 );
3152
3153 tx_r.push_chunk(chunk_r2);
3155 let chunk = hash_join.next_unwrap_ready_chunk()?;
3156 assert_eq!(
3157 chunk,
3158 StreamChunk::from_pretty(
3159 " I I I I
3160 + . . 5 10 D
3161 - . . 5 10 D"
3162 )
3163 );
3164
3165 Ok(())
3166 }
3167
3168 #[tokio::test]
3169 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3170 let chunk_l1 = StreamChunk::from_pretty(
3171 " I I I
3172 + 1 4 1
3173 + 2 5 2
3174 + 3 6 3",
3175 );
3176 let chunk_l2 = StreamChunk::from_pretty(
3177 " I I I
3178 + 4 9 4
3179 + 5 10 5",
3180 );
3181 let chunk_r1 = StreamChunk::from_pretty(
3182 " I I I
3183 + 2 5 1
3184 + 4 9 2
3185 + 6 9 3",
3186 );
3187 let chunk_r2 = StreamChunk::from_pretty(
3188 " I I I
3189 + 1 4 4
3190 + 3 6 5",
3191 );
3192
3193 let (mut tx_l, mut tx_r, mut hash_join) =
3194 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3195
3196 tx_l.push_barrier(test_epoch(1), false);
3198 tx_r.push_barrier(test_epoch(1), false);
3199 hash_join.next_unwrap_ready_barrier()?;
3200
3201 tx_l.push_chunk(chunk_l1);
3203 let chunk = hash_join.next_unwrap_ready_chunk()?;
3204 assert_eq!(
3205 chunk,
3206 StreamChunk::from_pretty(
3207 " I I I I I I
3208 + 1 4 1 . . .
3209 + 2 5 2 . . .
3210 + 3 6 3 . . ."
3211 )
3212 );
3213
3214 tx_l.push_chunk(chunk_l2);
3216 let chunk = hash_join.next_unwrap_ready_chunk()?;
3217 assert_eq!(
3218 chunk,
3219 StreamChunk::from_pretty(
3220 " I I I I I I
3221 + 4 9 4 . . .
3222 + 5 10 5 . . ."
3223 )
3224 );
3225
3226 tx_r.push_chunk(chunk_r1);
3228 let chunk = hash_join.next_unwrap_ready_chunk()?;
3229 assert_eq!(
3230 chunk,
3231 StreamChunk::from_pretty(
3232 " I I I I I I
3233 - 2 5 2 . . .
3234 + 2 5 2 2 5 1
3235 - 4 9 4 . . .
3236 + 4 9 4 4 9 2"
3237 )
3238 );
3239
3240 tx_r.push_chunk(chunk_r2);
3242 let chunk = hash_join.next_unwrap_ready_chunk()?;
3243 assert_eq!(
3244 chunk,
3245 StreamChunk::from_pretty(
3246 " I I I I I I
3247 - 1 4 1 . . .
3248 + 1 4 1 1 4 4
3249 - 3 6 3 . . .
3250 + 3 6 3 3 6 5"
3251 )
3252 );
3253
3254 Ok(())
3255 }
3256
3257 #[tokio::test]
3258 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3259 let chunk_l1 = StreamChunk::from_pretty(
3260 " I I I
3261 + 1 4 1
3262 + 2 5 2
3263 + 3 6 3",
3264 );
3265 let chunk_l2 = StreamChunk::from_pretty(
3266 " I I I
3267 + 4 9 4
3268 + 5 10 5",
3269 );
3270 let chunk_r1 = StreamChunk::from_pretty(
3271 " I I I
3272 + 2 5 1
3273 + 4 9 2
3274 + 6 9 3",
3275 );
3276 let chunk_r2 = StreamChunk::from_pretty(
3277 " I I I
3278 + 1 4 4
3279 + 3 6 5
3280 + 7 7 6",
3281 );
3282
3283 let (mut tx_l, mut tx_r, mut hash_join) =
3284 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3285
3286 tx_l.push_barrier(test_epoch(1), false);
3288 tx_r.push_barrier(test_epoch(1), false);
3289 hash_join.next_unwrap_ready_barrier()?;
3290
3291 tx_l.push_chunk(chunk_l1);
3293 hash_join.next_unwrap_pending();
3294
3295 tx_l.push_chunk(chunk_l2);
3297 hash_join.next_unwrap_pending();
3298
3299 tx_r.push_chunk(chunk_r1);
3301 let chunk = hash_join.next_unwrap_ready_chunk()?;
3302 assert_eq!(
3303 chunk,
3304 StreamChunk::from_pretty(
3305 " I I I I I I
3306 + 2 5 2 2 5 1
3307 + 4 9 4 4 9 2
3308 + . . . 6 9 3"
3309 )
3310 );
3311
3312 tx_r.push_chunk(chunk_r2);
3314 let chunk = hash_join.next_unwrap_ready_chunk()?;
3315 assert_eq!(
3316 chunk,
3317 StreamChunk::from_pretty(
3318 " I I I I I I
3319 + 1 4 1 1 4 4
3320 + 3 6 3 3 6 5
3321 + . . . 7 7 6"
3322 )
3323 );
3324
3325 Ok(())
3326 }
3327
3328 #[tokio::test]
3329 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3330 let chunk_l1 = StreamChunk::from_pretty(
3331 " I I
3332 + 1 4
3333 + 2 5
3334 + 3 6",
3335 );
3336 let chunk_l2 = StreamChunk::from_pretty(
3337 " I I
3338 + 3 8
3339 - 3 8",
3340 );
3341 let chunk_r1 = StreamChunk::from_pretty(
3342 " I I
3343 + 2 7
3344 + 4 8
3345 + 6 9",
3346 );
3347 let chunk_r2 = StreamChunk::from_pretty(
3348 " I I
3349 + 5 10
3350 - 5 10",
3351 );
3352 let (mut tx_l, mut tx_r, mut hash_join) =
3353 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3354
3355 tx_l.push_barrier(test_epoch(1), false);
3357 tx_r.push_barrier(test_epoch(1), false);
3358 hash_join.next_unwrap_ready_barrier()?;
3359
3360 tx_l.push_chunk(chunk_l1);
3362 let chunk = hash_join.next_unwrap_ready_chunk()?;
3363 assert_eq!(
3364 chunk,
3365 StreamChunk::from_pretty(
3366 " I I I I
3367 + 1 4 . .
3368 + 2 5 . .
3369 + 3 6 . ."
3370 )
3371 );
3372
3373 tx_l.push_chunk(chunk_l2);
3375 let chunk = hash_join.next_unwrap_ready_chunk()?;
3376 assert_eq!(
3377 chunk,
3378 StreamChunk::from_pretty(
3379 " I I I I
3380 + 3 8 . . D
3381 - 3 8 . . D"
3382 )
3383 );
3384
3385 tx_r.push_chunk(chunk_r1);
3387 let chunk = hash_join.next_unwrap_ready_chunk()?;
3388 assert_eq!(
3389 chunk,
3390 StreamChunk::from_pretty(
3391 " I I I I
3392 - 2 5 . .
3393 + 2 5 2 7
3394 + . . 4 8
3395 + . . 6 9"
3396 )
3397 );
3398
3399 tx_r.push_chunk(chunk_r2);
3401 let chunk = hash_join.next_unwrap_ready_chunk()?;
3402 assert_eq!(
3403 chunk,
3404 StreamChunk::from_pretty(
3405 " I I I I
3406 + . . 5 10 D
3407 - . . 5 10 D"
3408 )
3409 );
3410
3411 Ok(())
3412 }
3413
3414 #[tokio::test]
3415 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3416 let (mut tx_l, mut tx_r, mut hash_join) =
3417 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3418
3419 tx_l.push_barrier(test_epoch(1), false);
3421 tx_r.push_barrier(test_epoch(1), false);
3422 hash_join.next_unwrap_ready_barrier()?;
3423
3424 tx_l.push_chunk(StreamChunk::from_pretty(
3425 " I I
3426 + 1 1
3427 ",
3428 ));
3429 let chunk = hash_join.next_unwrap_ready_chunk()?;
3430 assert_eq!(
3431 chunk,
3432 StreamChunk::from_pretty(
3433 " I I I I
3434 + 1 1 . ."
3435 )
3436 );
3437
3438 tx_r.push_chunk(StreamChunk::from_pretty(
3439 " I I
3440 + 1 1
3441 ",
3442 ));
3443 let chunk = hash_join.next_unwrap_ready_chunk()?;
3444
3445 assert_eq!(
3446 chunk,
3447 StreamChunk::from_pretty(
3448 " I I I I
3449 - 1 1 . .
3450 + 1 1 1 1"
3451 )
3452 );
3453
3454 tx_l.push_chunk(StreamChunk::from_pretty(
3455 " I I
3456 - 1 1
3457 + 1 2
3458 ",
3459 ));
3460 let chunk = hash_join.next_unwrap_ready_chunk()?;
3461 let chunk = chunk.compact_vis();
3462 assert_eq!(
3463 chunk,
3464 StreamChunk::from_pretty(
3465 " I I I I
3466 - 1 1 1 1
3467 + 1 2 1 1
3468 "
3469 )
3470 );
3471
3472 Ok(())
3473 }
3474
3475 #[tokio::test]
3476 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3477 {
3478 let chunk_l1 = StreamChunk::from_pretty(
3479 " I I
3480 + 1 4
3481 + 2 5
3482 + 3 6
3483 + 3 7",
3484 );
3485 let chunk_l2 = StreamChunk::from_pretty(
3486 " I I
3487 + 3 8
3488 - 3 8
3489 - 1 4", );
3491 let chunk_r1 = StreamChunk::from_pretty(
3492 " I I
3493 + 2 6
3494 + 4 8
3495 + 3 4",
3496 );
3497 let chunk_r2 = StreamChunk::from_pretty(
3498 " I I
3499 + 5 10
3500 - 5 10
3501 + 1 2",
3502 );
3503 let (mut tx_l, mut tx_r, mut hash_join) =
3504 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3505
3506 tx_l.push_barrier(test_epoch(1), false);
3508 tx_r.push_barrier(test_epoch(1), false);
3509 hash_join.next_unwrap_ready_barrier()?;
3510
3511 tx_l.push_chunk(chunk_l1);
3513 let chunk = hash_join.next_unwrap_ready_chunk()?;
3514 assert_eq!(
3515 chunk,
3516 StreamChunk::from_pretty(
3517 " I I I I
3518 + 1 4 . .
3519 + 2 5 . .
3520 + 3 6 . .
3521 + 3 7 . ."
3522 )
3523 );
3524
3525 tx_l.push_chunk(chunk_l2);
3527 let chunk = hash_join.next_unwrap_ready_chunk()?;
3528 assert_eq!(
3529 chunk,
3530 StreamChunk::from_pretty(
3531 " I I I I
3532 + 3 8 . . D
3533 - 3 8 . . D
3534 - 1 4 . ."
3535 )
3536 );
3537
3538 tx_r.push_chunk(chunk_r1);
3540 let chunk = hash_join.next_unwrap_ready_chunk()?;
3541 assert_eq!(
3542 chunk,
3543 StreamChunk::from_pretty(
3544 " I I I I
3545 - 2 5 . .
3546 + 2 5 2 6
3547 + . . 4 8
3548 + . . 3 4" )
3552 );
3553
3554 tx_r.push_chunk(chunk_r2);
3556 let chunk = hash_join.next_unwrap_ready_chunk()?;
3557 assert_eq!(
3558 chunk,
3559 StreamChunk::from_pretty(
3560 " I I I I
3561 + . . 5 10 D
3562 - . . 5 10 D
3563 + . . 1 2" )
3566 );
3567
3568 Ok(())
3569 }
3570
3571 #[tokio::test]
3572 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3573 let chunk_l1 = StreamChunk::from_pretty(
3574 " I I
3575 + 1 4
3576 + 2 10
3577 + 3 6",
3578 );
3579 let chunk_l2 = StreamChunk::from_pretty(
3580 " I I
3581 + 3 8
3582 - 3 8",
3583 );
3584 let chunk_r1 = StreamChunk::from_pretty(
3585 " I I
3586 + 2 7
3587 + 4 8
3588 + 6 9",
3589 );
3590 let chunk_r2 = StreamChunk::from_pretty(
3591 " I I
3592 + 3 10
3593 + 6 11",
3594 );
3595 let (mut tx_l, mut tx_r, mut hash_join) =
3596 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3597
3598 tx_l.push_barrier(test_epoch(1), false);
3600 tx_r.push_barrier(test_epoch(1), false);
3601 hash_join.next_unwrap_ready_barrier()?;
3602
3603 tx_l.push_chunk(chunk_l1);
3605 hash_join.next_unwrap_pending();
3606
3607 tx_l.push_chunk(chunk_l2);
3609 hash_join.next_unwrap_pending();
3610
3611 tx_r.push_chunk(chunk_r1);
3613 hash_join.next_unwrap_pending();
3614
3615 tx_r.push_chunk(chunk_r2);
3617 let chunk = hash_join.next_unwrap_ready_chunk()?;
3618 assert_eq!(
3619 chunk,
3620 StreamChunk::from_pretty(
3621 " I I I I
3622 + 3 6 3 10"
3623 )
3624 );
3625
3626 Ok(())
3627 }
3628
3629 #[tokio::test]
3630 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3631 let (mut tx_l, mut tx_r, mut hash_join) =
3632 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3633
3634 tx_l.push_barrier(test_epoch(1), false);
3636 tx_r.push_barrier(test_epoch(1), false);
3637 hash_join.next_unwrap_ready_barrier()?;
3638
3639 tx_l.push_int64_watermark(0, 100);
3640
3641 tx_l.push_int64_watermark(0, 200);
3642
3643 tx_l.push_barrier(test_epoch(2), false);
3644 tx_r.push_barrier(test_epoch(2), false);
3645 hash_join.next_unwrap_ready_barrier()?;
3646
3647 tx_r.push_int64_watermark(0, 50);
3648
3649 let w1 = hash_join.next().await.unwrap().unwrap();
3650 let w1 = w1.as_watermark().unwrap();
3651
3652 let w2 = hash_join.next().await.unwrap().unwrap();
3653 let w2 = w2.as_watermark().unwrap();
3654
3655 tx_r.push_int64_watermark(0, 100);
3656
3657 let w3 = hash_join.next().await.unwrap().unwrap();
3658 let w3 = w3.as_watermark().unwrap();
3659
3660 let w4 = hash_join.next().await.unwrap().unwrap();
3661 let w4 = w4.as_watermark().unwrap();
3662
3663 assert_eq!(
3664 w1,
3665 &Watermark {
3666 col_idx: 2,
3667 data_type: DataType::Int64,
3668 val: ScalarImpl::Int64(50)
3669 }
3670 );
3671
3672 assert_eq!(
3673 w2,
3674 &Watermark {
3675 col_idx: 0,
3676 data_type: DataType::Int64,
3677 val: ScalarImpl::Int64(50)
3678 }
3679 );
3680
3681 assert_eq!(
3682 w3,
3683 &Watermark {
3684 col_idx: 2,
3685 data_type: DataType::Int64,
3686 val: ScalarImpl::Int64(100)
3687 }
3688 );
3689
3690 assert_eq!(
3691 w4,
3692 &Watermark {
3693 col_idx: 0,
3694 data_type: DataType::Int64,
3695 val: ScalarImpl::Int64(100)
3696 }
3697 );
3698
3699 Ok(())
3700 }
3701}