1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use anyhow::Context;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::row::RowExt;
24use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_expr::ExprError;
28use risingwave_expr::expr::NonStrictExpression;
29use tokio::time::Instant;
30
31use self::builder::JoinChunkBuilder;
32use super::barrier_align::*;
33use super::join::hash_join::*;
34use super::join::row::JoinRow;
35use super::join::*;
36use super::watermark::*;
37use crate::executor::join::builder::JoinStreamChunkBuilder;
38use crate::executor::join::hash_join::CacheResult;
39use crate::executor::prelude::*;
40
/// Row-count period for cache eviction: `HashJoinExecutor::evict_cache` calls `evict()` on
/// both join-side hash tables and resets its counter each time the counter reaches this value.
const EVICT_EVERY_N_ROWS: u32 = 16;
43
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates on either side are irrelevant, and an empty `vec1` is a subset of anything
/// (including an empty `vec2`).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter()
        .all(|candidate| superset.contains(&candidate))
}
47
/// Per-input-side parameters handed to the hash join executor constructors.
pub struct JoinParams {
    /// Column indices, within that side's input, of the equi-join key.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices after deduplication (as the field name states); the executor
    /// forwards them to that side's `JoinHashMap`.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and the deduplicated pk indices of one input side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
63
/// State and column bookkeeping for one input side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// Hash table holding this side's join state, keyed by the join key.
    ht: JoinHashMap<K, S>,
    /// Column indices of the join key within this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns inside a concatenated (left ++ right) row:
    /// `0` for the left side, the left input's column count for the right side.
    start_pos: usize,
    /// `(input column index, output column index)` pairs for this side, as produced by
    /// `JoinStreamChunkBuilder::get_i2o_mapping`.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same mapping as `i2o_mapping`, indexed by input column for lookup.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// Indexed by input column. Each entry lists `(inequality_index, need_offset)` for the
    /// inequality pairs that column takes part in; `need_offset` is `true` when the column is
    /// the "required smaller" key of the pair, in which case the pair's delta expression (if
    /// any) is applied to an incoming watermark before it is buffered.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that take part in at least one inequality pair. An incoming row with NULL in any
    /// of them is treated as never matching. Cleared when the append-only optimization is on.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs registered for inequality pairs whose
    /// `clean_state` flag is set; they are paired with that inequality's current watermark when
    /// a chunk is processed. Cleared when the append-only optimization is on.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table was created for this side.
    need_degree_table: bool,
}
92
93impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("JoinSide")
96 .field("join_key_indices", &self.join_key_indices)
97 .field("col_types", &self.all_data_types)
98 .field("start_pos", &self.start_pos)
99 .field("i2o_mapping", &self.i2o_mapping)
100 .field("need_degree_table", &self.need_degree_table)
101 .finish()
102 }
103}
104
impl<K: HashKey, S: StateStore> JoinSide<K, S> {
    /// Placeholder for a dirty-state check.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`; do not call it until it is implemented.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Guard for clearing this side's cache. At present it only asserts that the side is not
    /// dirty and clears nothing.
    ///
    /// # Panics
    ///
    /// Always panics today, because `is_dirty` is unimplemented.
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

        // NOTE(review): no cache is actually cleared after the assertion.
    }

    /// Initializes this side's hash table for the given epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
126
/// Streaming hash join executor, generic over the hash key type `K`, the state store `S`
/// and the join type `T`.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
    /// Actor context; provides the actor/fragment ids and the streaming config.
    ctx: ActorContextRef,
    /// Executor info (schema, pk indices, identity) of this executor.
    info: ExecutorInfo,

    /// Left input executor; taken (set to `None`) when the stream starts.
    input_l: Option<Executor>,
    /// Right input executor; taken (set to `None`) when the stream starts.
    input_r: Option<Executor>,
    /// Data types of the output columns, i.e. the full join schema projected by the
    /// `output_indices` given at construction.
    actual_output_data_types: Vec<DataType>,
    /// State and column bookkeeping of the left side.
    side_l: JoinSide<K, S>,
    /// State and column bookkeeping of the right side.
    side_r: JoinSide<K, S>,
    /// Optional extra join condition evaluated on candidate row pairs.
    cond: Option<NonStrictExpression>,
    /// Per inequality pair: the output column indices that receive its watermark, and the
    /// optional delta expression applied to watermarks of the "required smaller" column.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest watermark selected for each inequality pair (`None` until one is selected).
    /// Reset to `None` after a barrier when the left side's post-commit step returns `true`.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Set when the input is append-only and, on both sides, the pk is contained in the
    /// join key.
    append_only_optimize: bool,

    /// Streaming metrics registry.
    metrics: Arc<StreamingMetrics>,
    /// Chunk size passed to the output chunk builder.
    chunk_size: usize,
    /// Rows counted since the last eviction; reset to 0 on reaching `EVICT_EVERY_N_ROWS`.
    cnt_rows_received: u32,

    /// Watermark buffers combining both sides. Keys `0..join_key_len` are join-key positions;
    /// keys `join_key_len + i` belong to inequality pair `i`.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when a single row matches more rows than this.
    high_join_amplification_threshold: usize,

    /// Limit on the number of matched rows kept when rebuilding a cache entry after a miss.
    entry_state_max_rows: usize,
}
170
171impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
172 for HashJoinExecutor<K, S, T>
173{
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("HashJoinExecutor")
176 .field("join_type", &T)
177 .field("input_left", &self.input_l.as_ref().unwrap().identity())
178 .field("input_right", &self.input_r.as_ref().unwrap().identity())
179 .field("side_l", &self.side_l)
180 .field("side_r", &self.side_r)
181 .field("pk_indices", &self.info.pk_indices)
182 .field("schema", &self.info.schema)
183 .field("actual_output_data_types", &self.actual_output_data_types)
184 .finish()
185 }
186}
187
188impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> Execute for HashJoinExecutor<K, S, T> {
189 fn execute(self: Box<Self>) -> BoxedMessageStream {
190 self.into_stream().boxed()
191 }
192}
193
/// Borrowed pieces of a `HashJoinExecutor` plus one input chunk, bundled so they can be passed
/// to `eq_join_left` / `eq_join_right` as a single argument.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    /// Actor context (ids and metrics used while joining).
    ctx: &'a ActorContextRef,
    /// Left join side.
    side_l: &'a mut JoinSide<K, S>,
    /// Right join side.
    side_r: &'a mut JoinSide<K, S>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional extra join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Current watermark of each inequality pair.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to process.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size for the output chunk builder.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Match count above which a high-amplification warning is logged.
    high_join_amplification_threshold: usize,
    /// Limit on rows kept when rebuilding a cache entry after a miss.
    entry_state_max_rows: usize,
}
208
209impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
    /// Builds a hash join executor whose per-entry cached-row limit comes from the streaming
    /// config.
    ///
    /// This is a thin wrapper: every argument is forwarded unchanged to
    /// `new_with_cache_size`, with `None` passed as `entry_state_max_rows`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // No explicit limit: fall back to the configured default.
            None,
        )
    }
255
    /// Builds a hash join executor, optionally overriding the per-entry cached-row limit.
    ///
    /// * `params_l` / `params_r` - join key and deduplicated pk indices of each input.
    /// * `null_safe` - one flag per join key column, turned into the hash key's null bitmap.
    /// * `output_indices` - projection applied to the full join schema (one side only for
    ///   semi/anti joins, otherwise left columns followed by right columns).
    /// * `inequality_pairs` - `(key_required_larger, key_required_smaller, clean_state,
    ///   delta_expression)` tuples. Both keys are column indices in the concatenated
    ///   left ++ right column space.
    /// * `entry_state_max_rows` - when `None`, taken from
    ///   `ctx.streaming_config.developer.hash_join_entry_state_max_rows`.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of the two sides differ, or if an index in
    /// `output_indices`, the join key indices, or `inequality_pairs` is out of range.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // Fall back to the developer config when no explicit limit is supplied.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins expose only one side's columns; all other join types expose the
        // left columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Project the full join schema through `output_indices`.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree-table indices: positions `0..jk_len` for the join key, followed by
        // `jk_len..jk_len + pk_len` for the deduplicated pk.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether every pk column of a side is also one of its join key columns.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        // The append-only optimization requires the pk to be contained in the join key on
        // both sides.
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side needs a degree table only if the join type asks for it and the *other* side's
        // pk is not contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Input-to-output column mappings; a side that is not part of the output schema
        // contributes a length of 0.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // Turn each inequality pair into `(output indices of the larger key, delta expression)`
        // while recording, per input column, which inequalities it participates in.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    let output_indices = if key_required_larger < key_required_smaller {
                        // Larger key on the left, smaller key on the right.
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        // Larger key on the right, smaller key on the left.
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Columns that take part in any inequality must be non-null for a row to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, neither watermark-based state cleaning nor the
        // non-null requirement is applied.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                // Left columns come first in a concatenated row.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns follow all left columns in a concatenated row.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
517
    /// Main loop of the executor: aligns both inputs on barriers, joins incoming chunks,
    /// forwards watermarks and flushes state on every barrier.
    ///
    /// The first aligned message must be a barrier; it is yielded first, and both sides are
    /// then initialized with its epoch.
    ///
    /// # Panics
    ///
    /// Panics if either input has already been taken.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // Propagate the first barrier before initializing the state of both sides.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Resolve the labelled metric handles once, outside the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "barrier"]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str, "right"]);

        // Start of the current wait for input; reset at the end of every iteration.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // `left_time` accumulates only the time spent producing chunks: the timer
                    // is stopped before each yield and restarted after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides before the barrier is passed downstream.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Run the post-commit step of both sides after the barrier was yielded.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // NOTE(review): the flag returned for the left side presumably signals
                    // that previously buffered watermarks may be stale -- confirm against
                    // `post_yield_barrier`. Only the left side's result is inspected.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report the cached entry count of each side.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
686
687 async fn flush_data(
688 &mut self,
689 epoch: EpochPair,
690 ) -> StreamExecutorResult<(
691 JoinHashMapPostCommit<'_, K, S>,
692 JoinHashMapPostCommit<'_, K, S>,
693 )> {
694 let left = self.side_l.ht.flush(epoch).await?;
697 let right = self.side_r.ht.flush(epoch).await?;
698 Ok((left, right))
699 }
700
701 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
702 self.side_l.ht.try_flush().await?;
705 self.side_r.ht.try_flush().await?;
706 Ok(())
707 }
708
709 fn evict_cache(
711 side_update: &mut JoinSide<K, S>,
712 side_match: &mut JoinSide<K, S>,
713 cnt_rows_received: &mut u32,
714 ) {
715 *cnt_rows_received += 1;
716 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
717 side_update.ht.evict();
718 side_match.ht.evict();
719 *cnt_rows_received = 0;
720 }
721 }
722
    /// Handles a watermark arriving on `side` and returns the watermarks to emit downstream,
    /// each already re-indexed to an output column.
    ///
    /// Two kinds of watermark are derived:
    /// * join-key watermarks, for every join-key position whose column is `watermark.col_idx`;
    /// * inequality watermarks, for every inequality pair that column participates in.
    ///
    /// In both cases the incoming watermark goes through a per-key `BufferedWatermarks` shared
    /// by the two sides, and something is emitted only when the buffer selects a watermark.
    ///
    /// # Panics
    ///
    /// Panics if the updating side has no join key column, if `watermark.col_idx` is out of
    /// range for that side, or if a delta expression evaluates to NULL.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // A watermark on the first join key column is handed to the *other* side's hash table.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Positions within the join key that refer to the watermark's column.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // Emit on the output columns of the join key column at this position on both
                // sides (a side without a mapped output column contributes nothing).
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            // Inequality buffers are keyed after the join-key positions.
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // The non-strict wrapper is bypassed on purpose: errors are handled right below.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // Out-of-range results are skipped silently; any other error is
                        // reported. Either way this inequality gets no watermark this time.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                // Remember the selected watermark for this inequality pair.
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
804
805 fn row_concat(
806 row_update: impl Row,
807 update_start_pos: usize,
808 row_matched: impl Row,
809 matched_start_pos: usize,
810 ) -> OwnedRow {
811 let mut new_row = vec![None; row_update.len() + row_matched.len()];
812
813 for (i, datum_ref) in row_update.iter().enumerate() {
814 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
815 }
816 for (i, datum_ref) in row_matched.iter().enumerate() {
817 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
818 }
819 OwnedRow::new(new_row)
820 }
821
    /// Joins a chunk that arrived on the left input: runs `eq_join_oneside` with
    /// `SideType::Left` as the updating side.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
828
    /// Joins a chunk that arrived on the right input: runs `eq_join_oneside` with
    /// `SideType::Right` as the updating side.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
835
    /// Joins every visible row of `args.chunk`, which arrived on side `SIDE`, against the
    /// state of the other side, and yields the resulting output chunks.
    ///
    /// For each row this counts it towards periodic cache eviction, looks up the matching
    /// side's cache for the row's join key, delegates matching to `handle_match_rows`,
    /// records the number of matched rows, and logs a warning when that number exceeds
    /// `high_join_amplification_threshold`.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` is the side the chunk arrived on; `side_match` is probed for matches.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Keep only the state-clean columns whose inequality currently has a watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // Build the hash keys of all rows up front; rows and keys are then walked in lockstep.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Holes (invisible rows) are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // A row with NULL in a column used by an inequality can never match.
                // NOTE(review): the unchecked access assumes every index in
                // `non_null_fields` is within the row's column count -- confirm.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // NULLs in the join key are acceptable only where the hash table's
                // null-matched bitmap allows them.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Expands to a `handle_match_rows` call for the given join operation.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            // `total_matches` only counts rows of chunks yielded while handling this row.
            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Emit whatever is still buffered in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
967
    /// Matches one incoming `row` against the rows of the other side that share its join
    /// `key`, yields the output chunks this produces, and updates the state of both sides.
    ///
    /// Behavior depends on `cached_lookup_result`:
    /// * `NeverMatch` - the row is forwarded as unmatched and nothing else happens.
    /// * `Hit` - the cached rows are matched one by one.
    /// * `Miss` - matched rows are fetched from the matching side's state, matched one by one,
    ///   and collected into a new cache entry while the row limit allows.
    ///
    /// Afterwards the row is forwarded according to its degree, the cache entry is written
    /// back when allowed, rows marked for cleaning are deleted from the matching side, and the
    /// row is inserted into or deleted from the updating side. With the append-only
    /// optimization and a recorded matched row, that matched row is deleted instead and the
    /// incoming row is not stored.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S>,
        side_update: &'a mut JoinSide<K, S>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Only filled on a cache miss.
        let mut entry_state = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of rows on the matching side that satisfied the join condition.
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        // Expands to a `handle_match_row` call sharing the per-row accumulators above.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                // Nothing is stored for a row that can never match.
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Rows are collected until the count exceeds the limit (so up to
                    // `entry_state_max_rows + 1` rows); an over-limit entry is not written
                    // back to the cache below.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, matched_row.encode(), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // Forward the row itself depending on whether anything matched.
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the entry (back) into the cache, unless it was rebuilt and exceeded the limit.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Delete the matched rows collected for cleaning while matching.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // Append-only optimization: drop the recorded matched row and do not store the
        // incoming row.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Apply the incoming row, together with its degree, to the updating side's state.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1145
1146 #[allow(clippy::too_many_arguments)]
1147 #[inline]
1148 async fn handle_match_row<
1149 'a,
1150 const SIDE: SideTypePrimitive,
1151 const JOIN_OP: JoinOpPrimitive,
1152 const MATCHED_ROWS_FROM_CACHE: bool,
1153 >(
1154 update_row: RowRef<'a>,
1155 mut matched_row: JoinRow<OwnedRow>,
1156 mut matched_row_cache_ref: Option<&mut StateValueType>,
1157 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1158 match_order_key_indices: &[usize],
1159 match_degree_table: &mut Option<TableInner<S>>,
1160 side_update_start_pos: usize,
1161 side_match_start_pos: usize,
1162 cond: &Option<NonStrictExpression>,
1163 update_row_degree: &mut u64,
1164 useful_state_clean_columns: &[(usize, &'a Watermark)],
1165 append_only_optimize: bool,
1166 append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
1167 matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
1168 ) -> Option<StreamChunk> {
1169 let mut need_state_clean = false;
1170 let mut chunk_opt = None;
1171
1172 let join_condition_satisfied = Self::check_join_condition(
1176 update_row,
1177 side_update_start_pos,
1178 &matched_row.row,
1179 side_match_start_pos,
1180 cond,
1181 )
1182 .await;
1183
1184 if join_condition_satisfied {
1185 *update_row_degree += 1;
1187 if matches!(JOIN_OP, JoinOp::Insert) && !forward_exactly_once(T, SIDE) {
1192 if let Some(chunk) =
1193 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1194 {
1195 chunk_opt = Some(chunk);
1196 }
1197 }
1198 if let Some(degree_table) = match_degree_table {
1200 update_degree::<S, { JOIN_OP }>(
1201 match_order_key_indices,
1202 degree_table,
1203 &mut matched_row,
1204 );
1205 if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1206 match JOIN_OP {
1208 JoinOp::Insert => {
1209 matched_row_cache_ref.as_mut().unwrap().degree += 1;
1210 }
1211 JoinOp::Delete => {
1212 matched_row_cache_ref.as_mut().unwrap().degree -= 1;
1213 }
1214 }
1215 }
1216 }
1217
1218 if matches!(JOIN_OP, JoinOp::Delete) && !forward_exactly_once(T, SIDE) {
1221 if let Some(chunk) =
1222 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1223 {
1224 chunk_opt = Some(chunk);
1225 }
1226 }
1227 } else {
1228 for (column_idx, watermark) in useful_state_clean_columns {
1230 if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1231 scalar
1232 .default_cmp(&watermark.val.as_scalar_ref_impl())
1233 .is_lt()
1234 }) {
1235 need_state_clean = true;
1236 break;
1237 }
1238 }
1239 }
1240 if append_only_optimize {
1244 assert_matches!(JOIN_OP, JoinOp::Insert);
1245 assert!(append_only_matched_row.is_none());
1248 *append_only_matched_row = Some(matched_row);
1249 } else if need_state_clean {
1250 matched_rows_to_clean.push(matched_row);
1254 }
1255
1256 chunk_opt
1257 }
1258
1259 #[inline]
1264 async fn check_join_condition(
1265 row: impl Row,
1266 side_update_start_pos: usize,
1267 matched_row: impl Row,
1268 side_match_start_pos: usize,
1269 join_condition: &Option<NonStrictExpression>,
1270 ) -> bool {
1271 if let Some(join_condition) = join_condition {
1272 let new_row = Self::row_concat(
1273 row,
1274 side_update_start_pos,
1275 matched_row,
1276 side_match_start_pos,
1277 );
1278 join_condition
1279 .eval_row_infallible(&new_row)
1280 .await
1281 .map(|s| *s.as_bool())
1282 .unwrap_or(false)
1283 } else {
1284 true
1285 }
1286 }
1287}
1288
1289#[cfg(test)]
1290mod tests {
1291 use std::sync::atomic::AtomicU64;
1292
1293 use pretty_assertions::assert_eq;
1294 use risingwave_common::array::*;
1295 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1296 use risingwave_common::hash::{Key64, Key128};
1297 use risingwave_common::util::epoch::test_epoch;
1298 use risingwave_common::util::sort_util::OrderType;
1299 use risingwave_storage::memory::MemoryStateStore;
1300
1301 use super::*;
1302 use crate::common::table::test_utils::gen_pbtable;
1303 use crate::executor::test_utils::expr::build_from_pretty;
1304 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1305
1306 async fn create_in_memory_state_table(
1307 mem_state: MemoryStateStore,
1308 data_types: &[DataType],
1309 order_types: &[OrderType],
1310 pk_indices: &[usize],
1311 table_id: u32,
1312 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1313 let column_descs = data_types
1314 .iter()
1315 .enumerate()
1316 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1317 .collect_vec();
1318 let state_table = StateTable::from_table_catalog(
1319 &gen_pbtable(
1320 TableId::new(table_id),
1321 column_descs,
1322 order_types.to_vec(),
1323 pk_indices.to_vec(),
1324 0,
1325 ),
1326 mem_state.clone(),
1327 None,
1328 )
1329 .await;
1330
1331 let mut degree_table_column_descs = vec![];
1333 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1334 degree_table_column_descs.push(ColumnDesc::unnamed(
1335 ColumnId::new(pk_id as i32),
1336 data_types[*idx].clone(),
1337 ))
1338 });
1339 degree_table_column_descs.push(ColumnDesc::unnamed(
1340 ColumnId::new(pk_indices.len() as i32),
1341 DataType::Int64,
1342 ));
1343 let degree_state_table = StateTable::from_table_catalog(
1344 &gen_pbtable(
1345 TableId::new(table_id + 1),
1346 degree_table_column_descs,
1347 order_types.to_vec(),
1348 pk_indices.to_vec(),
1349 0,
1350 ),
1351 mem_state,
1352 None,
1353 )
1354 .await;
1355 (state_table, degree_state_table)
1356 }
1357
1358 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1359 build_from_pretty(
1360 condition_text
1361 .as_deref()
1362 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1363 )
1364 }
1365
1366 async fn create_executor<const T: JoinTypePrimitive>(
1367 with_condition: bool,
1368 null_safe: bool,
1369 condition_text: Option<String>,
1370 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1371 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1372 let schema = Schema {
1373 fields: vec![
1374 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1376 ],
1377 };
1378 let (tx_l, source_l) = MockSource::channel();
1379 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1380 let (tx_r, source_r) = MockSource::channel();
1381 let source_r = source_r.into_executor(schema, vec![1]);
1382 let params_l = JoinParams::new(vec![0], vec![1]);
1383 let params_r = JoinParams::new(vec![0], vec![1]);
1384 let cond = with_condition.then(|| create_cond(condition_text));
1385
1386 let mem_state = MemoryStateStore::new();
1387
1388 let (state_l, degree_state_l) = create_in_memory_state_table(
1389 mem_state.clone(),
1390 &[DataType::Int64, DataType::Int64],
1391 &[OrderType::ascending(), OrderType::ascending()],
1392 &[0, 1],
1393 0,
1394 )
1395 .await;
1396
1397 let (state_r, degree_state_r) = create_in_memory_state_table(
1398 mem_state,
1399 &[DataType::Int64, DataType::Int64],
1400 &[OrderType::ascending(), OrderType::ascending()],
1401 &[0, 1],
1402 2,
1403 )
1404 .await;
1405
1406 let schema = match T {
1407 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1408 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1409 _ => [source_l.schema().fields(), source_r.schema().fields()]
1410 .concat()
1411 .into_iter()
1412 .collect(),
1413 };
1414 let schema_len = schema.len();
1415 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1416
1417 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T>::new(
1418 ActorContext::for_test(123),
1419 info,
1420 source_l,
1421 source_r,
1422 params_l,
1423 params_r,
1424 vec![null_safe],
1425 (0..schema_len).collect_vec(),
1426 cond,
1427 inequality_pairs,
1428 state_l,
1429 degree_state_l,
1430 state_r,
1431 degree_state_r,
1432 Arc::new(AtomicU64::new(0)),
1433 false,
1434 Arc::new(StreamingMetrics::unused()),
1435 1024,
1436 2048,
1437 );
1438 (tx_l, tx_r, executor.boxed().execute())
1439 }
1440
1441 async fn create_classical_executor<const T: JoinTypePrimitive>(
1442 with_condition: bool,
1443 null_safe: bool,
1444 condition_text: Option<String>,
1445 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1446 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1447 }
1448
1449 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1450 with_condition: bool,
1451 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1452 let schema = Schema {
1453 fields: vec![
1454 Field::unnamed(DataType::Int64),
1455 Field::unnamed(DataType::Int64),
1456 Field::unnamed(DataType::Int64),
1457 ],
1458 };
1459 let (tx_l, source_l) = MockSource::channel();
1460 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1461 let (tx_r, source_r) = MockSource::channel();
1462 let source_r = source_r.into_executor(schema, vec![0]);
1463 let params_l = JoinParams::new(vec![0, 1], vec![]);
1464 let params_r = JoinParams::new(vec![0, 1], vec![]);
1465 let cond = with_condition.then(|| create_cond(None));
1466
1467 let mem_state = MemoryStateStore::new();
1468
1469 let (state_l, degree_state_l) = create_in_memory_state_table(
1470 mem_state.clone(),
1471 &[DataType::Int64, DataType::Int64, DataType::Int64],
1472 &[
1473 OrderType::ascending(),
1474 OrderType::ascending(),
1475 OrderType::ascending(),
1476 ],
1477 &[0, 1, 0],
1478 0,
1479 )
1480 .await;
1481
1482 let (state_r, degree_state_r) = create_in_memory_state_table(
1483 mem_state,
1484 &[DataType::Int64, DataType::Int64, DataType::Int64],
1485 &[
1486 OrderType::ascending(),
1487 OrderType::ascending(),
1488 OrderType::ascending(),
1489 ],
1490 &[0, 1, 1],
1491 1,
1492 )
1493 .await;
1494
1495 let schema = match T {
1496 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1497 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1498 _ => [source_l.schema().fields(), source_r.schema().fields()]
1499 .concat()
1500 .into_iter()
1501 .collect(),
1502 };
1503 let schema_len = schema.len();
1504 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1505
1506 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T>::new(
1507 ActorContext::for_test(123),
1508 info,
1509 source_l,
1510 source_r,
1511 params_l,
1512 params_r,
1513 vec![false],
1514 (0..schema_len).collect_vec(),
1515 cond,
1516 vec![],
1517 state_l,
1518 degree_state_l,
1519 state_r,
1520 degree_state_r,
1521 Arc::new(AtomicU64::new(0)),
1522 true,
1523 Arc::new(StreamingMetrics::unused()),
1524 1024,
1525 2048,
1526 );
1527 (tx_l, tx_r, executor.boxed().execute())
1528 }
1529
1530 #[tokio::test]
1531 async fn test_interval_join() -> StreamExecutorResult<()> {
1532 let chunk_l1 = StreamChunk::from_pretty(
1533 " I I
1534 + 1 4
1535 + 2 3
1536 + 2 5
1537 + 3 6",
1538 );
1539 let chunk_l2 = StreamChunk::from_pretty(
1540 " I I
1541 + 3 8
1542 - 3 8",
1543 );
1544 let chunk_r1 = StreamChunk::from_pretty(
1545 " I I
1546 + 2 6
1547 + 4 8
1548 + 6 9",
1549 );
1550 let chunk_r2 = StreamChunk::from_pretty(
1551 " I I
1552 + 2 3
1553 + 6 11",
1554 );
1555 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1556 true,
1557 false,
1558 Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1559 vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1560 )
1561 .await;
1562
1563 tx_l.push_barrier(test_epoch(1), false);
1565 tx_r.push_barrier(test_epoch(1), false);
1566 hash_join.next_unwrap_ready_barrier()?;
1567
1568 tx_l.push_chunk(chunk_l1);
1570 hash_join.next_unwrap_pending();
1571
1572 tx_l.push_barrier(test_epoch(2), false);
1574 tx_r.push_barrier(test_epoch(2), false);
1575 hash_join.next_unwrap_ready_barrier()?;
1576
1577 tx_l.push_chunk(chunk_l2);
1579 hash_join.next_unwrap_pending();
1580
1581 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1582 hash_join.next_unwrap_pending();
1583
1584 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1585 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1586 assert_eq!(
1587 output_watermark,
1588 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1589 );
1590 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1591 assert_eq!(
1592 output_watermark,
1593 Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1594 );
1595
1596 tx_r.push_chunk(chunk_r1);
1598 let chunk = hash_join.next_unwrap_ready_chunk()?;
1599 assert_eq!(
1601 chunk,
1602 StreamChunk::from_pretty(
1603 " I I I I
1604 + 2 5 2 6"
1605 )
1606 );
1607
1608 tx_r.push_chunk(chunk_r2);
1610 hash_join.next_unwrap_pending();
1613
1614 Ok(())
1615 }
1616
1617 #[tokio::test]
1618 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1619 let chunk_l1 = StreamChunk::from_pretty(
1620 " I I
1621 + 1 4
1622 + 2 5
1623 + 3 6",
1624 );
1625 let chunk_l2 = StreamChunk::from_pretty(
1626 " I I
1627 + 3 8
1628 - 3 8",
1629 );
1630 let chunk_r1 = StreamChunk::from_pretty(
1631 " I I
1632 + 2 7
1633 + 4 8
1634 + 6 9",
1635 );
1636 let chunk_r2 = StreamChunk::from_pretty(
1637 " I I
1638 + 3 10
1639 + 6 11",
1640 );
1641 let (mut tx_l, mut tx_r, mut hash_join) =
1642 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1643
1644 tx_l.push_barrier(test_epoch(1), false);
1646 tx_r.push_barrier(test_epoch(1), false);
1647 hash_join.next_unwrap_ready_barrier()?;
1648
1649 tx_l.push_chunk(chunk_l1);
1651 hash_join.next_unwrap_pending();
1652
1653 tx_l.push_barrier(test_epoch(2), false);
1655 tx_r.push_barrier(test_epoch(2), false);
1656 hash_join.next_unwrap_ready_barrier()?;
1657
1658 tx_l.push_chunk(chunk_l2);
1660 hash_join.next_unwrap_pending();
1661
1662 tx_r.push_chunk(chunk_r1);
1664 let chunk = hash_join.next_unwrap_ready_chunk()?;
1665 assert_eq!(
1666 chunk,
1667 StreamChunk::from_pretty(
1668 " I I I I
1669 + 2 5 2 7"
1670 )
1671 );
1672
1673 tx_r.push_chunk(chunk_r2);
1675 let chunk = hash_join.next_unwrap_ready_chunk()?;
1676 assert_eq!(
1677 chunk,
1678 StreamChunk::from_pretty(
1679 " I I I I
1680 + 3 6 3 10"
1681 )
1682 );
1683
1684 Ok(())
1685 }
1686
1687 #[tokio::test]
1688 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1689 let chunk_l1 = StreamChunk::from_pretty(
1690 " I I
1691 + 1 4
1692 + 2 5
1693 + . 6",
1694 );
1695 let chunk_l2 = StreamChunk::from_pretty(
1696 " I I
1697 + . 8
1698 - . 8",
1699 );
1700 let chunk_r1 = StreamChunk::from_pretty(
1701 " I I
1702 + 2 7
1703 + 4 8
1704 + 6 9",
1705 );
1706 let chunk_r2 = StreamChunk::from_pretty(
1707 " I I
1708 + . 10
1709 + 6 11",
1710 );
1711 let (mut tx_l, mut tx_r, mut hash_join) =
1712 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1713
1714 tx_l.push_barrier(test_epoch(1), false);
1716 tx_r.push_barrier(test_epoch(1), false);
1717 hash_join.next_unwrap_ready_barrier()?;
1718
1719 tx_l.push_chunk(chunk_l1);
1721 hash_join.next_unwrap_pending();
1722
1723 tx_l.push_barrier(test_epoch(2), false);
1725 tx_r.push_barrier(test_epoch(2), false);
1726 hash_join.next_unwrap_ready_barrier()?;
1727
1728 tx_l.push_chunk(chunk_l2);
1730 hash_join.next_unwrap_pending();
1731
1732 tx_r.push_chunk(chunk_r1);
1734 let chunk = hash_join.next_unwrap_ready_chunk()?;
1735 assert_eq!(
1736 chunk,
1737 StreamChunk::from_pretty(
1738 " I I I I
1739 + 2 5 2 7"
1740 )
1741 );
1742
1743 tx_r.push_chunk(chunk_r2);
1745 let chunk = hash_join.next_unwrap_ready_chunk()?;
1746 assert_eq!(
1747 chunk,
1748 StreamChunk::from_pretty(
1749 " I I I I
1750 + . 6 . 10"
1751 )
1752 );
1753
1754 Ok(())
1755 }
1756
1757 #[tokio::test]
1758 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1759 let chunk_l1 = StreamChunk::from_pretty(
1760 " I I
1761 + 1 4
1762 + 2 5
1763 + 3 6",
1764 );
1765 let chunk_l2 = StreamChunk::from_pretty(
1766 " I I
1767 + 3 8
1768 - 3 8",
1769 );
1770 let chunk_r1 = StreamChunk::from_pretty(
1771 " I I
1772 + 2 7
1773 + 4 8
1774 + 6 9",
1775 );
1776 let chunk_r2 = StreamChunk::from_pretty(
1777 " I I
1778 + 3 10
1779 + 6 11",
1780 );
1781 let chunk_l3 = StreamChunk::from_pretty(
1782 " I I
1783 + 6 10",
1784 );
1785 let chunk_r3 = StreamChunk::from_pretty(
1786 " I I
1787 - 6 11",
1788 );
1789 let chunk_r4 = StreamChunk::from_pretty(
1790 " I I
1791 - 6 9",
1792 );
1793 let (mut tx_l, mut tx_r, mut hash_join) =
1794 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1795
1796 tx_l.push_barrier(test_epoch(1), false);
1798 tx_r.push_barrier(test_epoch(1), false);
1799 hash_join.next_unwrap_ready_barrier()?;
1800
1801 tx_l.push_chunk(chunk_l1);
1803 hash_join.next_unwrap_pending();
1804
1805 tx_l.push_barrier(test_epoch(2), false);
1807 tx_r.push_barrier(test_epoch(2), false);
1808 hash_join.next_unwrap_ready_barrier()?;
1809
1810 tx_l.push_chunk(chunk_l2);
1812 hash_join.next_unwrap_pending();
1813
1814 tx_r.push_chunk(chunk_r1);
1816 let chunk = hash_join.next_unwrap_ready_chunk()?;
1817 assert_eq!(
1818 chunk,
1819 StreamChunk::from_pretty(
1820 " I I
1821 + 2 5"
1822 )
1823 );
1824
1825 tx_r.push_chunk(chunk_r2);
1827 let chunk = hash_join.next_unwrap_ready_chunk()?;
1828 assert_eq!(
1829 chunk,
1830 StreamChunk::from_pretty(
1831 " I I
1832 + 3 6"
1833 )
1834 );
1835
1836 tx_l.push_chunk(chunk_l3);
1838 let chunk = hash_join.next_unwrap_ready_chunk()?;
1839 assert_eq!(
1840 chunk,
1841 StreamChunk::from_pretty(
1842 " I I
1843 + 6 10"
1844 )
1845 );
1846
1847 tx_r.push_chunk(chunk_r3);
1850 hash_join.next_unwrap_pending();
1851
1852 tx_r.push_chunk(chunk_r4);
1855 let chunk = hash_join.next_unwrap_ready_chunk()?;
1856 assert_eq!(
1857 chunk,
1858 StreamChunk::from_pretty(
1859 " I I
1860 - 6 10"
1861 )
1862 );
1863
1864 Ok(())
1865 }
1866
1867 #[tokio::test]
1868 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1869 let chunk_l1 = StreamChunk::from_pretty(
1870 " I I
1871 + 1 4
1872 + 2 5
1873 + . 6",
1874 );
1875 let chunk_l2 = StreamChunk::from_pretty(
1876 " I I
1877 + . 8
1878 - . 8",
1879 );
1880 let chunk_r1 = StreamChunk::from_pretty(
1881 " I I
1882 + 2 7
1883 + 4 8
1884 + 6 9",
1885 );
1886 let chunk_r2 = StreamChunk::from_pretty(
1887 " I I
1888 + . 10
1889 + 6 11",
1890 );
1891 let chunk_l3 = StreamChunk::from_pretty(
1892 " I I
1893 + 6 10",
1894 );
1895 let chunk_r3 = StreamChunk::from_pretty(
1896 " I I
1897 - 6 11",
1898 );
1899 let chunk_r4 = StreamChunk::from_pretty(
1900 " I I
1901 - 6 9",
1902 );
1903 let (mut tx_l, mut tx_r, mut hash_join) =
1904 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1905
1906 tx_l.push_barrier(test_epoch(1), false);
1908 tx_r.push_barrier(test_epoch(1), false);
1909 hash_join.next_unwrap_ready_barrier()?;
1910
1911 tx_l.push_chunk(chunk_l1);
1913 hash_join.next_unwrap_pending();
1914
1915 tx_l.push_barrier(test_epoch(2), false);
1917 tx_r.push_barrier(test_epoch(2), false);
1918 hash_join.next_unwrap_ready_barrier()?;
1919
1920 tx_l.push_chunk(chunk_l2);
1922 hash_join.next_unwrap_pending();
1923
1924 tx_r.push_chunk(chunk_r1);
1926 let chunk = hash_join.next_unwrap_ready_chunk()?;
1927 assert_eq!(
1928 chunk,
1929 StreamChunk::from_pretty(
1930 " I I
1931 + 2 5"
1932 )
1933 );
1934
1935 tx_r.push_chunk(chunk_r2);
1937 let chunk = hash_join.next_unwrap_ready_chunk()?;
1938 assert_eq!(
1939 chunk,
1940 StreamChunk::from_pretty(
1941 " I I
1942 + . 6"
1943 )
1944 );
1945
1946 tx_l.push_chunk(chunk_l3);
1948 let chunk = hash_join.next_unwrap_ready_chunk()?;
1949 assert_eq!(
1950 chunk,
1951 StreamChunk::from_pretty(
1952 " I I
1953 + 6 10"
1954 )
1955 );
1956
1957 tx_r.push_chunk(chunk_r3);
1960 hash_join.next_unwrap_pending();
1961
1962 tx_r.push_chunk(chunk_r4);
1965 let chunk = hash_join.next_unwrap_ready_chunk()?;
1966 assert_eq!(
1967 chunk,
1968 StreamChunk::from_pretty(
1969 " I I
1970 - 6 10"
1971 )
1972 );
1973
1974 Ok(())
1975 }
1976
1977 #[tokio::test]
1978 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1979 let chunk_l1 = StreamChunk::from_pretty(
1980 " I I I
1981 + 1 4 1
1982 + 2 5 2
1983 + 3 6 3",
1984 );
1985 let chunk_l2 = StreamChunk::from_pretty(
1986 " I I I
1987 + 4 9 4
1988 + 5 10 5",
1989 );
1990 let chunk_r1 = StreamChunk::from_pretty(
1991 " I I I
1992 + 2 5 1
1993 + 4 9 2
1994 + 6 9 3",
1995 );
1996 let chunk_r2 = StreamChunk::from_pretty(
1997 " I I I
1998 + 1 4 4
1999 + 3 6 5",
2000 );
2001
2002 let (mut tx_l, mut tx_r, mut hash_join) =
2003 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2004
2005 tx_l.push_barrier(test_epoch(1), false);
2007 tx_r.push_barrier(test_epoch(1), false);
2008 hash_join.next_unwrap_ready_barrier()?;
2009
2010 tx_l.push_chunk(chunk_l1);
2012 hash_join.next_unwrap_pending();
2013
2014 tx_l.push_barrier(test_epoch(2), false);
2016 tx_r.push_barrier(test_epoch(2), false);
2017 hash_join.next_unwrap_ready_barrier()?;
2018
2019 tx_l.push_chunk(chunk_l2);
2021 hash_join.next_unwrap_pending();
2022
2023 tx_r.push_chunk(chunk_r1);
2025 let chunk = hash_join.next_unwrap_ready_chunk()?;
2026 assert_eq!(
2027 chunk,
2028 StreamChunk::from_pretty(
2029 " I I I I I I
2030 + 2 5 2 2 5 1
2031 + 4 9 4 4 9 2"
2032 )
2033 );
2034
2035 tx_r.push_chunk(chunk_r2);
2037 let chunk = hash_join.next_unwrap_ready_chunk()?;
2038 assert_eq!(
2039 chunk,
2040 StreamChunk::from_pretty(
2041 " I I I I I I
2042 + 1 4 1 1 4 4
2043 + 3 6 3 3 6 5"
2044 )
2045 );
2046
2047 Ok(())
2048 }
2049
2050 #[tokio::test]
2051 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2052 let chunk_l1 = StreamChunk::from_pretty(
2053 " I I I
2054 + 1 4 1
2055 + 2 5 2
2056 + 3 6 3",
2057 );
2058 let chunk_l2 = StreamChunk::from_pretty(
2059 " I I I
2060 + 4 9 4
2061 + 5 10 5",
2062 );
2063 let chunk_r1 = StreamChunk::from_pretty(
2064 " I I I
2065 + 2 5 1
2066 + 4 9 2
2067 + 6 9 3",
2068 );
2069 let chunk_r2 = StreamChunk::from_pretty(
2070 " I I I
2071 + 1 4 4
2072 + 3 6 5",
2073 );
2074
2075 let (mut tx_l, mut tx_r, mut hash_join) =
2076 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2077
2078 tx_l.push_barrier(test_epoch(1), false);
2080 tx_r.push_barrier(test_epoch(1), false);
2081 hash_join.next_unwrap_ready_barrier()?;
2082
2083 tx_l.push_chunk(chunk_l1);
2085 hash_join.next_unwrap_pending();
2086
2087 tx_l.push_barrier(test_epoch(2), false);
2089 tx_r.push_barrier(test_epoch(2), false);
2090 hash_join.next_unwrap_ready_barrier()?;
2091
2092 tx_l.push_chunk(chunk_l2);
2094 hash_join.next_unwrap_pending();
2095
2096 tx_r.push_chunk(chunk_r1);
2098 let chunk = hash_join.next_unwrap_ready_chunk()?;
2099 assert_eq!(
2100 chunk,
2101 StreamChunk::from_pretty(
2102 " I I I
2103 + 2 5 2
2104 + 4 9 4"
2105 )
2106 );
2107
2108 tx_r.push_chunk(chunk_r2);
2110 let chunk = hash_join.next_unwrap_ready_chunk()?;
2111 assert_eq!(
2112 chunk,
2113 StreamChunk::from_pretty(
2114 " I I I
2115 + 1 4 1
2116 + 3 6 3"
2117 )
2118 );
2119
2120 Ok(())
2121 }
2122
2123 #[tokio::test]
2124 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2125 let chunk_l1 = StreamChunk::from_pretty(
2126 " I I I
2127 + 1 4 1
2128 + 2 5 2
2129 + 3 6 3",
2130 );
2131 let chunk_l2 = StreamChunk::from_pretty(
2132 " I I I
2133 + 4 9 4
2134 + 5 10 5",
2135 );
2136 let chunk_r1 = StreamChunk::from_pretty(
2137 " I I I
2138 + 2 5 1
2139 + 4 9 2
2140 + 6 9 3",
2141 );
2142 let chunk_r2 = StreamChunk::from_pretty(
2143 " I I I
2144 + 1 4 4
2145 + 3 6 5",
2146 );
2147
2148 let (mut tx_l, mut tx_r, mut hash_join) =
2149 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2150
2151 tx_l.push_barrier(test_epoch(1), false);
2153 tx_r.push_barrier(test_epoch(1), false);
2154 hash_join.next_unwrap_ready_barrier()?;
2155
2156 tx_l.push_chunk(chunk_l1);
2158 hash_join.next_unwrap_pending();
2159
2160 tx_l.push_barrier(test_epoch(2), false);
2162 tx_r.push_barrier(test_epoch(2), false);
2163 hash_join.next_unwrap_ready_barrier()?;
2164
2165 tx_l.push_chunk(chunk_l2);
2167 hash_join.next_unwrap_pending();
2168
2169 tx_r.push_chunk(chunk_r1);
2171 let chunk = hash_join.next_unwrap_ready_chunk()?;
2172 assert_eq!(
2173 chunk,
2174 StreamChunk::from_pretty(
2175 " I I I
2176 + 2 5 1
2177 + 4 9 2"
2178 )
2179 );
2180
2181 tx_r.push_chunk(chunk_r2);
2183 let chunk = hash_join.next_unwrap_ready_chunk()?;
2184 assert_eq!(
2185 chunk,
2186 StreamChunk::from_pretty(
2187 " I I I
2188 + 1 4 4
2189 + 3 6 5"
2190 )
2191 );
2192
2193 Ok(())
2194 }
2195
2196 #[tokio::test]
2197 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2198 let chunk_r1 = StreamChunk::from_pretty(
2199 " I I
2200 + 1 4
2201 + 2 5
2202 + 3 6",
2203 );
2204 let chunk_r2 = StreamChunk::from_pretty(
2205 " I I
2206 + 3 8
2207 - 3 8",
2208 );
2209 let chunk_l1 = StreamChunk::from_pretty(
2210 " I I
2211 + 2 7
2212 + 4 8
2213 + 6 9",
2214 );
2215 let chunk_l2 = StreamChunk::from_pretty(
2216 " I I
2217 + 3 10
2218 + 6 11",
2219 );
2220 let chunk_r3 = StreamChunk::from_pretty(
2221 " I I
2222 + 6 10",
2223 );
2224 let chunk_l3 = StreamChunk::from_pretty(
2225 " I I
2226 - 6 11",
2227 );
2228 let chunk_l4 = StreamChunk::from_pretty(
2229 " I I
2230 - 6 9",
2231 );
2232 let (mut tx_l, mut tx_r, mut hash_join) =
2233 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2234
2235 tx_l.push_barrier(test_epoch(1), false);
2237 tx_r.push_barrier(test_epoch(1), false);
2238 hash_join.next_unwrap_ready_barrier()?;
2239
2240 tx_r.push_chunk(chunk_r1);
2242 hash_join.next_unwrap_pending();
2243
2244 tx_l.push_barrier(test_epoch(2), false);
2246 tx_r.push_barrier(test_epoch(2), false);
2247 hash_join.next_unwrap_ready_barrier()?;
2248
2249 tx_r.push_chunk(chunk_r2);
2251 hash_join.next_unwrap_pending();
2252
2253 tx_l.push_chunk(chunk_l1);
2255 let chunk = hash_join.next_unwrap_ready_chunk()?;
2256 assert_eq!(
2257 chunk,
2258 StreamChunk::from_pretty(
2259 " I I
2260 + 2 5"
2261 )
2262 );
2263
2264 tx_l.push_chunk(chunk_l2);
2266 let chunk = hash_join.next_unwrap_ready_chunk()?;
2267 assert_eq!(
2268 chunk,
2269 StreamChunk::from_pretty(
2270 " I I
2271 + 3 6"
2272 )
2273 );
2274
2275 tx_r.push_chunk(chunk_r3);
2277 let chunk = hash_join.next_unwrap_ready_chunk()?;
2278 assert_eq!(
2279 chunk,
2280 StreamChunk::from_pretty(
2281 " I I
2282 + 6 10"
2283 )
2284 );
2285
2286 tx_l.push_chunk(chunk_l3);
2289 hash_join.next_unwrap_pending();
2290
2291 tx_l.push_chunk(chunk_l4);
2294 let chunk = hash_join.next_unwrap_ready_chunk()?;
2295 assert_eq!(
2296 chunk,
2297 StreamChunk::from_pretty(
2298 " I I
2299 - 6 10"
2300 )
2301 );
2302
2303 Ok(())
2304 }
2305
2306 #[tokio::test]
2307 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2308 let chunk_l1 = StreamChunk::from_pretty(
2309 " I I
2310 + 1 4
2311 + 2 5
2312 + 3 6",
2313 );
2314 let chunk_l2 = StreamChunk::from_pretty(
2315 " I I
2316 + 3 8
2317 - 3 8",
2318 );
2319 let chunk_r1 = StreamChunk::from_pretty(
2320 " I I
2321 + 2 7
2322 + 4 8
2323 + 6 9",
2324 );
2325 let chunk_r2 = StreamChunk::from_pretty(
2326 " I I
2327 + 3 10
2328 + 6 11
2329 + 1 2
2330 + 1 3",
2331 );
2332 let chunk_l3 = StreamChunk::from_pretty(
2333 " I I
2334 + 9 10",
2335 );
2336 let chunk_r3 = StreamChunk::from_pretty(
2337 " I I
2338 - 1 2",
2339 );
2340 let chunk_r4 = StreamChunk::from_pretty(
2341 " I I
2342 - 1 3",
2343 );
2344 let (mut tx_l, mut tx_r, mut hash_join) =
2345 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2346
2347 tx_l.push_barrier(test_epoch(1), false);
2349 tx_r.push_barrier(test_epoch(1), false);
2350 hash_join.next_unwrap_ready_barrier()?;
2351
2352 tx_l.push_chunk(chunk_l1);
2354 let chunk = hash_join.next_unwrap_ready_chunk()?;
2355 assert_eq!(
2356 chunk,
2357 StreamChunk::from_pretty(
2358 " I I
2359 + 1 4
2360 + 2 5
2361 + 3 6",
2362 )
2363 );
2364
2365 tx_l.push_barrier(test_epoch(2), false);
2367 tx_r.push_barrier(test_epoch(2), false);
2368 hash_join.next_unwrap_ready_barrier()?;
2369
2370 tx_l.push_chunk(chunk_l2);
2372 let chunk = hash_join.next_unwrap_ready_chunk()?;
2373 assert_eq!(
2374 chunk,
2375 StreamChunk::from_pretty(
2376 " I I
2377 + 3 8
2378 - 3 8",
2379 )
2380 );
2381
2382 tx_r.push_chunk(chunk_r1);
2384 let chunk = hash_join.next_unwrap_ready_chunk()?;
2385 assert_eq!(
2386 chunk,
2387 StreamChunk::from_pretty(
2388 " I I
2389 - 2 5"
2390 )
2391 );
2392
2393 tx_r.push_chunk(chunk_r2);
2395 let chunk = hash_join.next_unwrap_ready_chunk()?;
2396 assert_eq!(
2397 chunk,
2398 StreamChunk::from_pretty(
2399 " I I
2400 - 3 6
2401 - 1 4"
2402 )
2403 );
2404
2405 tx_l.push_chunk(chunk_l3);
2407 let chunk = hash_join.next_unwrap_ready_chunk()?;
2408 assert_eq!(
2409 chunk,
2410 StreamChunk::from_pretty(
2411 " I I
2412 + 9 10"
2413 )
2414 );
2415
2416 tx_r.push_chunk(chunk_r3);
2419 hash_join.next_unwrap_pending();
2420
2421 tx_r.push_chunk(chunk_r4);
2424 let chunk = hash_join.next_unwrap_ready_chunk()?;
2425 assert_eq!(
2426 chunk,
2427 StreamChunk::from_pretty(
2428 " I I
2429 + 1 4"
2430 )
2431 );
2432
2433 Ok(())
2434 }
2435
2436 #[tokio::test]
2437 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2438 let chunk_r1 = StreamChunk::from_pretty(
2439 " I I
2440 + 1 4
2441 + 2 5
2442 + 3 6",
2443 );
2444 let chunk_r2 = StreamChunk::from_pretty(
2445 " I I
2446 + 3 8
2447 - 3 8",
2448 );
2449 let chunk_l1 = StreamChunk::from_pretty(
2450 " I I
2451 + 2 7
2452 + 4 8
2453 + 6 9",
2454 );
2455 let chunk_l2 = StreamChunk::from_pretty(
2456 " I I
2457 + 3 10
2458 + 6 11
2459 + 1 2
2460 + 1 3",
2461 );
2462 let chunk_r3 = StreamChunk::from_pretty(
2463 " I I
2464 + 9 10",
2465 );
2466 let chunk_l3 = StreamChunk::from_pretty(
2467 " I I
2468 - 1 2",
2469 );
2470 let chunk_l4 = StreamChunk::from_pretty(
2471 " I I
2472 - 1 3",
2473 );
2474 let (mut tx_r, mut tx_l, mut hash_join) =
2475 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2476
2477 tx_r.push_barrier(test_epoch(1), false);
2479 tx_l.push_barrier(test_epoch(1), false);
2480 hash_join.next_unwrap_ready_barrier()?;
2481
2482 tx_r.push_chunk(chunk_r1);
2484 let chunk = hash_join.next_unwrap_ready_chunk()?;
2485 assert_eq!(
2486 chunk,
2487 StreamChunk::from_pretty(
2488 " I I
2489 + 1 4
2490 + 2 5
2491 + 3 6",
2492 )
2493 );
2494
2495 tx_r.push_barrier(test_epoch(2), false);
2497 tx_l.push_barrier(test_epoch(2), false);
2498 hash_join.next_unwrap_ready_barrier()?;
2499
2500 tx_r.push_chunk(chunk_r2);
2502 let chunk = hash_join.next_unwrap_ready_chunk()?;
2503 assert_eq!(
2504 chunk,
2505 StreamChunk::from_pretty(
2506 " I I
2507 + 3 8
2508 - 3 8",
2509 )
2510 );
2511
2512 tx_l.push_chunk(chunk_l1);
2514 let chunk = hash_join.next_unwrap_ready_chunk()?;
2515 assert_eq!(
2516 chunk,
2517 StreamChunk::from_pretty(
2518 " I I
2519 - 2 5"
2520 )
2521 );
2522
2523 tx_l.push_chunk(chunk_l2);
2525 let chunk = hash_join.next_unwrap_ready_chunk()?;
2526 assert_eq!(
2527 chunk,
2528 StreamChunk::from_pretty(
2529 " I I
2530 - 3 6
2531 - 1 4"
2532 )
2533 );
2534
2535 tx_r.push_chunk(chunk_r3);
2537 let chunk = hash_join.next_unwrap_ready_chunk()?;
2538 assert_eq!(
2539 chunk,
2540 StreamChunk::from_pretty(
2541 " I I
2542 + 9 10"
2543 )
2544 );
2545
2546 tx_l.push_chunk(chunk_l3);
2549 hash_join.next_unwrap_pending();
2550
2551 tx_l.push_chunk(chunk_l4);
2554 let chunk = hash_join.next_unwrap_ready_chunk()?;
2555 assert_eq!(
2556 chunk,
2557 StreamChunk::from_pretty(
2558 " I I
2559 + 1 4"
2560 )
2561 );
2562
2563 Ok(())
2564 }
2565
2566 #[tokio::test]
2567 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2568 let chunk_l1 = StreamChunk::from_pretty(
2569 " I I
2570 + 1 4
2571 + 2 5
2572 + 3 6",
2573 );
2574 let chunk_l2 = StreamChunk::from_pretty(
2575 " I I
2576 + 6 8
2577 + 3 8",
2578 );
2579 let chunk_r1 = StreamChunk::from_pretty(
2580 " I I
2581 + 2 7
2582 + 4 8
2583 + 6 9",
2584 );
2585 let chunk_r2 = StreamChunk::from_pretty(
2586 " I I
2587 + 3 10
2588 + 6 11",
2589 );
2590 let (mut tx_l, mut tx_r, mut hash_join) =
2591 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2592
2593 tx_l.push_barrier(test_epoch(1), false);
2595 tx_r.push_barrier(test_epoch(1), false);
2596 hash_join.next_unwrap_ready_barrier()?;
2597
2598 tx_l.push_chunk(chunk_l1);
2600 hash_join.next_unwrap_pending();
2601
2602 tx_l.push_barrier(test_epoch(2), false);
2604
2605 tx_l.push_chunk(chunk_l2);
2607
2608 tx_r.push_chunk(chunk_r1);
2610
2611 let chunk = hash_join.next_unwrap_ready_chunk()?;
2613 assert_eq!(
2614 chunk,
2615 StreamChunk::from_pretty(
2616 " I I I I
2617 + 2 5 2 7"
2618 )
2619 );
2620
2621 tx_r.push_barrier(test_epoch(2), false);
2623
2624 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2626 assert!(matches!(
2627 hash_join.next_unwrap_ready_barrier()?,
2628 Barrier {
2629 epoch,
2630 mutation: None,
2631 ..
2632 } if epoch == expected_epoch
2633 ));
2634
2635 let chunk = hash_join.next_unwrap_ready_chunk()?;
2637 assert_eq!(
2638 chunk,
2639 StreamChunk::from_pretty(
2640 " I I I I
2641 + 6 8 6 9"
2642 )
2643 );
2644
2645 tx_r.push_chunk(chunk_r2);
2647 let chunk = hash_join.next_unwrap_ready_chunk()?;
2648 assert_eq!(
2649 chunk,
2650 StreamChunk::from_pretty(
2651 " I I I I
2652 + 3 6 3 10
2653 + 3 8 3 10
2654 + 6 8 6 11"
2655 )
2656 );
2657
2658 Ok(())
2659 }
2660
2661 #[tokio::test]
2662 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2663 let chunk_l1 = StreamChunk::from_pretty(
2664 " I I
2665 + 1 4
2666 + 2 .
2667 + 3 .",
2668 );
2669 let chunk_l2 = StreamChunk::from_pretty(
2670 " I I
2671 + 6 .
2672 + 3 8",
2673 );
2674 let chunk_r1 = StreamChunk::from_pretty(
2675 " I I
2676 + 2 7
2677 + 4 8
2678 + 6 9",
2679 );
2680 let chunk_r2 = StreamChunk::from_pretty(
2681 " I I
2682 + 3 10
2683 + 6 11",
2684 );
2685 let (mut tx_l, mut tx_r, mut hash_join) =
2686 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2687
2688 tx_l.push_barrier(test_epoch(1), false);
2690 tx_r.push_barrier(test_epoch(1), false);
2691 hash_join.next_unwrap_ready_barrier()?;
2692
2693 tx_l.push_chunk(chunk_l1);
2695 hash_join.next_unwrap_pending();
2696
2697 tx_l.push_barrier(test_epoch(2), false);
2699
2700 tx_l.push_chunk(chunk_l2);
2702
2703 tx_r.push_chunk(chunk_r1);
2705
2706 let chunk = hash_join.next_unwrap_ready_chunk()?;
2708 assert_eq!(
2709 chunk,
2710 StreamChunk::from_pretty(
2711 " I I I I
2712 + 2 . 2 7"
2713 )
2714 );
2715
2716 tx_r.push_barrier(test_epoch(2), false);
2718
2719 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2721 assert!(matches!(
2722 hash_join.next_unwrap_ready_barrier()?,
2723 Barrier {
2724 epoch,
2725 mutation: None,
2726 ..
2727 } if epoch == expected_epoch
2728 ));
2729
2730 let chunk = hash_join.next_unwrap_ready_chunk()?;
2732 assert_eq!(
2733 chunk,
2734 StreamChunk::from_pretty(
2735 " I I I I
2736 + 6 . 6 9"
2737 )
2738 );
2739
2740 tx_r.push_chunk(chunk_r2);
2742 let chunk = hash_join.next_unwrap_ready_chunk()?;
2743 assert_eq!(
2744 chunk,
2745 StreamChunk::from_pretty(
2746 " I I I I
2747 + 3 8 3 10
2748 + 3 . 3 10
2749 + 6 . 6 11"
2750 )
2751 );
2752
2753 Ok(())
2754 }
2755
2756 #[tokio::test]
2757 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2758 let chunk_l1 = StreamChunk::from_pretty(
2759 " I I
2760 + 1 4
2761 + 2 5
2762 + 3 6",
2763 );
2764 let chunk_l2 = StreamChunk::from_pretty(
2765 " I I
2766 + 3 8
2767 - 3 8",
2768 );
2769 let chunk_r1 = StreamChunk::from_pretty(
2770 " I I
2771 + 2 7
2772 + 4 8
2773 + 6 9",
2774 );
2775 let chunk_r2 = StreamChunk::from_pretty(
2776 " I I
2777 + 3 10
2778 + 6 11",
2779 );
2780 let (mut tx_l, mut tx_r, mut hash_join) =
2781 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2782
2783 tx_l.push_barrier(test_epoch(1), false);
2785 tx_r.push_barrier(test_epoch(1), false);
2786 hash_join.next_unwrap_ready_barrier()?;
2787
2788 tx_l.push_chunk(chunk_l1);
2790 let chunk = hash_join.next_unwrap_ready_chunk()?;
2791 assert_eq!(
2792 chunk,
2793 StreamChunk::from_pretty(
2794 " I I I I
2795 + 1 4 . .
2796 + 2 5 . .
2797 + 3 6 . ."
2798 )
2799 );
2800
2801 tx_l.push_chunk(chunk_l2);
2803 let chunk = hash_join.next_unwrap_ready_chunk()?;
2804 assert_eq!(
2805 chunk,
2806 StreamChunk::from_pretty(
2807 " I I I I
2808 + 3 8 . .
2809 - 3 8 . ."
2810 )
2811 );
2812
2813 tx_r.push_chunk(chunk_r1);
2815 let chunk = hash_join.next_unwrap_ready_chunk()?;
2816 assert_eq!(
2817 chunk,
2818 StreamChunk::from_pretty(
2819 " I I I I
2820 U- 2 5 . .
2821 U+ 2 5 2 7"
2822 )
2823 );
2824
2825 tx_r.push_chunk(chunk_r2);
2827 let chunk = hash_join.next_unwrap_ready_chunk()?;
2828 assert_eq!(
2829 chunk,
2830 StreamChunk::from_pretty(
2831 " I I I I
2832 U- 3 6 . .
2833 U+ 3 6 3 10"
2834 )
2835 );
2836
2837 Ok(())
2838 }
2839
2840 #[tokio::test]
2841 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2842 let chunk_l1 = StreamChunk::from_pretty(
2843 " I I
2844 + 1 4
2845 + 2 5
2846 + . 6",
2847 );
2848 let chunk_l2 = StreamChunk::from_pretty(
2849 " I I
2850 + . 8
2851 - . 8",
2852 );
2853 let chunk_r1 = StreamChunk::from_pretty(
2854 " I I
2855 + 2 7
2856 + 4 8
2857 + 6 9",
2858 );
2859 let chunk_r2 = StreamChunk::from_pretty(
2860 " I I
2861 + . 10
2862 + 6 11",
2863 );
2864 let (mut tx_l, mut tx_r, mut hash_join) =
2865 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2866
2867 tx_l.push_barrier(test_epoch(1), false);
2869 tx_r.push_barrier(test_epoch(1), false);
2870 hash_join.next_unwrap_ready_barrier()?;
2871
2872 tx_l.push_chunk(chunk_l1);
2874 let chunk = hash_join.next_unwrap_ready_chunk()?;
2875 assert_eq!(
2876 chunk,
2877 StreamChunk::from_pretty(
2878 " I I I I
2879 + 1 4 . .
2880 + 2 5 . .
2881 + . 6 . ."
2882 )
2883 );
2884
2885 tx_l.push_chunk(chunk_l2);
2887 let chunk = hash_join.next_unwrap_ready_chunk()?;
2888 assert_eq!(
2889 chunk,
2890 StreamChunk::from_pretty(
2891 " I I I I
2892 + . 8 . .
2893 - . 8 . ."
2894 )
2895 );
2896
2897 tx_r.push_chunk(chunk_r1);
2899 let chunk = hash_join.next_unwrap_ready_chunk()?;
2900 assert_eq!(
2901 chunk,
2902 StreamChunk::from_pretty(
2903 " I I I I
2904 U- 2 5 . .
2905 U+ 2 5 2 7"
2906 )
2907 );
2908
2909 tx_r.push_chunk(chunk_r2);
2911 let chunk = hash_join.next_unwrap_ready_chunk()?;
2912 assert_eq!(
2913 chunk,
2914 StreamChunk::from_pretty(
2915 " I I I I
2916 U- . 6 . .
2917 U+ . 6 . 10"
2918 )
2919 );
2920
2921 Ok(())
2922 }
2923
2924 #[tokio::test]
2925 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2926 let chunk_l1 = StreamChunk::from_pretty(
2927 " I I
2928 + 1 4
2929 + 2 5
2930 + 3 6",
2931 );
2932 let chunk_l2 = StreamChunk::from_pretty(
2933 " I I
2934 + 3 8
2935 - 3 8",
2936 );
2937 let chunk_r1 = StreamChunk::from_pretty(
2938 " I I
2939 + 2 7
2940 + 4 8
2941 + 6 9",
2942 );
2943 let chunk_r2 = StreamChunk::from_pretty(
2944 " I I
2945 + 5 10
2946 - 5 10",
2947 );
2948 let (mut tx_l, mut tx_r, mut hash_join) =
2949 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2950
2951 tx_l.push_barrier(test_epoch(1), false);
2953 tx_r.push_barrier(test_epoch(1), false);
2954 hash_join.next_unwrap_ready_barrier()?;
2955
2956 tx_l.push_chunk(chunk_l1);
2958 hash_join.next_unwrap_pending();
2959
2960 tx_l.push_chunk(chunk_l2);
2962 hash_join.next_unwrap_pending();
2963
2964 tx_r.push_chunk(chunk_r1);
2966 let chunk = hash_join.next_unwrap_ready_chunk()?;
2967 assert_eq!(
2968 chunk,
2969 StreamChunk::from_pretty(
2970 " I I I I
2971 + 2 5 2 7
2972 + . . 4 8
2973 + . . 6 9"
2974 )
2975 );
2976
2977 tx_r.push_chunk(chunk_r2);
2979 let chunk = hash_join.next_unwrap_ready_chunk()?;
2980 assert_eq!(
2981 chunk,
2982 StreamChunk::from_pretty(
2983 " I I I I
2984 + . . 5 10
2985 - . . 5 10"
2986 )
2987 );
2988
2989 Ok(())
2990 }
2991
2992 #[tokio::test]
2993 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
2994 let chunk_l1 = StreamChunk::from_pretty(
2995 " I I I
2996 + 1 4 1
2997 + 2 5 2
2998 + 3 6 3",
2999 );
3000 let chunk_l2 = StreamChunk::from_pretty(
3001 " I I I
3002 + 4 9 4
3003 + 5 10 5",
3004 );
3005 let chunk_r1 = StreamChunk::from_pretty(
3006 " I I I
3007 + 2 5 1
3008 + 4 9 2
3009 + 6 9 3",
3010 );
3011 let chunk_r2 = StreamChunk::from_pretty(
3012 " I I I
3013 + 1 4 4
3014 + 3 6 5",
3015 );
3016
3017 let (mut tx_l, mut tx_r, mut hash_join) =
3018 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3019
3020 tx_l.push_barrier(test_epoch(1), false);
3022 tx_r.push_barrier(test_epoch(1), false);
3023 hash_join.next_unwrap_ready_barrier()?;
3024
3025 tx_l.push_chunk(chunk_l1);
3027 let chunk = hash_join.next_unwrap_ready_chunk()?;
3028 assert_eq!(
3029 chunk,
3030 StreamChunk::from_pretty(
3031 " I I I I I I
3032 + 1 4 1 . . .
3033 + 2 5 2 . . .
3034 + 3 6 3 . . ."
3035 )
3036 );
3037
3038 tx_l.push_chunk(chunk_l2);
3040 let chunk = hash_join.next_unwrap_ready_chunk()?;
3041 assert_eq!(
3042 chunk,
3043 StreamChunk::from_pretty(
3044 " I I I I I I
3045 + 4 9 4 . . .
3046 + 5 10 5 . . ."
3047 )
3048 );
3049
3050 tx_r.push_chunk(chunk_r1);
3052 let chunk = hash_join.next_unwrap_ready_chunk()?;
3053 assert_eq!(
3054 chunk,
3055 StreamChunk::from_pretty(
3056 " I I I I I I
3057 U- 2 5 2 . . .
3058 U+ 2 5 2 2 5 1
3059 U- 4 9 4 . . .
3060 U+ 4 9 4 4 9 2"
3061 )
3062 );
3063
3064 tx_r.push_chunk(chunk_r2);
3066 let chunk = hash_join.next_unwrap_ready_chunk()?;
3067 assert_eq!(
3068 chunk,
3069 StreamChunk::from_pretty(
3070 " I I I I I I
3071 U- 1 4 1 . . .
3072 U+ 1 4 1 1 4 4
3073 U- 3 6 3 . . .
3074 U+ 3 6 3 3 6 5"
3075 )
3076 );
3077
3078 Ok(())
3079 }
3080
3081 #[tokio::test]
3082 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3083 let chunk_l1 = StreamChunk::from_pretty(
3084 " I I I
3085 + 1 4 1
3086 + 2 5 2
3087 + 3 6 3",
3088 );
3089 let chunk_l2 = StreamChunk::from_pretty(
3090 " I I I
3091 + 4 9 4
3092 + 5 10 5",
3093 );
3094 let chunk_r1 = StreamChunk::from_pretty(
3095 " I I I
3096 + 2 5 1
3097 + 4 9 2
3098 + 6 9 3",
3099 );
3100 let chunk_r2 = StreamChunk::from_pretty(
3101 " I I I
3102 + 1 4 4
3103 + 3 6 5
3104 + 7 7 6",
3105 );
3106
3107 let (mut tx_l, mut tx_r, mut hash_join) =
3108 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3109
3110 tx_l.push_barrier(test_epoch(1), false);
3112 tx_r.push_barrier(test_epoch(1), false);
3113 hash_join.next_unwrap_ready_barrier()?;
3114
3115 tx_l.push_chunk(chunk_l1);
3117 hash_join.next_unwrap_pending();
3118
3119 tx_l.push_chunk(chunk_l2);
3121 hash_join.next_unwrap_pending();
3122
3123 tx_r.push_chunk(chunk_r1);
3125 let chunk = hash_join.next_unwrap_ready_chunk()?;
3126 assert_eq!(
3127 chunk,
3128 StreamChunk::from_pretty(
3129 " I I I I I I
3130 + 2 5 2 2 5 1
3131 + 4 9 4 4 9 2
3132 + . . . 6 9 3"
3133 )
3134 );
3135
3136 tx_r.push_chunk(chunk_r2);
3138 let chunk = hash_join.next_unwrap_ready_chunk()?;
3139 assert_eq!(
3140 chunk,
3141 StreamChunk::from_pretty(
3142 " I I I I I I
3143 + 1 4 1 1 4 4
3144 + 3 6 3 3 6 5
3145 + . . . 7 7 6"
3146 )
3147 );
3148
3149 Ok(())
3150 }
3151
3152 #[tokio::test]
3153 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3154 let chunk_l1 = StreamChunk::from_pretty(
3155 " I I
3156 + 1 4
3157 + 2 5
3158 + 3 6",
3159 );
3160 let chunk_l2 = StreamChunk::from_pretty(
3161 " I I
3162 + 3 8
3163 - 3 8",
3164 );
3165 let chunk_r1 = StreamChunk::from_pretty(
3166 " I I
3167 + 2 7
3168 + 4 8
3169 + 6 9",
3170 );
3171 let chunk_r2 = StreamChunk::from_pretty(
3172 " I I
3173 + 5 10
3174 - 5 10",
3175 );
3176 let (mut tx_l, mut tx_r, mut hash_join) =
3177 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3178
3179 tx_l.push_barrier(test_epoch(1), false);
3181 tx_r.push_barrier(test_epoch(1), false);
3182 hash_join.next_unwrap_ready_barrier()?;
3183
3184 tx_l.push_chunk(chunk_l1);
3186 let chunk = hash_join.next_unwrap_ready_chunk()?;
3187 assert_eq!(
3188 chunk,
3189 StreamChunk::from_pretty(
3190 " I I I I
3191 + 1 4 . .
3192 + 2 5 . .
3193 + 3 6 . ."
3194 )
3195 );
3196
3197 tx_l.push_chunk(chunk_l2);
3199 let chunk = hash_join.next_unwrap_ready_chunk()?;
3200 assert_eq!(
3201 chunk,
3202 StreamChunk::from_pretty(
3203 " I I I I
3204 + 3 8 . .
3205 - 3 8 . ."
3206 )
3207 );
3208
3209 tx_r.push_chunk(chunk_r1);
3211 let chunk = hash_join.next_unwrap_ready_chunk()?;
3212 assert_eq!(
3213 chunk,
3214 StreamChunk::from_pretty(
3215 " I I I I
3216 U- 2 5 . .
3217 U+ 2 5 2 7
3218 + . . 4 8
3219 + . . 6 9"
3220 )
3221 );
3222
3223 tx_r.push_chunk(chunk_r2);
3225 let chunk = hash_join.next_unwrap_ready_chunk()?;
3226 assert_eq!(
3227 chunk,
3228 StreamChunk::from_pretty(
3229 " I I I I
3230 + . . 5 10
3231 - . . 5 10"
3232 )
3233 );
3234
3235 Ok(())
3236 }
3237
3238 #[tokio::test]
3239 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3240 let (mut tx_l, mut tx_r, mut hash_join) =
3241 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3242
3243 tx_l.push_barrier(test_epoch(1), false);
3245 tx_r.push_barrier(test_epoch(1), false);
3246 hash_join.next_unwrap_ready_barrier()?;
3247
3248 tx_l.push_chunk(StreamChunk::from_pretty(
3249 " I I
3250 + 1 1
3251 ",
3252 ));
3253 let chunk = hash_join.next_unwrap_ready_chunk()?;
3254 assert_eq!(
3255 chunk,
3256 StreamChunk::from_pretty(
3257 " I I I I
3258 + 1 1 . ."
3259 )
3260 );
3261
3262 tx_r.push_chunk(StreamChunk::from_pretty(
3263 " I I
3264 + 1 1
3265 ",
3266 ));
3267 let chunk = hash_join.next_unwrap_ready_chunk()?;
3268
3269 assert_eq!(
3270 chunk,
3271 StreamChunk::from_pretty(
3272 " I I I I
3273 U- 1 1 . .
3274 U+ 1 1 1 1"
3275 )
3276 );
3277
3278 tx_l.push_chunk(StreamChunk::from_pretty(
3279 " I I
3280 - 1 1
3281 + 1 2
3282 ",
3283 ));
3284 let chunk = hash_join.next_unwrap_ready_chunk()?;
3285 let chunk = chunk.compact();
3286 assert_eq!(
3287 chunk,
3288 StreamChunk::from_pretty(
3289 " I I I I
3290 - 1 1 1 1
3291 + 1 2 1 1
3292 "
3293 )
3294 );
3295
3296 Ok(())
3297 }
3298
3299 #[tokio::test]
3300 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3301 {
3302 let chunk_l1 = StreamChunk::from_pretty(
3303 " I I
3304 + 1 4
3305 + 2 5
3306 + 3 6
3307 + 3 7",
3308 );
3309 let chunk_l2 = StreamChunk::from_pretty(
3310 " I I
3311 + 3 8
3312 - 3 8
3313 - 1 4", );
3315 let chunk_r1 = StreamChunk::from_pretty(
3316 " I I
3317 + 2 6
3318 + 4 8
3319 + 3 4",
3320 );
3321 let chunk_r2 = StreamChunk::from_pretty(
3322 " I I
3323 + 5 10
3324 - 5 10
3325 + 1 2",
3326 );
3327 let (mut tx_l, mut tx_r, mut hash_join) =
3328 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3329
3330 tx_l.push_barrier(test_epoch(1), false);
3332 tx_r.push_barrier(test_epoch(1), false);
3333 hash_join.next_unwrap_ready_barrier()?;
3334
3335 tx_l.push_chunk(chunk_l1);
3337 let chunk = hash_join.next_unwrap_ready_chunk()?;
3338 assert_eq!(
3339 chunk,
3340 StreamChunk::from_pretty(
3341 " I I I I
3342 + 1 4 . .
3343 + 2 5 . .
3344 + 3 6 . .
3345 + 3 7 . ."
3346 )
3347 );
3348
3349 tx_l.push_chunk(chunk_l2);
3351 let chunk = hash_join.next_unwrap_ready_chunk()?;
3352 assert_eq!(
3353 chunk,
3354 StreamChunk::from_pretty(
3355 " I I I I
3356 + 3 8 . .
3357 - 3 8 . .
3358 - 1 4 . ."
3359 )
3360 );
3361
3362 tx_r.push_chunk(chunk_r1);
3364 let chunk = hash_join.next_unwrap_ready_chunk()?;
3365 assert_eq!(
3366 chunk,
3367 StreamChunk::from_pretty(
3368 " I I I I
3369 U- 2 5 . .
3370 U+ 2 5 2 6
3371 + . . 4 8
3372 + . . 3 4" )
3376 );
3377
3378 tx_r.push_chunk(chunk_r2);
3380 let chunk = hash_join.next_unwrap_ready_chunk()?;
3381 assert_eq!(
3382 chunk,
3383 StreamChunk::from_pretty(
3384 " I I I I
3385 + . . 5 10
3386 - . . 5 10
3387 + . . 1 2" )
3390 );
3391
3392 Ok(())
3393 }
3394
3395 #[tokio::test]
3396 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3397 let chunk_l1 = StreamChunk::from_pretty(
3398 " I I
3399 + 1 4
3400 + 2 10
3401 + 3 6",
3402 );
3403 let chunk_l2 = StreamChunk::from_pretty(
3404 " I I
3405 + 3 8
3406 - 3 8",
3407 );
3408 let chunk_r1 = StreamChunk::from_pretty(
3409 " I I
3410 + 2 7
3411 + 4 8
3412 + 6 9",
3413 );
3414 let chunk_r2 = StreamChunk::from_pretty(
3415 " I I
3416 + 3 10
3417 + 6 11",
3418 );
3419 let (mut tx_l, mut tx_r, mut hash_join) =
3420 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3421
3422 tx_l.push_barrier(test_epoch(1), false);
3424 tx_r.push_barrier(test_epoch(1), false);
3425 hash_join.next_unwrap_ready_barrier()?;
3426
3427 tx_l.push_chunk(chunk_l1);
3429 hash_join.next_unwrap_pending();
3430
3431 tx_l.push_chunk(chunk_l2);
3433 hash_join.next_unwrap_pending();
3434
3435 tx_r.push_chunk(chunk_r1);
3437 hash_join.next_unwrap_pending();
3438
3439 tx_r.push_chunk(chunk_r2);
3441 let chunk = hash_join.next_unwrap_ready_chunk()?;
3442 assert_eq!(
3443 chunk,
3444 StreamChunk::from_pretty(
3445 " I I I I
3446 + 3 6 3 10"
3447 )
3448 );
3449
3450 Ok(())
3451 }
3452
3453 #[tokio::test]
3454 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3455 let (mut tx_l, mut tx_r, mut hash_join) =
3456 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3457
3458 tx_l.push_barrier(test_epoch(1), false);
3460 tx_r.push_barrier(test_epoch(1), false);
3461 hash_join.next_unwrap_ready_barrier()?;
3462
3463 tx_l.push_int64_watermark(0, 100);
3464
3465 tx_l.push_int64_watermark(0, 200);
3466
3467 tx_l.push_barrier(test_epoch(2), false);
3468 tx_r.push_barrier(test_epoch(2), false);
3469 hash_join.next_unwrap_ready_barrier()?;
3470
3471 tx_r.push_int64_watermark(0, 50);
3472
3473 let w1 = hash_join.next().await.unwrap().unwrap();
3474 let w1 = w1.as_watermark().unwrap();
3475
3476 let w2 = hash_join.next().await.unwrap().unwrap();
3477 let w2 = w2.as_watermark().unwrap();
3478
3479 tx_r.push_int64_watermark(0, 100);
3480
3481 let w3 = hash_join.next().await.unwrap().unwrap();
3482 let w3 = w3.as_watermark().unwrap();
3483
3484 let w4 = hash_join.next().await.unwrap().unwrap();
3485 let w4 = w4.as_watermark().unwrap();
3486
3487 assert_eq!(
3488 w1,
3489 &Watermark {
3490 col_idx: 2,
3491 data_type: DataType::Int64,
3492 val: ScalarImpl::Int64(50)
3493 }
3494 );
3495
3496 assert_eq!(
3497 w2,
3498 &Watermark {
3499 col_idx: 0,
3500 data_type: DataType::Int64,
3501 val: ScalarImpl::Int64(50)
3502 }
3503 );
3504
3505 assert_eq!(
3506 w3,
3507 &Watermark {
3508 col_idx: 2,
3509 data_type: DataType::Int64,
3510 val: ScalarImpl::Int64(100)
3511 }
3512 );
3513
3514 assert_eq!(
3515 w4,
3516 &Watermark {
3517 col_idx: 0,
3518 data_type: DataType::Int64,
3519 val: ScalarImpl::Int64(100)
3520 }
3521 );
3522
3523 Ok(())
3524 }
3525}