1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::expr::NonStrictExpression;
31use risingwave_pb::stream_plan::InequalityType;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Number of received rows after which both join sides' caches get an eviction pass
/// (the counter is advanced and reset in `HashJoinExecutor::evict_cache`).
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates and ordering are ignored; an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|item| superset.contains(item))
}
51
/// Describes one inequality condition `left[left_idx] <op> right[right_idx]` between the two
/// join inputs.
#[derive(Debug, Clone)]
pub struct InequalityPairInfo {
    /// Column index in the left input.
    pub left_idx: usize,
    /// Column index in the right input.
    pub right_idx: usize,
    /// When set, the left column is registered as a state-clean column for this pair.
    pub clean_left_state: bool,
    /// When set, the right column is registered as a state-clean column for this pair.
    pub clean_right_state: bool,
    /// The comparison applied between the left and the right column.
    pub op: InequalityType,
}
67
68impl InequalityPairInfo {
69 pub fn left_side_is_larger(&self) -> bool {
73 matches!(
74 self.op,
75 InequalityType::GreaterThan | InequalityType::GreaterThanOrEqual
76 )
77 }
78}
79
/// Per-side parameters used when building a hash join executor.
pub struct JoinParams {
    /// Indices of the join key columns in this side's input.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices for this side's state (handed to the join hash map; their
    /// count also sizes the degree table key that follows the join key columns).
    /// NOTE(review): "deduped" presumably means columns already in the join key are removed —
    /// confirm against the planner.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated primary-key indices of one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
95
/// State and static metadata for one input ("side") of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Cache plus state tables holding this side's rows, looked up by join key.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns in this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of every column of this side's input.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns in a left-then-right concatenated row
    /// (0 for the left side, the left column count for the right side).
    start_pos: usize,
    /// `(input column index, output column index)` pairs for columns of this side that are
    /// projected into the output.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` indexed by input column, yielding all output columns it maps to.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For each input column, the inequality pairs it participates in, as
    /// `(inequality pair index, this column is on the smaller side of the comparison)`.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that must be non-null for a row to be probed against the other side
    /// (columns used by any inequality; empty when the append-only optimization is active).
    non_null_fields: Vec<usize>,
    /// `(column index, inequality pair index)` for columns whose state may be cleaned using
    /// the pair's watermark (empty when the append-only optimization is active).
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether this side keeps a degree table.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
125
126impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("JoinSide")
129 .field("join_key_indices", &self.join_key_indices)
130 .field("col_types", &self.all_data_types)
131 .field("start_pos", &self.start_pos)
132 .field("i2o_mapping", &self.i2o_mapping)
133 .field("need_degree_table", &self.need_degree_table)
134 .finish()
135 }
136}
137
138impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
139 fn is_dirty(&self) -> bool {
141 unimplemented!()
142 }
143
144 #[expect(dead_code)]
145 fn clear_cache(&mut self) {
146 assert!(
147 !self.is_dirty(),
148 "cannot clear cache while states of hash join are dirty"
149 );
150
151 }
154
155 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
156 self.ht.init(epoch).await
157 }
158}
159
/// Streaming hash join over two inputs, parameterized by join type `T`, join key type `K`,
/// state store `S` and row encoding `E`.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input; taken out when the executor starts running.
    input_l: Option<Executor>,
    /// Right input; taken out when the executor starts running.
    input_r: Option<Executor>,
    /// Output column types after projection through `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left input.
    side_l: JoinSide<K, S, E>,
    /// State and metadata of the right input.
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join condition.
    cond: Option<NonStrictExpression>,
    /// Inequality pairs together with the output columns their watermarks are emitted on.
    inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)>,
    /// Latest selected watermark per inequality pair (indexed like `inequality_pairs`).
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Set when both inputs are append-only and each side's stream key is part of its join key.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Maximum number of rows per output chunk.
    chunk_size: usize,
    /// Rows received since the last cache eviction (see `EVICT_EVERY_N_ROWS`).
    cnt_rows_received: u32,

    /// Watermark buffers keyed by join key position, or by
    /// `join key count + inequality pair index` for inequality watermarks.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when one input row yields more output rows than this.
    high_join_amplification_threshold: usize,

    /// Maximum number of matched rows cached per join key after a cache miss.
    entry_state_max_rows: usize,
}
205
206impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
207 for HashJoinExecutor<K, S, T, E>
208{
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 f.debug_struct("HashJoinExecutor")
211 .field("join_type", &T)
212 .field("input_left", &self.input_l.as_ref().unwrap().identity())
213 .field("input_right", &self.input_r.as_ref().unwrap().identity())
214 .field("side_l", &self.side_l)
215 .field("side_r", &self.side_r)
216 .field("stream_key", &self.info.stream_key)
217 .field("schema", &self.info.schema)
218 .field("actual_output_data_types", &self.actual_output_data_types)
219 .finish()
220 }
221}
222
impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
    for HashJoinExecutor<K, S, T, E>
{
    /// Consumes the executor and returns its boxed output message stream.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.into_stream().boxed()
    }
}
230
/// Borrowed executor state plus the input chunk, bundled for `eq_join_oneside` so the
/// executor's fields can be borrowed independently while the join stream runs.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S, E>,
    side_r: &'a mut JoinSide<K, S, E>,
    actual_output_data_types: &'a [DataType],
    cond: &'a mut Option<NonStrictExpression>,
    inequality_watermarks: &'a [Option<Watermark>],
    /// The chunk to join, coming from the side selected by the caller.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
245
246impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
247 HashJoinExecutor<K, S, T, E>
248{
    /// Creates a hash join executor whose per-key cache limit comes from the actor config.
    ///
    /// Equivalent to `new_with_cache_size` with `entry_state_max_rows = None`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<InequalityPairInfo>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // Use the config default for `entry_state_max_rows`.
            None,
        )
    }
294
295 #[allow(clippy::too_many_arguments)]
296 pub fn new_with_cache_size(
297 ctx: ActorContextRef,
298 info: ExecutorInfo,
299 input_l: Executor,
300 input_r: Executor,
301 params_l: JoinParams,
302 params_r: JoinParams,
303 null_safe: Vec<bool>,
304 output_indices: Vec<usize>,
305 cond: Option<NonStrictExpression>,
306 inequality_pairs: Vec<InequalityPairInfo>,
307 state_table_l: StateTable<S>,
308 degree_state_table_l: StateTable<S>,
309 state_table_r: StateTable<S>,
310 degree_state_table_r: StateTable<S>,
311 watermark_epoch: AtomicU64Ref,
312 is_append_only: bool,
313 metrics: Arc<StreamingMetrics>,
314 chunk_size: usize,
315 high_join_amplification_threshold: usize,
316 entry_state_max_rows: Option<usize>,
317 ) -> Self {
318 let entry_state_max_rows = match entry_state_max_rows {
319 None => ctx.config.developer.hash_join_entry_state_max_rows,
320 Some(entry_state_max_rows) => entry_state_max_rows,
321 };
322 let side_l_column_n = input_l.schema().len();
323
324 let schema_fields = match T {
325 JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
326 JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
327 _ => [
328 input_l.schema().fields.clone(),
329 input_r.schema().fields.clone(),
330 ]
331 .concat(),
332 };
333
334 let original_output_data_types = schema_fields
335 .iter()
336 .map(|field| field.data_type())
337 .collect_vec();
338 let actual_output_data_types = output_indices
339 .iter()
340 .map(|&idx| original_output_data_types[idx].clone())
341 .collect_vec();
342
343 let state_all_data_types_l = input_l.schema().data_types();
345 let state_all_data_types_r = input_r.schema().data_types();
346
347 let state_pk_indices_l = input_l.stream_key().to_vec();
348 let state_pk_indices_r = input_r.stream_key().to_vec();
349
350 let state_join_key_indices_l = params_l.join_key_indices;
351 let state_join_key_indices_r = params_r.join_key_indices;
352
353 let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
354 let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
355
356 let degree_pk_indices_l = (state_join_key_indices_l.len()
357 ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
358 .collect_vec();
359 let degree_pk_indices_r = (state_join_key_indices_r.len()
360 ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
361 .collect_vec();
362
363 let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
365 let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());
366
367 let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
369
370 let join_key_data_types_l = state_join_key_indices_l
371 .iter()
372 .map(|idx| state_all_data_types_l[*idx].clone())
373 .collect_vec();
374
375 let join_key_data_types_r = state_join_key_indices_r
376 .iter()
377 .map(|idx| state_all_data_types_r[*idx].clone())
378 .collect_vec();
379
380 assert_eq!(join_key_data_types_l, join_key_data_types_r);
381
382 let null_matched = K::Bitmap::from_bool_vec(null_safe);
383
384 let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
385 let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
386
387 let degree_state_l = need_degree_table_l.then(|| {
388 TableInner::new(
389 degree_pk_indices_l,
390 degree_join_key_indices_l,
391 degree_state_table_l,
392 )
393 });
394 let degree_state_r = need_degree_table_r.then(|| {
395 TableInner::new(
396 degree_pk_indices_r,
397 degree_join_key_indices_r,
398 degree_state_table_r,
399 )
400 });
401
402 let (left_to_output, right_to_output) = {
403 let (left_len, right_len) = if is_left_semi_or_anti(T) {
404 (state_all_data_types_l.len(), 0usize)
405 } else if is_right_semi_or_anti(T) {
406 (0usize, state_all_data_types_r.len())
407 } else {
408 (state_all_data_types_l.len(), state_all_data_types_r.len())
409 };
410 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
411 };
412
413 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
414 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
415
416 let left_input_len = input_l.schema().len();
417 let right_input_len = input_r.schema().len();
418 let mut l2inequality_index = vec![vec![]; left_input_len];
419 let mut r2inequality_index = vec![vec![]; right_input_len];
420 let mut l_state_clean_columns = vec![];
421 let mut r_state_clean_columns = vec![];
422 let inequality_pairs: Vec<(Vec<usize>, InequalityPairInfo)> = inequality_pairs
423 .into_iter()
424 .enumerate()
425 .map(|(index, pair)| {
426 let left_is_larger = pair.left_side_is_larger();
432 l2inequality_index[pair.left_idx].push((index, !left_is_larger));
433 r2inequality_index[pair.right_idx].push((index, left_is_larger));
434
435 if pair.clean_left_state {
437 l_state_clean_columns.push((pair.left_idx, index));
438 }
439 if pair.clean_right_state {
440 r_state_clean_columns.push((pair.right_idx, index));
441 }
442
443 let output_indices = if pair.left_side_is_larger() {
446 l2o_indexed
448 .get_vec(&pair.left_idx)
449 .cloned()
450 .unwrap_or_default()
451 } else {
452 r2o_indexed
454 .get_vec(&pair.right_idx)
455 .cloned()
456 .unwrap_or_default()
457 };
458
459 (output_indices, pair)
460 })
461 .collect_vec();
462
463 let mut l_non_null_fields = l2inequality_index
464 .iter()
465 .positions(|inequalities| !inequalities.is_empty())
466 .collect_vec();
467 let mut r_non_null_fields = r2inequality_index
468 .iter()
469 .positions(|inequalities| !inequalities.is_empty())
470 .collect_vec();
471
472 if append_only_optimize {
473 l_state_clean_columns.clear();
474 r_state_clean_columns.clear();
475 l_non_null_fields.clear();
476 r_non_null_fields.clear();
477 }
478
479 let inequality_watermarks = vec![None; inequality_pairs.len()];
480 let watermark_buffers = BTreeMap::new();
481
482 Self {
483 ctx: ctx.clone(),
484 info,
485 input_l: Some(input_l),
486 input_r: Some(input_r),
487 actual_output_data_types,
488 side_l: JoinSide {
489 ht: JoinHashMap::new(
490 watermark_epoch.clone(),
491 join_key_data_types_l,
492 state_join_key_indices_l.clone(),
493 state_all_data_types_l.clone(),
494 state_table_l,
495 params_l.deduped_pk_indices,
496 degree_state_l,
497 null_matched.clone(),
498 pk_contained_in_jk_l,
499 None,
500 metrics.clone(),
501 ctx.id,
502 ctx.fragment_id,
503 "left",
504 ),
505 join_key_indices: state_join_key_indices_l,
506 all_data_types: state_all_data_types_l,
507 i2o_mapping: left_to_output,
508 i2o_mapping_indexed: l2o_indexed,
509 input2inequality_index: l2inequality_index,
510 non_null_fields: l_non_null_fields,
511 state_clean_columns: l_state_clean_columns,
512 start_pos: 0,
513 need_degree_table: need_degree_table_l,
514 _marker: PhantomData,
515 },
516 side_r: JoinSide {
517 ht: JoinHashMap::new(
518 watermark_epoch,
519 join_key_data_types_r,
520 state_join_key_indices_r.clone(),
521 state_all_data_types_r.clone(),
522 state_table_r,
523 params_r.deduped_pk_indices,
524 degree_state_r,
525 null_matched,
526 pk_contained_in_jk_r,
527 None,
528 metrics.clone(),
529 ctx.id,
530 ctx.fragment_id,
531 "right",
532 ),
533 join_key_indices: state_join_key_indices_r,
534 all_data_types: state_all_data_types_r,
535 start_pos: side_l_column_n,
536 i2o_mapping: right_to_output,
537 i2o_mapping_indexed: r2o_indexed,
538 input2inequality_index: r2inequality_index,
539 non_null_fields: r_non_null_fields,
540 state_clean_columns: r_state_clean_columns,
541 need_degree_table: need_degree_table_r,
542 _marker: PhantomData,
543 },
544 cond,
545 inequality_pairs,
546 inequality_watermarks,
547 append_only_optimize,
548 metrics,
549 chunk_size,
550 cnt_rows_received: 0,
551 watermark_buffers,
552 high_join_amplification_threshold,
553 entry_state_max_rows,
554 }
555 }
556
    /// Main loop of the executor: aligns both inputs on barriers and handles each message.
    ///
    /// - Watermarks go through `handle_watermark`; every returned watermark is emitted.
    /// - Chunks are joined against the opposite side, then `try_flush_data` is called.
    /// - Barriers flush both sides (`flush_data`), are forwarded, and then run both sides'
    ///   post-commit hooks; cache-size gauges are updated afterwards.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // The first barrier is forwarded before either side's state is initialized with its epoch.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Per-actor metric handles, labeled by actor, fragment and (where applicable) side.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time spent waiting for the next aligned message.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Left, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in self.handle_watermark(SideType::Right, watermark)? {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Match time excludes the time spent suspended at each `yield`.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Match time excludes the time spent suspended at each `yield`.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Post-commit work runs only after the barrier has been forwarded.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // NOTE(review): a `Some(true)` result presumably means the vnode
                    // distribution changed, which invalidates the buffered watermarks — confirm
                    // against `JoinHashMapPostCommit::post_yield_barrier`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
727
728 async fn flush_data(
729 &mut self,
730 epoch: EpochPair,
731 ) -> StreamExecutorResult<(
732 JoinHashMapPostCommit<'_, K, S, E>,
733 JoinHashMapPostCommit<'_, K, S, E>,
734 )> {
735 let left = self.side_l.ht.flush(epoch).await?;
738 let right = self.side_r.ht.flush(epoch).await?;
739 Ok((left, right))
740 }
741
742 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
743 self.side_l.ht.try_flush().await?;
746 self.side_r.ht.try_flush().await?;
747 Ok(())
748 }
749
750 fn evict_cache(
752 side_update: &mut JoinSide<K, S, E>,
753 side_match: &mut JoinSide<K, S, E>,
754 cnt_rows_received: &mut u32,
755 ) {
756 *cnt_rows_received += 1;
757 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
758 side_update.ht.evict();
759 side_match.ht.evict();
760 *cnt_rows_received = 0;
761 }
762 }
763
764 fn handle_watermark(
765 &mut self,
766 side: SideTypePrimitive,
767 watermark: Watermark,
768 ) -> StreamExecutorResult<Vec<Watermark>> {
769 let (side_update, side_match) = if side == SideType::Left {
770 (&mut self.side_l, &mut self.side_r)
771 } else {
772 (&mut self.side_r, &mut self.side_l)
773 };
774
775 if side_update.join_key_indices[0] == watermark.col_idx {
777 side_match.ht.update_watermark(watermark.val.clone());
778 }
779
780 let wm_in_jk = side_update
782 .join_key_indices
783 .iter()
784 .positions(|idx| *idx == watermark.col_idx);
785 let mut watermarks_to_emit = vec![];
786 for idx in wm_in_jk {
787 let buffers = self
788 .watermark_buffers
789 .entry(idx)
790 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
791 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
792 let empty_indices = vec![];
793 let output_indices = side_update
794 .i2o_mapping_indexed
795 .get_vec(&side_update.join_key_indices[idx])
796 .unwrap_or(&empty_indices)
797 .iter()
798 .chain(
799 side_match
800 .i2o_mapping_indexed
801 .get_vec(&side_match.join_key_indices[idx])
802 .unwrap_or(&empty_indices),
803 );
804 for output_idx in output_indices {
805 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
806 }
807 };
808 }
809 let mut update_left_watermark = None;
814 let mut update_right_watermark = None;
815 if let Some(watermark_indices) = side_update.input2inequality_index.get(watermark.col_idx) {
816 for (inequality_index, _) in watermark_indices {
817 let buffers = self
818 .watermark_buffers
819 .entry(side_update.join_key_indices.len() + inequality_index)
820 .or_insert_with(|| {
821 BufferedWatermarks::with_ids([SideType::Left, SideType::Right])
822 });
823 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone())
824 {
825 let (output_indices, pair_info) = &self.inequality_pairs[*inequality_index];
826 let left_is_larger = pair_info.left_side_is_larger();
827
828 for output_idx in output_indices {
830 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
831 }
832 self.inequality_watermarks[*inequality_index] =
834 Some(selected_watermark.clone());
835
836 if left_is_larger && pair_info.clean_left_state {
838 update_left_watermark = Some(selected_watermark.val.clone());
839 } else if !left_is_larger && pair_info.clean_right_state {
840 update_right_watermark = Some(selected_watermark.val.clone());
841 }
842 }
843 }
844 if let Some(_val) = update_left_watermark {
847 }
849 if let Some(_val) = update_right_watermark {
850 }
852 }
853 Ok(watermarks_to_emit)
854 }
855
856 fn row_concat(
857 row_update: impl Row,
858 update_start_pos: usize,
859 row_matched: impl Row,
860 matched_start_pos: usize,
861 ) -> OwnedRow {
862 let mut new_row = vec![None; row_update.len() + row_matched.len()];
863
864 for (i, datum_ref) in row_update.iter().enumerate() {
865 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
866 }
867 for (i, datum_ref) in row_matched.iter().enumerate() {
868 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
869 }
870 OwnedRow::new(new_row)
871 }
872
    /// Joins a chunk from the left input against the right side's state.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
879
    /// Joins a chunk from the right input against the left side's state.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
886
    /// Joins every row of `args.chunk`, which arrived on `SIDE`, against the opposite side.
    ///
    /// The `SIDE` input is the "update" side and the other input is the "match" side. For each
    /// visible row this:
    /// 1. advances the eviction counter (`evict_cache`);
    /// 2. takes the match side's cache entry for the row's join key. `CacheResult::NeverMatch`
    ///    is used instead when either holds:
    ///    - the row is null in one of the update side's `non_null_fields`;
    ///    - the key's null bitmap is not a subset of the match side's `null_matched`.
    /// 3. delegates to `handle_match_rows` with an insert or delete `JoinOp` and yields its
    ///    chunks;
    /// 4. records the matched-rows metric and logs a warning above
    ///    `high_join_amplification_threshold`.
    ///
    /// Rows still buffered in the chunk builder are yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Match-side clean columns whose inequality pair already has a watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        // One builder for the whole input chunk, so output rows accumulate across input rows.
        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip invisible rows.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // NOTE(review): soundness of `datum_at_unchecked` relies on every index in
                // `non_null_fields` being < the row width. The constructor derives them as
                // positions in `input2inequality_index`, which is sized by the input schema length.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // NOTE(review): this sums the cardinality of chunks yielded while handling this row.
            // The builder buffers output across rows, so it only approximates this row's matches.
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Emit whatever is still buffered in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
1018
    /// Matches one update-side `row` against the match side's rows for `key` and applies the
    /// resulting state changes.
    ///
    /// Steps:
    /// - `NeverMatch`: the row is only forwarded through `forward_if_not_matched`, and state is
    ///   left untouched.
    /// - `Hit`: each cached row is matched through `handle_match_row`.
    /// - `Miss`: matched rows are fetched from state and matched the same way. Meanwhile a new
    ///   entry state is refilled with up to `entry_state_max_rows + 1` rows; the extra row only
    ///   detects overflow.
    ///
    /// Afterwards:
    /// - the row is forwarded according to its match count (`degree`);
    /// - the entry is written back to the cache on a hit, or when the refill fits within
    ///   `entry_state_max_rows`;
    /// - rows collected in `matched_rows_to_clean` are deleted from the match side;
    /// - the update row is inserted into or deleted from the update side together with its
    ///   degree. In the append-only case, the matched row is deleted from the match side
    ///   instead.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Refill buffer for the cache-miss path.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of match-side rows that satisfied the join condition.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry. `<=` deliberately admits one extra row so that an
                    // overflowing key can be told apart from one with exactly
                    // `entry_state_max_rows` rows.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the entry back into the cache unless the refill overflowed the limit.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // Append-only: the matched row is removed from the match side and the update row is not
        // stored at all.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Persist the update row on its own side together with its match count.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1200
1201 #[allow(clippy::too_many_arguments)]
1202 #[inline]
1203 async fn handle_match_row<
1204 'a,
1205 R: Row, RO: Row, const SIDE: SideTypePrimitive,
1208 const JOIN_OP: JoinOpPrimitive,
1209 const MATCHED_ROWS_FROM_CACHE: bool,
1210 >(
1211 update_row: RowRef<'a>,
1212 mut matched_row: JoinRow<R>,
1213 mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
1214 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1215 match_order_key_indices: &[usize],
1216 match_degree_table: &mut Option<TableInner<S>>,
1217 side_update_start_pos: usize,
1218 side_match_start_pos: usize,
1219 cond: &Option<NonStrictExpression>,
1220 update_row_degree: &mut u64,
1221 useful_state_clean_columns: &[(usize, &'a Watermark)],
1222 append_only_optimize: bool,
1223 append_only_matched_row: &mut Option<JoinRow<RO>>,
1224 matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
1225 map_output: impl Fn(R) -> RO,
1226 ) -> Option<StreamChunk> {
1227 let mut need_state_clean = false;
1228 let mut chunk_opt = None;
1229
1230 let join_condition_satisfied = Self::check_join_condition(
1234 update_row,
1235 side_update_start_pos,
1236 &matched_row.row,
1237 side_match_start_pos,
1238 cond,
1239 )
1240 .await;
1241
1242 if join_condition_satisfied {
1243 *update_row_degree += 1;
1245 if matches!(JOIN_OP, JoinOp::Insert)
1250 && !forward_exactly_once(T, SIDE)
1251 && let Some(chunk) =
1252 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1253 {
1254 chunk_opt = Some(chunk);
1255 }
1256 if let Some(degree_table) = match_degree_table {
1258 update_degree::<S, { JOIN_OP }>(
1259 match_order_key_indices,
1260 degree_table,
1261 &mut matched_row,
1262 );
1263 if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1264 match JOIN_OP {
1266 JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
1267 JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
1268 }
1269 }
1270 }
1271
1272 if matches!(JOIN_OP, JoinOp::Delete)
1275 && !forward_exactly_once(T, SIDE)
1276 && let Some(chunk) =
1277 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1278 {
1279 chunk_opt = Some(chunk);
1280 }
1281 } else {
1282 for (column_idx, watermark) in useful_state_clean_columns {
1284 if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1285 scalar
1286 .default_cmp(&watermark.val.as_scalar_ref_impl())
1287 .is_lt()
1288 }) {
1289 need_state_clean = true;
1290 break;
1291 }
1292 }
1293 }
1294 if append_only_optimize {
1298 assert_matches!(JOIN_OP, JoinOp::Insert);
1299 assert!(append_only_matched_row.is_none());
1302 *append_only_matched_row = Some(matched_row.map(map_output));
1303 } else if need_state_clean {
1304 matched_rows_to_clean.push(matched_row.map(map_output));
1308 }
1309
1310 chunk_opt
1311 }
1312
1313 #[inline]
1318 async fn check_join_condition(
1319 row: impl Row,
1320 side_update_start_pos: usize,
1321 matched_row: impl Row,
1322 side_match_start_pos: usize,
1323 join_condition: &Option<NonStrictExpression>,
1324 ) -> bool {
1325 if let Some(join_condition) = join_condition {
1326 let new_row = Self::row_concat(
1327 row,
1328 side_update_start_pos,
1329 matched_row,
1330 side_match_start_pos,
1331 );
1332 join_condition
1333 .eval_row_infallible(&new_row)
1334 .await
1335 .map(|s| *s.as_bool())
1336 .unwrap_or(false)
1337 } else {
1338 true
1339 }
1340 }
1341}
1342
1343#[cfg(test)]
1344mod tests {
1345 use std::sync::atomic::AtomicU64;
1346
1347 use pretty_assertions::assert_eq;
1348 use risingwave_common::array::*;
1349 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1350 use risingwave_common::hash::{Key64, Key128};
1351 use risingwave_common::util::epoch::test_epoch;
1352 use risingwave_common::util::sort_util::OrderType;
1353 use risingwave_storage::memory::MemoryStateStore;
1354
1355 use super::*;
1356 use crate::common::table::test_utils::gen_pbtable;
1357 use crate::executor::MemoryEncoding;
1358 use crate::executor::test_utils::expr::build_from_pretty;
1359 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1360
1361 async fn create_in_memory_state_table(
1362 mem_state: MemoryStateStore,
1363 data_types: &[DataType],
1364 order_types: &[OrderType],
1365 pk_indices: &[usize],
1366 table_id: u32,
1367 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1368 let column_descs = data_types
1369 .iter()
1370 .enumerate()
1371 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1372 .collect_vec();
1373 let state_table = StateTable::from_table_catalog(
1374 &gen_pbtable(
1375 TableId::new(table_id),
1376 column_descs,
1377 order_types.to_vec(),
1378 pk_indices.to_vec(),
1379 0,
1380 ),
1381 mem_state.clone(),
1382 None,
1383 )
1384 .await;
1385
1386 let mut degree_table_column_descs = vec![];
1388 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1389 degree_table_column_descs.push(ColumnDesc::unnamed(
1390 ColumnId::new(pk_id as i32),
1391 data_types[*idx].clone(),
1392 ))
1393 });
1394 degree_table_column_descs.push(ColumnDesc::unnamed(
1395 ColumnId::new(pk_indices.len() as i32),
1396 DataType::Int64,
1397 ));
1398 let degree_state_table = StateTable::from_table_catalog(
1399 &gen_pbtable(
1400 TableId::new(table_id + 1),
1401 degree_table_column_descs,
1402 order_types.to_vec(),
1403 pk_indices.to_vec(),
1404 0,
1405 ),
1406 mem_state,
1407 None,
1408 )
1409 .await;
1410 (state_table, degree_state_table)
1411 }
1412
1413 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1414 build_from_pretty(
1415 condition_text
1416 .as_deref()
1417 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1418 )
1419 }
1420
1421 async fn create_executor<const T: JoinTypePrimitive>(
1422 with_condition: bool,
1423 null_safe: bool,
1424 condition_text: Option<String>,
1425 inequality_pairs: Vec<InequalityPairInfo>,
1426 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1427 let schema = Schema {
1428 fields: vec![
1429 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1431 ],
1432 };
1433 let (tx_l, source_l) = MockSource::channel();
1434 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1435 let (tx_r, source_r) = MockSource::channel();
1436 let source_r = source_r.into_executor(schema, vec![1]);
1437 let params_l = JoinParams::new(vec![0], vec![1]);
1438 let params_r = JoinParams::new(vec![0], vec![1]);
1439 let cond = with_condition.then(|| create_cond(condition_text));
1440
1441 let mem_state = MemoryStateStore::new();
1442
1443 let (state_l, degree_state_l) = create_in_memory_state_table(
1444 mem_state.clone(),
1445 &[DataType::Int64, DataType::Int64],
1446 &[OrderType::ascending(), OrderType::ascending()],
1447 &[0, 1],
1448 0,
1449 )
1450 .await;
1451
1452 let (state_r, degree_state_r) = create_in_memory_state_table(
1453 mem_state,
1454 &[DataType::Int64, DataType::Int64],
1455 &[OrderType::ascending(), OrderType::ascending()],
1456 &[0, 1],
1457 2,
1458 )
1459 .await;
1460
1461 let schema = match T {
1462 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1463 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1464 _ => [source_l.schema().fields(), source_r.schema().fields()]
1465 .concat()
1466 .into_iter()
1467 .collect(),
1468 };
1469 let schema_len = schema.len();
1470 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1471
1472 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1473 ActorContext::for_test(123),
1474 info,
1475 source_l,
1476 source_r,
1477 params_l,
1478 params_r,
1479 vec![null_safe],
1480 (0..schema_len).collect_vec(),
1481 cond,
1482 inequality_pairs,
1483 state_l,
1484 degree_state_l,
1485 state_r,
1486 degree_state_r,
1487 Arc::new(AtomicU64::new(0)),
1488 false,
1489 Arc::new(StreamingMetrics::unused()),
1490 1024,
1491 2048,
1492 );
1493 (tx_l, tx_r, executor.boxed().execute())
1494 }
1495
1496 async fn create_classical_executor<const T: JoinTypePrimitive>(
1497 with_condition: bool,
1498 null_safe: bool,
1499 condition_text: Option<String>,
1500 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1501 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1502 }
1503
1504 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1505 with_condition: bool,
1506 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1507 let schema = Schema {
1508 fields: vec![
1509 Field::unnamed(DataType::Int64),
1510 Field::unnamed(DataType::Int64),
1511 Field::unnamed(DataType::Int64),
1512 ],
1513 };
1514 let (tx_l, source_l) = MockSource::channel();
1515 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1516 let (tx_r, source_r) = MockSource::channel();
1517 let source_r = source_r.into_executor(schema, vec![0]);
1518 let params_l = JoinParams::new(vec![0, 1], vec![]);
1519 let params_r = JoinParams::new(vec![0, 1], vec![]);
1520 let cond = with_condition.then(|| create_cond(None));
1521
1522 let mem_state = MemoryStateStore::new();
1523
1524 let (state_l, degree_state_l) = create_in_memory_state_table(
1525 mem_state.clone(),
1526 &[DataType::Int64, DataType::Int64, DataType::Int64],
1527 &[
1528 OrderType::ascending(),
1529 OrderType::ascending(),
1530 OrderType::ascending(),
1531 ],
1532 &[0, 1, 0],
1533 0,
1534 )
1535 .await;
1536
1537 let (state_r, degree_state_r) = create_in_memory_state_table(
1538 mem_state,
1539 &[DataType::Int64, DataType::Int64, DataType::Int64],
1540 &[
1541 OrderType::ascending(),
1542 OrderType::ascending(),
1543 OrderType::ascending(),
1544 ],
1545 &[0, 1, 1],
1546 1,
1547 )
1548 .await;
1549
1550 let schema = match T {
1551 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1552 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1553 _ => [source_l.schema().fields(), source_r.schema().fields()]
1554 .concat()
1555 .into_iter()
1556 .collect(),
1557 };
1558 let schema_len = schema.len();
1559 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1560
1561 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1562 ActorContext::for_test(123),
1563 info,
1564 source_l,
1565 source_r,
1566 params_l,
1567 params_r,
1568 vec![false],
1569 (0..schema_len).collect_vec(),
1570 cond,
1571 vec![],
1572 state_l,
1573 degree_state_l,
1574 state_r,
1575 degree_state_r,
1576 Arc::new(AtomicU64::new(0)),
1577 true,
1578 Arc::new(StreamingMetrics::unused()),
1579 1024,
1580 2048,
1581 );
1582 (tx_l, tx_r, executor.boxed().execute())
1583 }
1584
1585 #[tokio::test]
1586 async fn test_inequality_join_watermark() -> StreamExecutorResult<()> {
1587 let chunk_l1 = StreamChunk::from_pretty(
1591 " I I
1592 + 2 4
1593 + 2 7
1594 + 3 8",
1595 );
1596 let chunk_r1 = StreamChunk::from_pretty(
1597 " I I
1598 + 2 6",
1599 );
1600 let chunk_r2 = StreamChunk::from_pretty(
1601 " I I
1602 + 2 3",
1603 );
1604 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1606 true,
1607 false,
1608 Some(String::from(
1609 "(greater_than_or_equal:boolean $1:int8 $3:int8)",
1610 )),
1611 vec![InequalityPairInfo {
1612 left_idx: 1,
1613 right_idx: 1,
1614 clean_left_state: true, clean_right_state: false,
1616 op: InequalityType::GreaterThanOrEqual,
1617 }],
1618 )
1619 .await;
1620
1621 tx_l.push_barrier(test_epoch(1), false);
1623 tx_r.push_barrier(test_epoch(1), false);
1624 hash_join.next_unwrap_ready_barrier()?;
1625
1626 tx_l.push_chunk(chunk_l1);
1628 hash_join.next_unwrap_pending();
1629
1630 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1633 hash_join.next_unwrap_pending();
1634
1635 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1636 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1637 assert_eq!(
1638 output_watermark,
1639 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(6))
1640 );
1641
1642 tx_r.push_chunk(chunk_r1);
1651 let chunk = hash_join.next_unwrap_ready_chunk()?;
1652 assert_eq!(
1653 chunk,
1654 StreamChunk::from_pretty(
1655 " I I I I
1656 + 2 7 2 6"
1657 )
1658 );
1659
1660 tx_r.push_chunk(chunk_r2);
1664 let chunk = hash_join.next_unwrap_ready_chunk()?;
1665 assert_eq!(
1666 chunk,
1667 StreamChunk::from_pretty(
1668 " I I I I
1669 + 2 7 2 3"
1670 )
1671 );
1672
1673 Ok(())
1674 }
1675
1676 #[tokio::test]
1677 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1678 let chunk_l1 = StreamChunk::from_pretty(
1679 " I I
1680 + 1 4
1681 + 2 5
1682 + 3 6",
1683 );
1684 let chunk_l2 = StreamChunk::from_pretty(
1685 " I I
1686 + 3 8
1687 - 3 8",
1688 );
1689 let chunk_r1 = StreamChunk::from_pretty(
1690 " I I
1691 + 2 7
1692 + 4 8
1693 + 6 9",
1694 );
1695 let chunk_r2 = StreamChunk::from_pretty(
1696 " I I
1697 + 3 10
1698 + 6 11",
1699 );
1700 let (mut tx_l, mut tx_r, mut hash_join) =
1701 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1702
1703 tx_l.push_barrier(test_epoch(1), false);
1705 tx_r.push_barrier(test_epoch(1), false);
1706 hash_join.next_unwrap_ready_barrier()?;
1707
1708 tx_l.push_chunk(chunk_l1);
1710 hash_join.next_unwrap_pending();
1711
1712 tx_l.push_barrier(test_epoch(2), false);
1714 tx_r.push_barrier(test_epoch(2), false);
1715 hash_join.next_unwrap_ready_barrier()?;
1716
1717 tx_l.push_chunk(chunk_l2);
1719 hash_join.next_unwrap_pending();
1720
1721 tx_r.push_chunk(chunk_r1);
1723 let chunk = hash_join.next_unwrap_ready_chunk()?;
1724 assert_eq!(
1725 chunk,
1726 StreamChunk::from_pretty(
1727 " I I I I
1728 + 2 5 2 7"
1729 )
1730 );
1731
1732 tx_r.push_chunk(chunk_r2);
1734 let chunk = hash_join.next_unwrap_ready_chunk()?;
1735 assert_eq!(
1736 chunk,
1737 StreamChunk::from_pretty(
1738 " I I I I
1739 + 3 6 3 10"
1740 )
1741 );
1742
1743 Ok(())
1744 }
1745
1746 #[tokio::test]
1747 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1748 let chunk_l1 = StreamChunk::from_pretty(
1749 " I I
1750 + 1 4
1751 + 2 5
1752 + . 6",
1753 );
1754 let chunk_l2 = StreamChunk::from_pretty(
1755 " I I
1756 + . 8
1757 - . 8",
1758 );
1759 let chunk_r1 = StreamChunk::from_pretty(
1760 " I I
1761 + 2 7
1762 + 4 8
1763 + 6 9",
1764 );
1765 let chunk_r2 = StreamChunk::from_pretty(
1766 " I I
1767 + . 10
1768 + 6 11",
1769 );
1770 let (mut tx_l, mut tx_r, mut hash_join) =
1771 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1772
1773 tx_l.push_barrier(test_epoch(1), false);
1775 tx_r.push_barrier(test_epoch(1), false);
1776 hash_join.next_unwrap_ready_barrier()?;
1777
1778 tx_l.push_chunk(chunk_l1);
1780 hash_join.next_unwrap_pending();
1781
1782 tx_l.push_barrier(test_epoch(2), false);
1784 tx_r.push_barrier(test_epoch(2), false);
1785 hash_join.next_unwrap_ready_barrier()?;
1786
1787 tx_l.push_chunk(chunk_l2);
1789 hash_join.next_unwrap_pending();
1790
1791 tx_r.push_chunk(chunk_r1);
1793 let chunk = hash_join.next_unwrap_ready_chunk()?;
1794 assert_eq!(
1795 chunk,
1796 StreamChunk::from_pretty(
1797 " I I I I
1798 + 2 5 2 7"
1799 )
1800 );
1801
1802 tx_r.push_chunk(chunk_r2);
1804 let chunk = hash_join.next_unwrap_ready_chunk()?;
1805 assert_eq!(
1806 chunk,
1807 StreamChunk::from_pretty(
1808 " I I I I
1809 + . 6 . 10"
1810 )
1811 );
1812
1813 Ok(())
1814 }
1815
1816 #[tokio::test]
1817 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1818 let chunk_l1 = StreamChunk::from_pretty(
1819 " I I
1820 + 1 4
1821 + 2 5
1822 + 3 6",
1823 );
1824 let chunk_l2 = StreamChunk::from_pretty(
1825 " I I
1826 + 3 8
1827 - 3 8",
1828 );
1829 let chunk_r1 = StreamChunk::from_pretty(
1830 " I I
1831 + 2 7
1832 + 4 8
1833 + 6 9",
1834 );
1835 let chunk_r2 = StreamChunk::from_pretty(
1836 " I I
1837 + 3 10
1838 + 6 11",
1839 );
1840 let chunk_l3 = StreamChunk::from_pretty(
1841 " I I
1842 + 6 10",
1843 );
1844 let chunk_r3 = StreamChunk::from_pretty(
1845 " I I
1846 - 6 11",
1847 );
1848 let chunk_r4 = StreamChunk::from_pretty(
1849 " I I
1850 - 6 9",
1851 );
1852 let (mut tx_l, mut tx_r, mut hash_join) =
1853 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1854
1855 tx_l.push_barrier(test_epoch(1), false);
1857 tx_r.push_barrier(test_epoch(1), false);
1858 hash_join.next_unwrap_ready_barrier()?;
1859
1860 tx_l.push_chunk(chunk_l1);
1862 hash_join.next_unwrap_pending();
1863
1864 tx_l.push_barrier(test_epoch(2), false);
1866 tx_r.push_barrier(test_epoch(2), false);
1867 hash_join.next_unwrap_ready_barrier()?;
1868
1869 tx_l.push_chunk(chunk_l2);
1871 hash_join.next_unwrap_pending();
1872
1873 tx_r.push_chunk(chunk_r1);
1875 let chunk = hash_join.next_unwrap_ready_chunk()?;
1876 assert_eq!(
1877 chunk,
1878 StreamChunk::from_pretty(
1879 " I I
1880 + 2 5"
1881 )
1882 );
1883
1884 tx_r.push_chunk(chunk_r2);
1886 let chunk = hash_join.next_unwrap_ready_chunk()?;
1887 assert_eq!(
1888 chunk,
1889 StreamChunk::from_pretty(
1890 " I I
1891 + 3 6"
1892 )
1893 );
1894
1895 tx_l.push_chunk(chunk_l3);
1897 let chunk = hash_join.next_unwrap_ready_chunk()?;
1898 assert_eq!(
1899 chunk,
1900 StreamChunk::from_pretty(
1901 " I I
1902 + 6 10"
1903 )
1904 );
1905
1906 tx_r.push_chunk(chunk_r3);
1909 hash_join.next_unwrap_pending();
1910
1911 tx_r.push_chunk(chunk_r4);
1914 let chunk = hash_join.next_unwrap_ready_chunk()?;
1915 assert_eq!(
1916 chunk,
1917 StreamChunk::from_pretty(
1918 " I I
1919 - 6 10"
1920 )
1921 );
1922
1923 Ok(())
1924 }
1925
1926 #[tokio::test]
1927 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1928 let chunk_l1 = StreamChunk::from_pretty(
1929 " I I
1930 + 1 4
1931 + 2 5
1932 + . 6",
1933 );
1934 let chunk_l2 = StreamChunk::from_pretty(
1935 " I I
1936 + . 8
1937 - . 8",
1938 );
1939 let chunk_r1 = StreamChunk::from_pretty(
1940 " I I
1941 + 2 7
1942 + 4 8
1943 + 6 9",
1944 );
1945 let chunk_r2 = StreamChunk::from_pretty(
1946 " I I
1947 + . 10
1948 + 6 11",
1949 );
1950 let chunk_l3 = StreamChunk::from_pretty(
1951 " I I
1952 + 6 10",
1953 );
1954 let chunk_r3 = StreamChunk::from_pretty(
1955 " I I
1956 - 6 11",
1957 );
1958 let chunk_r4 = StreamChunk::from_pretty(
1959 " I I
1960 - 6 9",
1961 );
1962 let (mut tx_l, mut tx_r, mut hash_join) =
1963 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1964
1965 tx_l.push_barrier(test_epoch(1), false);
1967 tx_r.push_barrier(test_epoch(1), false);
1968 hash_join.next_unwrap_ready_barrier()?;
1969
1970 tx_l.push_chunk(chunk_l1);
1972 hash_join.next_unwrap_pending();
1973
1974 tx_l.push_barrier(test_epoch(2), false);
1976 tx_r.push_barrier(test_epoch(2), false);
1977 hash_join.next_unwrap_ready_barrier()?;
1978
1979 tx_l.push_chunk(chunk_l2);
1981 hash_join.next_unwrap_pending();
1982
1983 tx_r.push_chunk(chunk_r1);
1985 let chunk = hash_join.next_unwrap_ready_chunk()?;
1986 assert_eq!(
1987 chunk,
1988 StreamChunk::from_pretty(
1989 " I I
1990 + 2 5"
1991 )
1992 );
1993
1994 tx_r.push_chunk(chunk_r2);
1996 let chunk = hash_join.next_unwrap_ready_chunk()?;
1997 assert_eq!(
1998 chunk,
1999 StreamChunk::from_pretty(
2000 " I I
2001 + . 6"
2002 )
2003 );
2004
2005 tx_l.push_chunk(chunk_l3);
2007 let chunk = hash_join.next_unwrap_ready_chunk()?;
2008 assert_eq!(
2009 chunk,
2010 StreamChunk::from_pretty(
2011 " I I
2012 + 6 10"
2013 )
2014 );
2015
2016 tx_r.push_chunk(chunk_r3);
2019 hash_join.next_unwrap_pending();
2020
2021 tx_r.push_chunk(chunk_r4);
2024 let chunk = hash_join.next_unwrap_ready_chunk()?;
2025 assert_eq!(
2026 chunk,
2027 StreamChunk::from_pretty(
2028 " I I
2029 - 6 10"
2030 )
2031 );
2032
2033 Ok(())
2034 }
2035
2036 #[tokio::test]
2037 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
2038 let chunk_l1 = StreamChunk::from_pretty(
2039 " I I I
2040 + 1 4 1
2041 + 2 5 2
2042 + 3 6 3",
2043 );
2044 let chunk_l2 = StreamChunk::from_pretty(
2045 " I I I
2046 + 4 9 4
2047 + 5 10 5",
2048 );
2049 let chunk_r1 = StreamChunk::from_pretty(
2050 " I I I
2051 + 2 5 1
2052 + 4 9 2
2053 + 6 9 3",
2054 );
2055 let chunk_r2 = StreamChunk::from_pretty(
2056 " I I I
2057 + 1 4 4
2058 + 3 6 5",
2059 );
2060
2061 let (mut tx_l, mut tx_r, mut hash_join) =
2062 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2063
2064 tx_l.push_barrier(test_epoch(1), false);
2066 tx_r.push_barrier(test_epoch(1), false);
2067 hash_join.next_unwrap_ready_barrier()?;
2068
2069 tx_l.push_chunk(chunk_l1);
2071 hash_join.next_unwrap_pending();
2072
2073 tx_l.push_barrier(test_epoch(2), false);
2075 tx_r.push_barrier(test_epoch(2), false);
2076 hash_join.next_unwrap_ready_barrier()?;
2077
2078 tx_l.push_chunk(chunk_l2);
2080 hash_join.next_unwrap_pending();
2081
2082 tx_r.push_chunk(chunk_r1);
2084 let chunk = hash_join.next_unwrap_ready_chunk()?;
2085 assert_eq!(
2086 chunk,
2087 StreamChunk::from_pretty(
2088 " I I I I I I
2089 + 2 5 2 2 5 1
2090 + 4 9 4 4 9 2"
2091 )
2092 );
2093
2094 tx_r.push_chunk(chunk_r2);
2096 let chunk = hash_join.next_unwrap_ready_chunk()?;
2097 assert_eq!(
2098 chunk,
2099 StreamChunk::from_pretty(
2100 " I I I I I I
2101 + 1 4 1 1 4 4
2102 + 3 6 3 3 6 5"
2103 )
2104 );
2105
2106 Ok(())
2107 }
2108
2109 #[tokio::test]
2110 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2111 let chunk_l1 = StreamChunk::from_pretty(
2112 " I I I
2113 + 1 4 1
2114 + 2 5 2
2115 + 3 6 3",
2116 );
2117 let chunk_l2 = StreamChunk::from_pretty(
2118 " I I I
2119 + 4 9 4
2120 + 5 10 5",
2121 );
2122 let chunk_r1 = StreamChunk::from_pretty(
2123 " I I I
2124 + 2 5 1
2125 + 4 9 2
2126 + 6 9 3",
2127 );
2128 let chunk_r2 = StreamChunk::from_pretty(
2129 " I I I
2130 + 1 4 4
2131 + 3 6 5",
2132 );
2133
2134 let (mut tx_l, mut tx_r, mut hash_join) =
2135 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2136
2137 tx_l.push_barrier(test_epoch(1), false);
2139 tx_r.push_barrier(test_epoch(1), false);
2140 hash_join.next_unwrap_ready_barrier()?;
2141
2142 tx_l.push_chunk(chunk_l1);
2144 hash_join.next_unwrap_pending();
2145
2146 tx_l.push_barrier(test_epoch(2), false);
2148 tx_r.push_barrier(test_epoch(2), false);
2149 hash_join.next_unwrap_ready_barrier()?;
2150
2151 tx_l.push_chunk(chunk_l2);
2153 hash_join.next_unwrap_pending();
2154
2155 tx_r.push_chunk(chunk_r1);
2157 let chunk = hash_join.next_unwrap_ready_chunk()?;
2158 assert_eq!(
2159 chunk,
2160 StreamChunk::from_pretty(
2161 " I I I
2162 + 2 5 2
2163 + 4 9 4"
2164 )
2165 );
2166
2167 tx_r.push_chunk(chunk_r2);
2169 let chunk = hash_join.next_unwrap_ready_chunk()?;
2170 assert_eq!(
2171 chunk,
2172 StreamChunk::from_pretty(
2173 " I I I
2174 + 1 4 1
2175 + 3 6 3"
2176 )
2177 );
2178
2179 Ok(())
2180 }
2181
2182 #[tokio::test]
2183 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2184 let chunk_l1 = StreamChunk::from_pretty(
2185 " I I I
2186 + 1 4 1
2187 + 2 5 2
2188 + 3 6 3",
2189 );
2190 let chunk_l2 = StreamChunk::from_pretty(
2191 " I I I
2192 + 4 9 4
2193 + 5 10 5",
2194 );
2195 let chunk_r1 = StreamChunk::from_pretty(
2196 " I I I
2197 + 2 5 1
2198 + 4 9 2
2199 + 6 9 3",
2200 );
2201 let chunk_r2 = StreamChunk::from_pretty(
2202 " I I I
2203 + 1 4 4
2204 + 3 6 5",
2205 );
2206
2207 let (mut tx_l, mut tx_r, mut hash_join) =
2208 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2209
2210 tx_l.push_barrier(test_epoch(1), false);
2212 tx_r.push_barrier(test_epoch(1), false);
2213 hash_join.next_unwrap_ready_barrier()?;
2214
2215 tx_l.push_chunk(chunk_l1);
2217 hash_join.next_unwrap_pending();
2218
2219 tx_l.push_barrier(test_epoch(2), false);
2221 tx_r.push_barrier(test_epoch(2), false);
2222 hash_join.next_unwrap_ready_barrier()?;
2223
2224 tx_l.push_chunk(chunk_l2);
2226 hash_join.next_unwrap_pending();
2227
2228 tx_r.push_chunk(chunk_r1);
2230 let chunk = hash_join.next_unwrap_ready_chunk()?;
2231 assert_eq!(
2232 chunk,
2233 StreamChunk::from_pretty(
2234 " I I I
2235 + 2 5 1
2236 + 4 9 2"
2237 )
2238 );
2239
2240 tx_r.push_chunk(chunk_r2);
2242 let chunk = hash_join.next_unwrap_ready_chunk()?;
2243 assert_eq!(
2244 chunk,
2245 StreamChunk::from_pretty(
2246 " I I I
2247 + 1 4 4
2248 + 3 6 5"
2249 )
2250 );
2251
2252 Ok(())
2253 }
2254
2255 #[tokio::test]
2256 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2257 let chunk_r1 = StreamChunk::from_pretty(
2258 " I I
2259 + 1 4
2260 + 2 5
2261 + 3 6",
2262 );
2263 let chunk_r2 = StreamChunk::from_pretty(
2264 " I I
2265 + 3 8
2266 - 3 8",
2267 );
2268 let chunk_l1 = StreamChunk::from_pretty(
2269 " I I
2270 + 2 7
2271 + 4 8
2272 + 6 9",
2273 );
2274 let chunk_l2 = StreamChunk::from_pretty(
2275 " I I
2276 + 3 10
2277 + 6 11",
2278 );
2279 let chunk_r3 = StreamChunk::from_pretty(
2280 " I I
2281 + 6 10",
2282 );
2283 let chunk_l3 = StreamChunk::from_pretty(
2284 " I I
2285 - 6 11",
2286 );
2287 let chunk_l4 = StreamChunk::from_pretty(
2288 " I I
2289 - 6 9",
2290 );
2291 let (mut tx_l, mut tx_r, mut hash_join) =
2292 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2293
2294 tx_l.push_barrier(test_epoch(1), false);
2296 tx_r.push_barrier(test_epoch(1), false);
2297 hash_join.next_unwrap_ready_barrier()?;
2298
2299 tx_r.push_chunk(chunk_r1);
2301 hash_join.next_unwrap_pending();
2302
2303 tx_l.push_barrier(test_epoch(2), false);
2305 tx_r.push_barrier(test_epoch(2), false);
2306 hash_join.next_unwrap_ready_barrier()?;
2307
2308 tx_r.push_chunk(chunk_r2);
2310 hash_join.next_unwrap_pending();
2311
2312 tx_l.push_chunk(chunk_l1);
2314 let chunk = hash_join.next_unwrap_ready_chunk()?;
2315 assert_eq!(
2316 chunk,
2317 StreamChunk::from_pretty(
2318 " I I
2319 + 2 5"
2320 )
2321 );
2322
2323 tx_l.push_chunk(chunk_l2);
2325 let chunk = hash_join.next_unwrap_ready_chunk()?;
2326 assert_eq!(
2327 chunk,
2328 StreamChunk::from_pretty(
2329 " I I
2330 + 3 6"
2331 )
2332 );
2333
2334 tx_r.push_chunk(chunk_r3);
2336 let chunk = hash_join.next_unwrap_ready_chunk()?;
2337 assert_eq!(
2338 chunk,
2339 StreamChunk::from_pretty(
2340 " I I
2341 + 6 10"
2342 )
2343 );
2344
2345 tx_l.push_chunk(chunk_l3);
2348 hash_join.next_unwrap_pending();
2349
2350 tx_l.push_chunk(chunk_l4);
2353 let chunk = hash_join.next_unwrap_ready_chunk()?;
2354 assert_eq!(
2355 chunk,
2356 StreamChunk::from_pretty(
2357 " I I
2358 - 6 10"
2359 )
2360 );
2361
2362 Ok(())
2363 }
2364
2365 #[tokio::test]
2366 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2367 let chunk_l1 = StreamChunk::from_pretty(
2368 " I I
2369 + 1 4
2370 + 2 5
2371 + 3 6",
2372 );
2373 let chunk_l2 = StreamChunk::from_pretty(
2374 " I I
2375 + 3 8
2376 - 3 8",
2377 );
2378 let chunk_r1 = StreamChunk::from_pretty(
2379 " I I
2380 + 2 7
2381 + 4 8
2382 + 6 9",
2383 );
2384 let chunk_r2 = StreamChunk::from_pretty(
2385 " I I
2386 + 3 10
2387 + 6 11
2388 + 1 2
2389 + 1 3",
2390 );
2391 let chunk_l3 = StreamChunk::from_pretty(
2392 " I I
2393 + 9 10",
2394 );
2395 let chunk_r3 = StreamChunk::from_pretty(
2396 " I I
2397 - 1 2",
2398 );
2399 let chunk_r4 = StreamChunk::from_pretty(
2400 " I I
2401 - 1 3",
2402 );
2403 let (mut tx_l, mut tx_r, mut hash_join) =
2404 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2405
2406 tx_l.push_barrier(test_epoch(1), false);
2408 tx_r.push_barrier(test_epoch(1), false);
2409 hash_join.next_unwrap_ready_barrier()?;
2410
2411 tx_l.push_chunk(chunk_l1);
2413 let chunk = hash_join.next_unwrap_ready_chunk()?;
2414 assert_eq!(
2415 chunk,
2416 StreamChunk::from_pretty(
2417 " I I
2418 + 1 4
2419 + 2 5
2420 + 3 6",
2421 )
2422 );
2423
2424 tx_l.push_barrier(test_epoch(2), false);
2426 tx_r.push_barrier(test_epoch(2), false);
2427 hash_join.next_unwrap_ready_barrier()?;
2428
2429 tx_l.push_chunk(chunk_l2);
2431 let chunk = hash_join.next_unwrap_ready_chunk()?;
2432 assert_eq!(
2433 chunk,
2434 StreamChunk::from_pretty(
2435 " I I
2436 + 3 8 D
2437 - 3 8 D",
2438 )
2439 );
2440
2441 tx_r.push_chunk(chunk_r1);
2443 let chunk = hash_join.next_unwrap_ready_chunk()?;
2444 assert_eq!(
2445 chunk,
2446 StreamChunk::from_pretty(
2447 " I I
2448 - 2 5"
2449 )
2450 );
2451
2452 tx_r.push_chunk(chunk_r2);
2454 let chunk = hash_join.next_unwrap_ready_chunk()?;
2455 assert_eq!(
2456 chunk,
2457 StreamChunk::from_pretty(
2458 " I I
2459 - 3 6
2460 - 1 4"
2461 )
2462 );
2463
2464 tx_l.push_chunk(chunk_l3);
2466 let chunk = hash_join.next_unwrap_ready_chunk()?;
2467 assert_eq!(
2468 chunk,
2469 StreamChunk::from_pretty(
2470 " I I
2471 + 9 10"
2472 )
2473 );
2474
2475 tx_r.push_chunk(chunk_r3);
2478 hash_join.next_unwrap_pending();
2479
2480 tx_r.push_chunk(chunk_r4);
2483 let chunk = hash_join.next_unwrap_ready_chunk()?;
2484 assert_eq!(
2485 chunk,
2486 StreamChunk::from_pretty(
2487 " I I
2488 + 1 4"
2489 )
2490 );
2491
2492 Ok(())
2493 }
2494
2495 #[tokio::test]
2496 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2497 let chunk_r1 = StreamChunk::from_pretty(
2498 " I I
2499 + 1 4
2500 + 2 5
2501 + 3 6",
2502 );
2503 let chunk_r2 = StreamChunk::from_pretty(
2504 " I I
2505 + 3 8
2506 - 3 8",
2507 );
2508 let chunk_l1 = StreamChunk::from_pretty(
2509 " I I
2510 + 2 7
2511 + 4 8
2512 + 6 9",
2513 );
2514 let chunk_l2 = StreamChunk::from_pretty(
2515 " I I
2516 + 3 10
2517 + 6 11
2518 + 1 2
2519 + 1 3",
2520 );
2521 let chunk_r3 = StreamChunk::from_pretty(
2522 " I I
2523 + 9 10",
2524 );
2525 let chunk_l3 = StreamChunk::from_pretty(
2526 " I I
2527 - 1 2",
2528 );
2529 let chunk_l4 = StreamChunk::from_pretty(
2530 " I I
2531 - 1 3",
2532 );
2533 let (mut tx_r, mut tx_l, mut hash_join) =
2534 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2535
2536 tx_r.push_barrier(test_epoch(1), false);
2538 tx_l.push_barrier(test_epoch(1), false);
2539 hash_join.next_unwrap_ready_barrier()?;
2540
2541 tx_r.push_chunk(chunk_r1);
2543 let chunk = hash_join.next_unwrap_ready_chunk()?;
2544 assert_eq!(
2545 chunk,
2546 StreamChunk::from_pretty(
2547 " I I
2548 + 1 4
2549 + 2 5
2550 + 3 6",
2551 )
2552 );
2553
2554 tx_r.push_barrier(test_epoch(2), false);
2556 tx_l.push_barrier(test_epoch(2), false);
2557 hash_join.next_unwrap_ready_barrier()?;
2558
2559 tx_r.push_chunk(chunk_r2);
2561 let chunk = hash_join.next_unwrap_ready_chunk()?;
2562 assert_eq!(
2563 chunk,
2564 StreamChunk::from_pretty(
2565 " I I
2566 + 3 8 D
2567 - 3 8 D",
2568 )
2569 );
2570
2571 tx_l.push_chunk(chunk_l1);
2573 let chunk = hash_join.next_unwrap_ready_chunk()?;
2574 assert_eq!(
2575 chunk,
2576 StreamChunk::from_pretty(
2577 " I I
2578 - 2 5"
2579 )
2580 );
2581
2582 tx_l.push_chunk(chunk_l2);
2584 let chunk = hash_join.next_unwrap_ready_chunk()?;
2585 assert_eq!(
2586 chunk,
2587 StreamChunk::from_pretty(
2588 " I I
2589 - 3 6
2590 - 1 4"
2591 )
2592 );
2593
2594 tx_r.push_chunk(chunk_r3);
2596 let chunk = hash_join.next_unwrap_ready_chunk()?;
2597 assert_eq!(
2598 chunk,
2599 StreamChunk::from_pretty(
2600 " I I
2601 + 9 10"
2602 )
2603 );
2604
2605 tx_l.push_chunk(chunk_l3);
2608 hash_join.next_unwrap_pending();
2609
2610 tx_l.push_chunk(chunk_l4);
2613 let chunk = hash_join.next_unwrap_ready_chunk()?;
2614 assert_eq!(
2615 chunk,
2616 StreamChunk::from_pretty(
2617 " I I
2618 + 1 4"
2619 )
2620 );
2621
2622 Ok(())
2623 }
2624
    /// Inner join where barrier 2 reaches the left input before the right input.
    ///
    /// While the barrier is unaligned, right-side chunks are still joined. The left chunk
    /// queued behind the left barrier (`chunk_l2`) is only joined after the aligned
    /// barrier has been emitted.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 8
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is still empty, so the inner join emits nothing for chunk_l1.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // Barrier 2 arrives on the left side only.
        tx_l.push_barrier(test_epoch(2), false);

        // chunk_l2 is queued on the left input behind the unaligned barrier.
        tx_l.push_chunk(chunk_l2);

        // Right-side data keeps flowing while the barrier is unaligned.
        tx_r.push_chunk(chunk_r1);

        // Only (2, 5) from chunk_l1 matches. The right row (6, 9) is NOT joined with
        // the left row (6, 8) yet, because chunk_l2 has not been consumed.
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // Barrier 2 on the right side completes the alignment.
        tx_r.push_barrier(test_epoch(2), false);

        // The emitted barrier carries epoch 2 and no mutation.
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // After the barrier, chunk_l2 is processed: (6, 8) matches the right row (6, 9)
        // that arrived earlier. (3, 8) has no right match yet.
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 8 6 9"
            )
        );

        // Key 3 matches both left rows (3, 6) and (3, 8); key 6 matches (6, 8).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10
                + 3 8 3 10
                + 6 8 6 11"
            )
        );

        Ok(())
    }
2719
    /// Same barrier-alignment scenario as the plain inner-join barrier test, but with
    /// NULLs in the non-key column of left rows.
    ///
    /// A NULL in a non-key column does not prevent a row from matching on the key.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 .
             + 3 .",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 6 .
             + 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet, so the inner join emits nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // Barrier 2 arrives on the left side only.
        tx_l.push_barrier(test_epoch(2), false);

        // chunk_l2 is queued behind the unaligned left barrier.
        tx_l.push_chunk(chunk_l2);

        // Right-side data is still consumed while the barrier is unaligned.
        tx_r.push_chunk(chunk_r1);

        // (2, NULL) matches (2, 7). (6, NULL) from chunk_l2 is not joined yet.
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 . 2 7"
            )
        );

        // Barrier 2 on the right side completes the alignment.
        tx_r.push_barrier(test_epoch(2), false);

        // The emitted barrier carries epoch 2 and no mutation.
        let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
        assert!(matches!(
            hash_join.next_unwrap_ready_barrier()?,
            Barrier {
                epoch,
                mutation: None,
                ..
            } if epoch == expected_epoch
        ));

        // chunk_l2 is now processed: (6, NULL) matches the earlier right row (6, 9).
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 6 . 6 9"
            )
        );

        // Key 3 matches (3, 8) and (3, NULL); key 6 matches (6, NULL).
        // NOTE(review): the expected order for key 3 ((3, 8) before (3, NULL)) differs
        // from arrival order; presumably it follows how the join state orders rows.
        // TODO confirm.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 3 10
                + 3 . 3 10
                + 6 . 6 11"
            )
        );

        Ok(())
    }
2814
    /// Left outer join: unmatched left rows are emitted NULL-padded.
    ///
    /// When a right row later matches, the NULL-padded row is retracted and the joined
    /// row is emitted. Right rows without a left match are dropped.
    #[tokio::test]
    async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: every left row is emitted with NULL right columns.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // Insert and delete of the same row in one chunk. Both output rows carry the
        // trailing `D` marker, which presumably means "not visible" in `from_pretty`.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // (2, 7) matches (2, 5): the NULL-padded row is retracted and replaced.
        // (4, 8) and (6, 9) have no left match and are dropped.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // (3, 10) matches only (3, 6); (3, 8) was already deleted. (6, 11) is dropped.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 3 6 . .
                + 3 6 3 10"
            )
        );

        Ok(())
    }
2898
    /// Left outer join with null-safe key equality.
    ///
    /// A NULL join key on the left matches a NULL join key on the right.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + . 10
             + 6 11",
        );
        // The second flag presumably enables null-safe key comparison (this is the only
        // test here passing `true` in that position). Confirm against
        // `create_classical_executor`.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: all left rows, including the NULL-keyed one, are NULL-padded.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + . 6 . ."
            )
        );

        // Insert and delete of a NULL-keyed row in one chunk; both rows are `D`-marked.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 8 . . D
                - . 8 . . D"
            )
        );

        // (2, 7) matches (2, 5); the other right rows have no left match.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7"
            )
        );

        // (NULL, 10) matches the left row (NULL, 6) because key equality is null-safe.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - . 6 . .
                + . 6 . 10"
            )
        );

        Ok(())
    }
2982
    /// Right outer join: left chunks alone produce no output.
    ///
    /// Right rows are emitted either joined or with NULL left columns.
    #[tokio::test]
    async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows with no right match are not emitted in a right join.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // Same for the insert/delete pair in chunk_l2.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // (2, 7) joins with (2, 5); (4, 8) and (6, 9) are emitted NULL-padded on the left.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // Insert and delete of the same right row in one chunk; both rows are `D`-marked.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3050
    /// Left outer join built by the append-only executor helper, with three-column
    /// inputs (six output columns).
    ///
    /// Unmatched left rows are emitted NULL-padded. Each later right match retracts the
    /// padded row and emits the joined row.
    #[tokio::test]
    async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        // NOTE(review): the meaning of the `false` argument is not visible here; it
        // presumably mirrors the condition flag of `create_classical_executor`. Confirm.
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: left rows are NULL-padded.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 . . .
                + 2 5 2 . . .
                + 3 6 3 . . ."
            )
        );

        // More unmatched left rows.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 4 9 4 . . .
                + 5 10 5 . . ."
            )
        );

        // Right rows matching (2, 5, 2) and (4, 9, 4) replace their NULL-padded rows.
        // (6, 9, 3) has no left match and is dropped.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 2 5 2 . . .
                + 2 5 2 2 5 1
                - 4 9 4 . . .
                + 4 9 4 4 9 2"
            )
        );

        // Right rows matching (1, 4, 1) and (3, 6, 3) replace their NULL-padded rows.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                - 1 4 1 . . .
                + 1 4 1 1 4 4
                - 3 6 3 . . .
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
3139
    /// Right outer join built by the append-only executor helper, with three-column
    /// inputs.
    ///
    /// Left chunks alone emit nothing. Each right row is emitted joined or NULL-padded
    /// on the left.
    #[tokio::test]
    async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5
             + 7 7 6",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightOuter }>(false).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows are only stored; no output without right rows.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Two right rows match stored left rows; (6, 9, 3) is emitted NULL-padded.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2
                + . . . 6 9 3"
            )
        );

        // Two more matches; (7, 7, 6) has no left match and is NULL-padded.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5
                + . . . 7 7 6"
            )
        );

        Ok(())
    }
3210
    /// Full outer join: unmatched rows from either side are emitted NULL-padded.
    ///
    /// A later match retracts the padded left row and emits the joined row.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 5 10
             - 5 10",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: left rows are NULL-padded on the right.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . ."
            )
        );

        // Insert and delete of the same left row in one chunk; both rows are `D`-marked.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D"
            )
        );

        // (2, 7) replaces the NULL-padded (2, 5) row. Unmatched right rows are emitted
        // NULL-padded on the left.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 7
                + . . 4 8
                + . . 6 9"
            )
        );

        // Insert and delete of the same right row in one chunk; both rows are `D`-marked.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D"
            )
        );

        Ok(())
    }
3296
    /// Full outer join where a matched left row is updated (delete + insert with the
    /// same key).
    ///
    /// After dropping invisible rows, the output is exactly the retraction of the old
    /// joined row and the insertion of the new one.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Unmatched left row, NULL-padded on the right.
        tx_l.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 1 . ."
            )
        );

        // A matching right row replaces the NULL-padded row with the joined row.
        tx_r.push_chunk(StreamChunk::from_pretty(
            "  I I
             + 1 1
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;

        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 . .
                + 1 1 1 1"
            )
        );

        // Update the left row (1, 1) -> (1, 2); the key stays the same.
        tx_l.push_chunk(StreamChunk::from_pretty(
            "  I I
             - 1 1
             + 1 2
            ",
        ));
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        // compact_vis() drops rows not marked visible, so only the net update pair is
        // compared. NOTE(review): presumably the hidden rows are intermediate
        // NULL-padded right rows produced between the delete and the insert; this test
        // does not assert them.
        let chunk = chunk.compact_vis();
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 1 1 1 1
                + 1 2 1 1
                "
            )
        );

        Ok(())
    }
3357
    /// Full outer join with an extra non-equi condition on top of the key equality.
    ///
    /// NOTE(review): the expected outputs are consistent with a condition such as
    /// `left.col1 < right.col1`. The actual condition is built by
    /// `create_classical_executor` when its first flag is `true`; confirm there.
    #[tokio::test]
    async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
    {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6
             + 3 7",
        );
        // The last row deletes (1, 4), an unmatched row inserted by chunk_l1.
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8
             - 1 4",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 3 4",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 5 10
             - 5 10
             + 1 2",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: all left rows are NULL-padded.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 1 4 . .
                + 2 5 . .
                + 3 6 . .
                + 3 7 . ."
            )
        );

        // The insert/delete pair is `D`-marked. Deleting (1, 4) retracts its
        // NULL-padded row.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 8 . . D
                - 3 8 . . D
                - 1 4 . ."
            )
        );

        // (2, 6) satisfies the condition with (2, 5). (4, 8) has no left row.
        // Regression check (#2420): (3, 4) fails the condition against both (3, 6) and
        // (3, 7). It must be forwarded as unmatched exactly once, even though key 3
        // has two left entries.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                - 2 5 . .
                + 2 5 2 6
                + . . 4 8
                + . . 3 4"
            )
        );

        // The insert/delete pair is `D`-marked.
        // Regression check (#2420): (1, 2) is forwarded as unmatched even though key 1's
        // only left row (1, 4) was deleted, leaving an empty join entry for that key.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . . 5 10 D
                - . . 5 10 D
                + . . 1 2"
            )
        );

        Ok(())
    }
3453
    /// Inner join with an extra non-equi condition.
    ///
    /// A key match alone is not enough; the condition must also hold.
    /// NOTE(review): the expected output is consistent with `left.col1 < right.col1`.
    /// Confirm against the condition built by `create_classical_executor(true, ..)`.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 10
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: nothing to emit.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        // Insert and delete of the same row: still nothing to emit.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // (2, 7) matches (2, 10) on the key but fails the non-equi condition.
        // (4, 8) and (6, 9) have no left rows.
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        // (3, 10) joins only with (3, 6); (3, 8) was deleted. (6, 11) has no left row.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
3511
    /// Watermark propagation on the join key column (input column 0 on both sides).
    ///
    /// Watermarks are only emitted once the right side provides one. Each right-side
    /// watermark yields two output watermarks: one on output column 2 (right key) and
    /// one on output column 0 (left key).
    #[tokio::test]
    async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;

        // Push the initial barrier on both sides; the aligned barrier is emitted.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Two left-side watermarks on column 0. The executor is not polled here; the
        // first output below only appears after a right-side watermark.
        tx_l.push_int64_watermark(0, 100);

        tx_l.push_int64_watermark(0, 200);

        // The epoch-2 barrier is aligned and emitted.
        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Right watermark 50 yields output watermarks of 50, lower than both left
        // watermarks.
        tx_r.push_int64_watermark(0, 50);

        let w1 = hash_join.next().await.unwrap().unwrap();
        let w1 = w1.as_watermark().unwrap();

        let w2 = hash_join.next().await.unwrap().unwrap();
        let w2 = w2.as_watermark().unwrap();

        // Right watermark 100 yields output watermarks of 100. This is consistent with
        // emitting the minimum of both sides' key watermarks (the left's latest is 200);
        // confirm against the watermark buffer logic.
        tx_r.push_int64_watermark(0, 100);

        let w3 = hash_join.next().await.unwrap().unwrap();
        let w3 = w3.as_watermark().unwrap();

        let w4 = hash_join.next().await.unwrap().unwrap();
        let w4 = w4.as_watermark().unwrap();

        // Inner-join output is [left col 0, left col 1, right col 0, right col 1], so
        // the join key appears at output columns 0 and 2. The column-2 watermark is
        // expected first.
        assert_eq!(
            w1,
            &Watermark {
                col_idx: 2,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(50)
            }
        );

        assert_eq!(
            w2,
            &Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(50)
            }
        );

        assert_eq!(
            w3,
            &Watermark {
                col_idx: 2,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(100)
            }
        );

        assert_eq!(
            w4,
            &Watermark {
                col_idx: 0,
                data_type: DataType::Int64,
                val: ScalarImpl::Int64(100)
            }
        );

        Ok(())
    }
3584}