1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::time::Duration;
17
18use anyhow::Context;
19use itertools::Itertools;
20use multimap::MultiMap;
21use risingwave_common::array::Op;
22use risingwave_common::hash::{HashKey, NullBitmap};
23use risingwave_common::row::RowExt;
24use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
25use risingwave_common::util::epoch::EpochPair;
26use risingwave_common::util::iter_util::ZipEqDebug;
27use risingwave_expr::ExprError;
28use risingwave_expr::expr::NonStrictExpression;
29use tokio::time::Instant;
30
31use self::builder::JoinChunkBuilder;
32use super::barrier_align::*;
33use super::join::hash_join::*;
34use super::join::row::JoinRow;
35use super::join::*;
36use super::watermark::*;
37use crate::executor::join::builder::JoinStreamChunkBuilder;
38use crate::executor::join::hash_join::CacheResult;
39use crate::executor::prelude::*;
40
/// Number of processed rows between cache evictions: `evict_cache` bumps a
/// counter per row and calls `evict()` on both sides' hash tables once the
/// counter reaches this value, then resets the counter.
const EVICT_EVERY_N_ROWS: u32 = 16;
43
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Duplicates are irrelevant (set semantics), and an empty `vec1` is a subset
/// of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
47
/// Per-side parameters handed to the hash join executor constructor.
pub struct JoinParams {
    /// Column indices (into this side's input) that form the join key.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices for this side; forwarded to the side's
    /// `JoinHashMap` and used to size the degree table's pk index range.
    /// NOTE(review): presumably deduplicated against the join key — confirm.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduped pk indices of one side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
63
/// State and metadata of one input side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore> {
    /// Hash table holding this side's join state.
    ht: JoinHashMap<K, S>,
    /// Column indices of the join key within this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns in a concatenated (left ++ right) row:
    /// the constructor sets 0 for the left side and the left column count for
    /// the right side.
    start_pos: usize,
    /// Pairs produced by `JoinStreamChunkBuilder::get_i2o_mapping`.
    /// NOTE(review): presumably (input column index, output column index) — confirm.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same pairs as `i2o_mapping`, indexed by their first component.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For each input column, the inequality pairs it takes part in, as
    /// `(inequality index, flag)`. The flag is read as `need_offset` when
    /// watermarks are handled.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that take part in at least one inequality; a row with a
    /// NULL in any of them is treated as never matching. Cleared when the
    /// append-only optimization is on.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs registered for inequalities
    /// whose `clean_state` flag is set. Cleared when the append-only
    /// optimization is on.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table was created for this side.
    need_degree_table: bool,
}
92
93impl<K: HashKey, S: StateStore> std::fmt::Debug for JoinSide<K, S> {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("JoinSide")
96 .field("join_key_indices", &self.join_key_indices)
97 .field("col_types", &self.all_data_types)
98 .field("start_pos", &self.start_pos)
99 .field("i2o_mapping", &self.i2o_mapping)
100 .field("need_degree_table", &self.need_degree_table)
101 .finish()
102 }
103}
104
105impl<K: HashKey, S: StateStore> JoinSide<K, S> {
106 fn is_dirty(&self) -> bool {
108 unimplemented!()
109 }
110
111 #[expect(dead_code)]
112 fn clear_cache(&mut self) {
113 assert!(
114 !self.is_dirty(),
115 "cannot clear cache while states of hash join are dirty"
116 );
117
118 }
121
122 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
123 self.ht.init(epoch).await
124 }
125}
126
/// Streaming hash join executor, generic over the hash key `K`, the state
/// store `S` and the join type `T`.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive> {
    /// Actor context (ids, config, error reporting).
    ctx: ActorContextRef,
    /// Executor info (schema, pk indices, identity).
    info: ExecutorInfo,

    /// Left input; taken (set to `None`) when the stream starts.
    input_l: Option<Executor>,
    /// Right input; taken (set to `None`) when the stream starts.
    input_r: Option<Executor>,
    /// Data types of the output columns after applying the output indices.
    actual_output_data_types: Vec<DataType>,
    /// State of the left side.
    side_l: JoinSide<K, S>,
    /// State of the right side.
    side_r: JoinSide<K, S>,
    /// Optional non-equi join condition.
    cond: Option<NonStrictExpression>,
    /// Per inequality pair: the output column indices that receive its
    /// watermark, and an optional delta expression applied to incoming
    /// watermarks.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest selected watermark per inequality pair (`None` until one is
    /// selected, and reset to `None` in the barrier path of `into_stream`).
    inequality_watermarks: Vec<Option<Watermark>>,

    /// True when the inputs are append-only and both sides' pk are contained
    /// in their join keys.
    append_only_optimize: bool,

    /// Streaming metrics handle.
    metrics: Arc<StreamingMetrics>,
    /// Chunk size passed to the output chunk builder.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction.
    cnt_rows_received: u32,

    /// Watermark buffers; join-key positions use keys `0..join_key_len`, and
    /// inequality pairs use `join_key_len + inequality_index`.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Number of matched rows for one update row above which a
    /// high-amplification warning is logged.
    high_join_amplification_threshold: usize,

    /// Limit on how many matched rows are collected into a cache entry on a
    /// cache miss.
    entry_state_max_rows: usize,
}
170
171impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> std::fmt::Debug
172 for HashJoinExecutor<K, S, T>
173{
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("HashJoinExecutor")
176 .field("join_type", &T)
177 .field("input_left", &self.input_l.as_ref().unwrap().identity())
178 .field("input_right", &self.input_r.as_ref().unwrap().identity())
179 .field("side_l", &self.side_l)
180 .field("side_r", &self.side_r)
181 .field("pk_indices", &self.info.pk_indices)
182 .field("schema", &self.info.schema)
183 .field("actual_output_data_types", &self.actual_output_data_types)
184 .finish()
185 }
186}
187
188impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> Execute for HashJoinExecutor<K, S, T> {
189 fn execute(self: Box<Self>) -> BoxedMessageStream {
190 self.into_stream().boxed()
191 }
192}
193
/// Borrowed executor state plus the input chunk, bundled as the single
/// argument of `eq_join_left` / `eq_join_right` / `eq_join_oneside`.
struct EqJoinArgs<'a, K: HashKey, S: StateStore> {
    /// Actor context, used for metrics labels and logging.
    ctx: &'a ActorContextRef,
    /// Left side state.
    side_l: &'a mut JoinSide<K, S>,
    /// Right side state.
    side_r: &'a mut JoinSide<K, S>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest selected watermark per inequality pair.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size for the output chunk builder.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Matched-row count above which a warning is logged.
    high_join_amplification_threshold: usize,
    /// Limit on rows collected into a cache entry on a cache miss.
    entry_state_max_rows: usize,
}
208
209impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T> {
210 #[allow(clippy::too_many_arguments)]
211 pub fn new(
212 ctx: ActorContextRef,
213 info: ExecutorInfo,
214 input_l: Executor,
215 input_r: Executor,
216 params_l: JoinParams,
217 params_r: JoinParams,
218 null_safe: Vec<bool>,
219 output_indices: Vec<usize>,
220 cond: Option<NonStrictExpression>,
221 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
222 state_table_l: StateTable<S>,
223 degree_state_table_l: StateTable<S>,
224 state_table_r: StateTable<S>,
225 degree_state_table_r: StateTable<S>,
226 watermark_epoch: AtomicU64Ref,
227 is_append_only: bool,
228 metrics: Arc<StreamingMetrics>,
229 chunk_size: usize,
230 high_join_amplification_threshold: usize,
231 ) -> Self {
232 Self::new_with_cache_size(
233 ctx,
234 info,
235 input_l,
236 input_r,
237 params_l,
238 params_r,
239 null_safe,
240 output_indices,
241 cond,
242 inequality_pairs,
243 state_table_l,
244 degree_state_table_l,
245 state_table_r,
246 degree_state_table_r,
247 watermark_epoch,
248 is_append_only,
249 metrics,
250 chunk_size,
251 high_join_amplification_threshold,
252 None,
253 )
254 }
255
/// Creates a hash join executor, optionally overriding the per-entry cache
/// row limit.
///
/// When `entry_state_max_rows` is `None`, the limit is read from
/// `ctx.streaming_config.developer.hash_join_entry_state_max_rows`.
///
/// Each `inequality_pairs` element is
/// `(key_required_larger, key_required_smaller, clean_state, delta_expression)`,
/// where the two keys index into the concatenated (left ++ right) input
/// columns.
///
/// # Panics
///
/// Panics if the join key data types of the two sides differ, or if an index
/// in `output_indices` / `inequality_pairs` is out of range.
#[allow(clippy::too_many_arguments)]
pub fn new_with_cache_size(
    ctx: ActorContextRef,
    info: ExecutorInfo,
    input_l: Executor,
    input_r: Executor,
    params_l: JoinParams,
    params_r: JoinParams,
    null_safe: Vec<bool>,
    output_indices: Vec<usize>,
    cond: Option<NonStrictExpression>,
    inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    state_table_l: StateTable<S>,
    degree_state_table_l: StateTable<S>,
    state_table_r: StateTable<S>,
    degree_state_table_r: StateTable<S>,
    watermark_epoch: AtomicU64Ref,
    is_append_only: bool,
    metrics: Arc<StreamingMetrics>,
    chunk_size: usize,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: Option<usize>,
) -> Self {
    // Resolve the cache row limit: explicit value wins, otherwise use config.
    let entry_state_max_rows = match entry_state_max_rows {
        None => {
            ctx.streaming_config
                .developer
                .hash_join_entry_state_max_rows
        }
        Some(entry_state_max_rows) => entry_state_max_rows,
    };
    let side_l_column_n = input_l.schema().len();

    // Semi/anti joins only expose one side's columns; all other join types
    // expose the left columns followed by the right columns.
    let schema_fields = match T {
        JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
        JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
        _ => [
            input_l.schema().fields.clone(),
            input_r.schema().fields.clone(),
        ]
        .concat(),
    };

    let original_output_data_types = schema_fields
        .iter()
        .map(|field| field.data_type())
        .collect_vec();
    // Project the full output types down to the requested output columns.
    let actual_output_data_types = output_indices
        .iter()
        .map(|&idx| original_output_data_types[idx].clone())
        .collect_vec();

    let state_all_data_types_l = input_l.schema().data_types();
    let state_all_data_types_r = input_r.schema().data_types();

    let state_pk_indices_l = input_l.pk_indices().to_vec();
    let state_pk_indices_r = input_r.pk_indices().to_vec();

    let state_join_key_indices_l = params_l.join_key_indices;
    let state_join_key_indices_r = params_r.join_key_indices;

    // In the degree tables the join key columns occupy the leading positions...
    let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
    let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

    // ...and the deduped pk columns follow immediately after them.
    let degree_pk_indices_l = (state_join_key_indices_l.len()
        ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
        .collect_vec();
    let degree_pk_indices_r = (state_join_key_indices_r.len()
        ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
        .collect_vec();

    // Whether every pk column of a side is also one of its join key columns.
    let pk_contained_in_jk_l =
        is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
    let pk_contained_in_jk_r =
        is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

    let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

    let join_key_data_types_l = state_join_key_indices_l
        .iter()
        .map(|idx| state_all_data_types_l[*idx].clone())
        .collect_vec();

    let join_key_data_types_r = state_join_key_indices_r
        .iter()
        .map(|idx| state_all_data_types_r[*idx].clone())
        .collect_vec();

    // Both sides must agree on the join key types.
    assert_eq!(join_key_data_types_l, join_key_data_types_r);

    let null_matched = K::Bitmap::from_bool_vec(null_safe);

    // A side needs a degree table only if the join type asks for it and the
    // *other* side's pk is not fully contained in its join key.
    let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
    let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

    let degree_state_l = need_degree_table_l.then(|| {
        TableInner::new(
            degree_pk_indices_l,
            degree_join_key_indices_l,
            degree_state_table_l,
        )
    });
    let degree_state_r = need_degree_table_r.then(|| {
        TableInner::new(
            degree_pk_indices_r,
            degree_join_key_indices_r,
            degree_state_table_r,
        )
    });

    // Input-to-output column mappings; a side hidden by a semi/anti join
    // contributes zero columns.
    let (left_to_output, right_to_output) = {
        let (left_len, right_len) = if is_left_semi_or_anti(T) {
            (state_all_data_types_l.len(), 0usize)
        } else if is_right_semi_or_anti(T) {
            (0usize, state_all_data_types_r.len())
        } else {
            (state_all_data_types_l.len(), state_all_data_types_r.len())
        };
        JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
    };

    let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
    let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

    let left_input_len = input_l.schema().len();
    let right_input_len = input_r.schema().len();
    let mut l2inequality_index = vec![vec![]; left_input_len];
    let mut r2inequality_index = vec![vec![]; right_input_len];
    let mut l_state_clean_columns = vec![];
    let mut r_state_clean_columns = vec![];
    // Convert each inequality pair into (output indices of the larger key,
    // delta expression) while filling the per-side lookup tables.
    let inequality_pairs = inequality_pairs
        .into_iter()
        .enumerate()
        .map(
            |(
                index,
                (key_required_larger, key_required_smaller, clean_state, delta_expression),
            )| {
                // NOTE(review): a smaller index for `key_required_larger` is
                // treated as "larger key on the left, smaller key on the
                // right"; this assumes the two keys are on opposite sides —
                // confirm against the planner.
                let output_indices = if key_required_larger < key_required_smaller {
                    if clean_state {
                        l_state_clean_columns.push((key_required_larger, index));
                    }
                    l2inequality_index[key_required_larger].push((index, false));
                    // Right-side columns are addressed relative to the right input.
                    r2inequality_index[key_required_smaller - left_input_len]
                        .push((index, true));
                    l2o_indexed
                        .get_vec(&key_required_larger)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    if clean_state {
                        r_state_clean_columns
                            .push((key_required_larger - left_input_len, index));
                    }
                    l2inequality_index[key_required_smaller].push((index, true));
                    r2inequality_index[key_required_larger - left_input_len]
                        .push((index, false));
                    r2o_indexed
                        .get_vec(&(key_required_larger - left_input_len))
                        .cloned()
                        .unwrap_or_default()
                };
                (output_indices, delta_expression)
            },
        )
        .collect_vec();

    // Columns that take part in at least one inequality.
    let mut l_non_null_fields = l2inequality_index
        .iter()
        .positions(|inequalities| !inequalities.is_empty())
        .collect_vec();
    let mut r_non_null_fields = r2inequality_index
        .iter()
        .positions(|inequalities| !inequalities.is_empty())
        .collect_vec();

    // With the append-only optimization, state cleaning by inequality and the
    // non-null pre-check are both disabled.
    if append_only_optimize {
        l_state_clean_columns.clear();
        r_state_clean_columns.clear();
        l_non_null_fields.clear();
        r_non_null_fields.clear();
    }

    let inequality_watermarks = vec![None; inequality_pairs.len()];
    let watermark_buffers = BTreeMap::new();

    Self {
        // `ctx` is cloned here because `ctx.id` / `ctx.fragment_id` are still
        // read below when building the two sides.
        ctx: ctx.clone(),
        info,
        input_l: Some(input_l),
        input_r: Some(input_r),
        actual_output_data_types,
        side_l: JoinSide {
            ht: JoinHashMap::new(
                watermark_epoch.clone(),
                join_key_data_types_l,
                state_join_key_indices_l.clone(),
                state_all_data_types_l.clone(),
                state_table_l,
                params_l.deduped_pk_indices,
                degree_state_l,
                null_matched.clone(),
                pk_contained_in_jk_l,
                None,
                metrics.clone(),
                ctx.id,
                ctx.fragment_id,
                "left",
            ),
            join_key_indices: state_join_key_indices_l,
            all_data_types: state_all_data_types_l,
            i2o_mapping: left_to_output,
            i2o_mapping_indexed: l2o_indexed,
            input2inequality_index: l2inequality_index,
            non_null_fields: l_non_null_fields,
            state_clean_columns: l_state_clean_columns,
            // Left columns come first in a concatenated row.
            start_pos: 0,
            need_degree_table: need_degree_table_l,
        },
        side_r: JoinSide {
            ht: JoinHashMap::new(
                watermark_epoch,
                join_key_data_types_r,
                state_join_key_indices_r.clone(),
                state_all_data_types_r.clone(),
                state_table_r,
                params_r.deduped_pk_indices,
                degree_state_r,
                null_matched,
                pk_contained_in_jk_r,
                None,
                metrics.clone(),
                ctx.id,
                ctx.fragment_id,
                "right",
            ),
            join_key_indices: state_join_key_indices_r,
            all_data_types: state_all_data_types_r,
            // Right columns start after all left columns.
            start_pos: side_l_column_n,
            i2o_mapping: right_to_output,
            i2o_mapping_indexed: r2o_indexed,
            input2inequality_index: r2inequality_index,
            non_null_fields: r_non_null_fields,
            state_clean_columns: r_state_clean_columns,
            need_degree_table: need_degree_table_r,
        },
        cond,
        inequality_pairs,
        inequality_watermarks,
        append_only_optimize,
        metrics,
        chunk_size,
        cnt_rows_received: 0,
        watermark_buffers,
        high_join_amplification_threshold,
        entry_state_max_rows,
    }
}
517
/// Main loop of the executor: aligns the two inputs on barriers and turns
/// every aligned message into output messages.
///
/// The first barrier is forwarded before both sides are initialized with its
/// epoch. After that, watermarks go through `handle_watermark`, chunks go
/// through `eq_join_left` / `eq_join_right` followed by `try_flush_data`, and
/// barriers flush both sides, are yielded, and then run the post-commit step.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
    let input_l = self.input_l.take().unwrap();
    let input_r = self.input_r.take().unwrap();
    let aligned_stream = barrier_align(
        input_l.execute(),
        input_r.execute(),
        self.ctx.id,
        self.ctx.fragment_id,
        self.metrics.clone(),
        "Join",
    );
    pin_mut!(aligned_stream);

    let actor_id = self.ctx.id;

    let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
    let first_epoch = barrier.epoch;
    // The first barrier is yielded before the state tables are initialized.
    yield Message::Barrier(barrier);
    self.side_l.init(first_epoch).await?;
    self.side_r.init(first_epoch).await?;

    let actor_id_str = self.ctx.id.to_string();
    let fragment_id_str = self.ctx.fragment_id.to_string();

    // Metric handles, resolved once up front with this actor's labels.
    let join_actor_input_waiting_duration_ns = self
        .metrics
        .join_actor_input_waiting_duration_ns
        .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
    let left_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
    let right_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

    let barrier_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[
            actor_id_str.as_str(),
            fragment_id_str.as_str(),
            "barrier",
        ]);

    let left_join_cached_entry_count = self
        .metrics
        .join_cached_entry_count
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

    let right_join_cached_entry_count = self
        .metrics
        .join_cached_entry_count
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

    // Marks the moment we start waiting for the next aligned message.
    let mut start_time = Instant::now();

    while let Some(msg) = aligned_stream
        .next()
        .instrument_await("hash_join_barrier_align")
        .await
    {
        // Time spent waiting on the inputs since the previous message was done.
        join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
        match msg? {
            AlignedMessage::WatermarkLeft(watermark) => {
                for watermark_to_emit in
                    self.handle_watermark(SideType::Left, watermark).await?
                {
                    yield Message::Watermark(watermark_to_emit);
                }
            }
            AlignedMessage::WatermarkRight(watermark) => {
                for watermark_to_emit in
                    self.handle_watermark(SideType::Right, watermark).await?
                {
                    yield Message::Watermark(watermark_to_emit);
                }
            }
            AlignedMessage::Left(chunk) => {
                // Accumulates only the time spent producing chunks; the clock
                // is stopped around each `yield` and restarted afterwards.
                let mut left_time = Duration::from_nanos(0);
                let mut left_start_time = Instant::now();
                #[for_await]
                for chunk in Self::eq_join_left(EqJoinArgs {
                    ctx: &self.ctx,
                    side_l: &mut self.side_l,
                    side_r: &mut self.side_r,
                    actual_output_data_types: &self.actual_output_data_types,
                    cond: &mut self.cond,
                    inequality_watermarks: &self.inequality_watermarks,
                    chunk,
                    append_only_optimize: self.append_only_optimize,
                    chunk_size: self.chunk_size,
                    cnt_rows_received: &mut self.cnt_rows_received,
                    high_join_amplification_threshold: self.high_join_amplification_threshold,
                    entry_state_max_rows: self.entry_state_max_rows,
                }) {
                    left_time += left_start_time.elapsed();
                    yield Message::Chunk(chunk?);
                    left_start_time = Instant::now();
                }
                left_time += left_start_time.elapsed();
                left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                self.try_flush_data().await?;
            }
            AlignedMessage::Right(chunk) => {
                // Same timing scheme as the left branch.
                let mut right_time = Duration::from_nanos(0);
                let mut right_start_time = Instant::now();
                #[for_await]
                for chunk in Self::eq_join_right(EqJoinArgs {
                    ctx: &self.ctx,
                    side_l: &mut self.side_l,
                    side_r: &mut self.side_r,
                    actual_output_data_types: &self.actual_output_data_types,
                    cond: &mut self.cond,
                    inequality_watermarks: &self.inequality_watermarks,
                    chunk,
                    append_only_optimize: self.append_only_optimize,
                    chunk_size: self.chunk_size,
                    cnt_rows_received: &mut self.cnt_rows_received,
                    high_join_amplification_threshold: self.high_join_amplification_threshold,
                    entry_state_max_rows: self.entry_state_max_rows,
                }) {
                    right_time += right_start_time.elapsed();
                    yield Message::Chunk(chunk?);
                    right_start_time = Instant::now();
                }
                right_time += right_start_time.elapsed();
                right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                self.try_flush_data().await?;
            }
            AlignedMessage::Barrier(barrier) => {
                let barrier_start_time = Instant::now();
                // Flush both sides for this epoch before forwarding the barrier.
                let (left_post_commit, right_post_commit) =
                    self.flush_data(barrier.epoch).await?;

                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                yield Message::Barrier(barrier);

                // Post-commit runs only after the barrier has been yielded.
                right_post_commit
                    .post_yield_barrier(update_vnode_bitmap.clone())
                    .await?;
                // NOTE(review): a `Some(true)` result from the left post-commit
                // presumably signals that the cache may be stale after a vnode
                // bitmap update — confirm. In that case all buffered watermarks
                // are dropped.
                if left_post_commit
                    .post_yield_barrier(update_vnode_bitmap)
                    .await?
                    .unwrap_or(false)
                {
                    self.watermark_buffers
                        .values_mut()
                        .for_each(|buffers| buffers.clear());
                    self.inequality_watermarks.fill(None);
                }

                // Report the current number of cached entries for both sides.
                for (join_cached_entry_count, ht) in [
                    (&left_join_cached_entry_count, &self.side_l.ht),
                    (&right_join_cached_entry_count, &self.side_r.ht),
                ] {
                    join_cached_entry_count.set(ht.entry_count() as i64);
                }

                barrier_join_match_duration_ns
                    .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
            }
        }
        start_time = Instant::now();
    }
}
690
691 async fn flush_data(
692 &mut self,
693 epoch: EpochPair,
694 ) -> StreamExecutorResult<(
695 JoinHashMapPostCommit<'_, K, S>,
696 JoinHashMapPostCommit<'_, K, S>,
697 )> {
698 let left = self.side_l.ht.flush(epoch).await?;
701 let right = self.side_r.ht.flush(epoch).await?;
702 Ok((left, right))
703 }
704
705 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
706 self.side_l.ht.try_flush().await?;
709 self.side_r.ht.try_flush().await?;
710 Ok(())
711 }
712
713 fn evict_cache(
715 side_update: &mut JoinSide<K, S>,
716 side_match: &mut JoinSide<K, S>,
717 cnt_rows_received: &mut u32,
718 ) {
719 *cnt_rows_received += 1;
720 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
721 side_update.ht.evict();
722 side_match.ht.evict();
723 *cnt_rows_received = 0;
724 }
725 }
726
/// Processes a watermark arriving on `side` and returns the watermarks that
/// should be emitted downstream.
///
/// A watermark can be relevant in two ways: its column may be a join key
/// column, and/or it may take part in inequality pairs. In both cases the
/// watermark is fed into a buffer shared by both sides, and only the
/// watermark selected by that buffer (if any) is emitted on the mapped output
/// columns.
///
/// # Panics
///
/// Panics if the side has no join key columns, or if a delta expression
/// evaluates to NULL.
async fn handle_watermark(
    &mut self,
    side: SideTypePrimitive,
    watermark: Watermark,
) -> StreamExecutorResult<Vec<Watermark>> {
    let (side_update, side_match) = if side == SideType::Left {
        (&mut self.side_l, &mut self.side_r)
    } else {
        (&mut self.side_r, &mut self.side_l)
    };

    // Only a watermark on the *first* join key column is forwarded to the
    // other side's hash table.
    if side_update.join_key_indices[0] == watermark.col_idx {
        side_match.ht.update_watermark(watermark.val.clone());
    }

    // Positions within the join key where the watermark column appears.
    let wm_in_jk = side_update
        .join_key_indices
        .iter()
        .positions(|idx| *idx == watermark.col_idx);
    let mut watermarks_to_emit = vec![];
    for idx in wm_in_jk {
        // Join-key buffers are keyed by the position within the join key.
        let buffers = self
            .watermark_buffers
            .entry(idx)
            .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
        if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
            let empty_indices = vec![];
            // Emit on the output columns of this join key position from both
            // sides; a side with no mapping contributes nothing.
            let output_indices = side_update
                .i2o_mapping_indexed
                .get_vec(&side_update.join_key_indices[idx])
                .unwrap_or(&empty_indices)
                .iter()
                .chain(
                    side_match
                        .i2o_mapping_indexed
                        .get_vec(&side_match.join_key_indices[idx])
                        .unwrap_or(&empty_indices),
                );
            for output_idx in output_indices {
                watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
            }
        };
    }
    for (inequality_index, need_offset) in
        &side_update.input2inequality_index[watermark.col_idx]
    {
        // Inequality buffers are keyed after the join-key positions so the two
        // key ranges never collide.
        let buffers = self
            .watermark_buffers
            .entry(side_update.join_key_indices.len() + inequality_index)
            .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
        let mut input_watermark = watermark.clone();
        // When this column is the "smaller" key of the pair and a delta
        // expression exists, shift the watermark value through it first.
        if *need_offset
            && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
        {
            // The inner (strict) expression is evaluated directly so that the
            // error can be inspected below.
            #[allow(clippy::disallowed_methods)]
            let eval_result = delta_expression
                .inner()
                .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                .await;
            match eval_result {
                Ok(value) => input_watermark.val = value.unwrap(),
                Err(err) => {
                    // Out-of-range results are skipped silently; any other
                    // error is reported. Either way this inequality gets no
                    // watermark from this input.
                    if !matches!(err, ExprError::NumericOutOfRange) {
                        self.ctx.on_compute_error(err, &self.info.identity);
                    }
                    continue;
                }
            }
        };
        if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
            for output_idx in &self.inequality_pairs[*inequality_index].0 {
                watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
            }
            // Remember the selected watermark; it is later used to pick the
            // state-clean columns when joining chunks.
            self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
        }
    }
    Ok(watermarks_to_emit)
}
808
809 fn row_concat(
810 row_update: impl Row,
811 update_start_pos: usize,
812 row_matched: impl Row,
813 matched_start_pos: usize,
814 ) -> OwnedRow {
815 let mut new_row = vec![None; row_update.len() + row_matched.len()];
816
817 for (i, datum_ref) in row_update.iter().enumerate() {
818 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
819 }
820 for (i, datum_ref) in row_matched.iter().enumerate() {
821 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
822 }
823 OwnedRow::new(new_row)
824 }
825
826 fn eq_join_left(
828 args: EqJoinArgs<'_, K, S>,
829 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
830 Self::eq_join_oneside::<{ SideType::Left }>(args)
831 }
832
833 fn eq_join_right(
835 args: EqJoinArgs<'_, K, S>,
836 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
837 Self::eq_join_oneside::<{ SideType::Right }>(args)
838 }
839
/// Joins one input chunk arriving on side `SIDE` against the state of the
/// opposite side and yields the resulting output chunks.
///
/// For every visible row: the eviction counter is advanced, the matching
/// side's cache is probed (or the row is marked as never matching), and the
/// row is dispatched to `handle_match_rows` as an insert or a delete. The
/// number of matched output rows per input row is recorded, and a warning is
/// logged when it exceeds `high_join_amplification_threshold`.
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
    let EqJoinArgs {
        ctx,
        side_l,
        side_r,
        actual_output_data_types,
        cond,
        inequality_watermarks,
        chunk,
        append_only_optimize,
        chunk_size,
        cnt_rows_received,
        high_join_amplification_threshold,
        entry_state_max_rows,
        ..
    } = args;

    // The side the chunk arrived on is updated; the other side is probed.
    let (side_update, side_match) = if SIDE == SideType::Left {
        (side_l, side_r)
    } else {
        (side_r, side_l)
    };

    // Keep only the state-clean columns whose inequality already has a
    // selected watermark.
    let useful_state_clean_columns = side_match
        .state_clean_columns
        .iter()
        .filter_map(|(column_idx, inequality_index)| {
            inequality_watermarks[*inequality_index]
                .as_ref()
                .map(|watermark| (*column_idx, watermark))
        })
        .collect_vec();

    let mut hashjoin_chunk_builder =
        JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
            chunk_size,
            actual_output_data_types.to_vec(),
            side_update.i2o_mapping.clone(),
            side_match.i2o_mapping.clone(),
        ));

    let join_matched_join_keys = ctx
        .streaming_metrics
        .join_matched_join_keys
        .with_guarded_label_values(&[
            &ctx.id.to_string(),
            &ctx.fragment_id.to_string(),
            &side_update.ht.table_id().to_string(),
        ]);

    // Hash keys are built for the whole chunk up front and zipped with the rows.
    let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
    for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
        // Holes (invisible rows) are skipped.
        let Some((op, row)) = r else {
            continue;
        };
        Self::evict_cache(side_update, side_match, cnt_rows_received);

        let cache_lookup_result = {
            // A NULL in any column that takes part in an inequality means the
            // row can never match.
            // NOTE(review): the unchecked access relies on `non_null_fields`
            // only holding valid column indices for rows of this side — the
            // constructor derives them from a vector sized to the input
            // schema; confirm chunks always carry that many columns.
            let probe_non_null_requirement_satisfied = side_update
                .non_null_fields
                .iter()
                .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
            // NULLs in the join key are only allowed where the hash table is
            // configured to match them.
            let build_non_null_requirement_satisfied =
                key.null_bitmap().is_subset(side_match.ht.null_matched());
            if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                side_match.ht.take_state_opt(key)
            } else {
                CacheResult::NeverMatch
            }
        };
        let mut total_matches = 0;

        // Expands to a `handle_match_rows` call specialized on the join op.
        macro_rules! match_rows {
            ($op:ident) => {
                Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                    cache_lookup_result,
                    row,
                    key,
                    &mut hashjoin_chunk_builder,
                    side_match,
                    side_update,
                    &useful_state_clean_columns,
                    cond,
                    append_only_optimize,
                    entry_state_max_rows,
                )
            };
        }

        // Both halves of an update are handled like a plain insert / delete.
        match op {
            Op::Insert | Op::UpdateInsert => {
                #[for_await]
                for chunk in match_rows!(Insert) {
                    let chunk = chunk?;
                    total_matches += chunk.cardinality();
                    yield chunk;
                }
            }
            Op::Delete | Op::UpdateDelete => {
                #[for_await]
                for chunk in match_rows!(Delete) {
                    let chunk = chunk?;
                    total_matches += chunk.cardinality();
                    yield chunk;
                }
            }
        };

        join_matched_join_keys.observe(total_matches as _);
        if total_matches > high_join_amplification_threshold {
            // The key is only deserialized on this (rare) path, for logging.
            let join_key_data_types = side_update.ht.join_key_data_types();
            let key = key.deserialize(join_key_data_types)?;
            tracing::warn!(target: "high_join_amplification",
                matched_rows_len = total_matches,
                update_table_id = side_update.ht.table_id(),
                match_table_id = side_match.ht.table_id(),
                join_key = ?key,
                actor_id = ctx.id,
                fragment_id = ctx.fragment_id,
                "large rows matched for join key"
            );
        }
    }
    // Emit whatever is still buffered in the chunk builder.
    if let Some(chunk) = hashjoin_chunk_builder.take() {
        yield chunk;
    }
}
971
/// Joins a single update-side `row` against all rows of the matching side
/// that share its join `key`, yielding output chunks as the builder fills up,
/// and then applies the row to the update side's state.
///
/// The matched rows come from `cached_lookup_result`:
/// - `NeverMatch`: the row is forwarded as unmatched and nothing else happens.
/// - `Hit`: the cached rows are iterated directly.
/// - `Miss`: rows are fetched from the matching side's table and collected
///   into a fresh cache entry while they are being joined.
#[allow(clippy::too_many_arguments)]
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn handle_match_rows<
    'a,
    const SIDE: SideTypePrimitive,
    const JOIN_OP: JoinOpPrimitive,
>(
    cached_lookup_result: CacheResult,
    row: RowRef<'a>,
    key: &'a K,
    hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
    side_match: &'a mut JoinSide<K, S>,
    side_update: &'a mut JoinSide<K, S>,
    useful_state_clean_columns: &'a [(usize, &'a Watermark)],
    cond: &'a mut Option<NonStrictExpression>,
    append_only_optimize: bool,
    entry_state_max_rows: usize,
) {
    let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
    // Only used on a cache miss, to collect the fetched rows.
    let mut entry_state = JoinEntryState::default();
    let mut entry_state_count = 0;

    // Number of matched rows that satisfied the join condition.
    let mut degree = 0;
    let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
    let mut matched_rows_to_clean = vec![];

    // Expands to a `handle_match_row` call sharing the accumulators above.
    macro_rules! match_row {
        (
            $match_order_key_indices:expr,
            $degree_table:expr,
            $matched_row:expr,
            $matched_row_ref:expr,
            $from_cache:literal
        ) => {
            Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                row,
                $matched_row,
                $matched_row_ref,
                hashjoin_chunk_builder,
                $match_order_key_indices,
                $degree_table,
                side_update.start_pos,
                side_match.start_pos,
                cond,
                &mut degree,
                useful_state_clean_columns,
                append_only_optimize,
                &mut append_only_matched_row,
                &mut matched_rows_to_clean,
            )
        };
    }

    let entry_state = match cached_lookup_result {
        CacheResult::NeverMatch => {
            let op = match JOIN_OP {
                JoinOp::Insert => Op::Insert,
                JoinOp::Delete => Op::Delete,
            };
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
            // Neither side's state is touched for a never-matching row.
            return Ok(());
        }
        CacheResult::Hit(mut cached_rows) => {
            let (match_order_key_indices, match_degree_state) =
                side_match.ht.get_degree_state_mut_ref();
            for (matched_row_ref, matched_row) in
                cached_rows.values_mut(&side_match.all_data_types)
            {
                let matched_row = matched_row?;
                if let Some(chunk) = match_row!(
                    match_order_key_indices,
                    match_degree_state,
                    matched_row,
                    Some(matched_row_ref),
                    true
                )
                .await
                {
                    yield chunk;
                }
            }

            cached_rows
        }
        CacheResult::Miss => {
            let (matched_rows, match_order_key_indices, degree_table) = side_match
                .ht
                .fetch_matched_rows_and_get_degree_table_ref(key)
                .await?;

            #[for_await]
            for matched_row in matched_rows {
                let (encoded_pk, matched_row) = matched_row?;

                let mut matched_row_ref = None;

                // Rows are collected until the count goes *past* the limit; a
                // count above the limit afterwards means the entry overflowed
                // and must not be written back to the cache (see below).
                if entry_state_count <= entry_state_max_rows {
                    let row_ref = entry_state
                        .insert(encoded_pk, matched_row.encode(), None)
                        .with_context(|| format!("row: {}", row.display(),))?;
                    matched_row_ref = Some(row_ref);
                    entry_state_count += 1;
                }
                if let Some(chunk) = match_row!(
                    match_order_key_indices,
                    degree_table,
                    matched_row,
                    matched_row_ref,
                    false
                )
                .await
                {
                    yield chunk;
                }
            }
            Box::new(entry_state)
        }
    };

    let op = match JOIN_OP {
        JoinOp::Insert => Op::Insert,
        JoinOp::Delete => Op::Delete,
    };
    // Give the builder a chance to forward the update row itself, depending on
    // whether anything matched.
    if degree == 0 {
        if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
            yield chunk;
        }
    } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
    {
        yield chunk;
    }

    // Put the entry (back) into the cache, unless it was built on a miss and
    // overflowed the row limit.
    if cache_hit || entry_state_count <= entry_state_max_rows {
        side_match.ht.update_state(key, entry_state);
    }

    // Remove the matched rows that were collected for state cleaning.
    for matched_row in matched_rows_to_clean {
        side_match.ht.delete_handle_degree(key, matched_row)?;
    }

    // Append-only fast path: the matched row is deleted from the matching
    // side and the update row is not stored at all.
    if append_only_optimize && let Some(row) = append_only_matched_row {
        assert_matches!(JOIN_OP, JoinOp::Insert);
        side_match.ht.delete_handle_degree(key, row)?;
        return Ok(());
    }

    // Finally apply the update row, with its degree, to its own side's state.
    match JOIN_OP {
        JoinOp::Insert => {
            side_update
                .ht
                .insert_handle_degree(key, JoinRow::new(row, degree))?;
        }
        JoinOp::Delete => {
            side_update
                .ht
                .delete_handle_degree(key, JoinRow::new(row, degree))?;
        }
    }
}
1149
1150 #[allow(clippy::too_many_arguments)]
1151 #[inline]
1152 async fn handle_match_row<
1153 'a,
1154 const SIDE: SideTypePrimitive,
1155 const JOIN_OP: JoinOpPrimitive,
1156 const MATCHED_ROWS_FROM_CACHE: bool,
1157 >(
1158 update_row: RowRef<'a>,
1159 mut matched_row: JoinRow<OwnedRow>,
1160 mut matched_row_cache_ref: Option<&mut StateValueType>,
1161 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1162 match_order_key_indices: &[usize],
1163 match_degree_table: &mut Option<TableInner<S>>,
1164 side_update_start_pos: usize,
1165 side_match_start_pos: usize,
1166 cond: &Option<NonStrictExpression>,
1167 update_row_degree: &mut u64,
1168 useful_state_clean_columns: &[(usize, &'a Watermark)],
1169 append_only_optimize: bool,
1170 append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
1171 matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
1172 ) -> Option<StreamChunk> {
1173 let mut need_state_clean = false;
1174 let mut chunk_opt = None;
1175
1176 let join_condition_satisfied = Self::check_join_condition(
1180 update_row,
1181 side_update_start_pos,
1182 &matched_row.row,
1183 side_match_start_pos,
1184 cond,
1185 )
1186 .await;
1187
1188 if join_condition_satisfied {
1189 *update_row_degree += 1;
1191 if matches!(JOIN_OP, JoinOp::Insert) && !forward_exactly_once(T, SIDE) {
1196 if let Some(chunk) =
1197 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1198 {
1199 chunk_opt = Some(chunk);
1200 }
1201 }
1202 if let Some(degree_table) = match_degree_table {
1204 update_degree::<S, { JOIN_OP }>(
1205 match_order_key_indices,
1206 degree_table,
1207 &mut matched_row,
1208 );
1209 if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
1210 match JOIN_OP {
1212 JoinOp::Insert => {
1213 matched_row_cache_ref.as_mut().unwrap().degree += 1;
1214 }
1215 JoinOp::Delete => {
1216 matched_row_cache_ref.as_mut().unwrap().degree -= 1;
1217 }
1218 }
1219 }
1220 }
1221
1222 if matches!(JOIN_OP, JoinOp::Delete) && !forward_exactly_once(T, SIDE) {
1225 if let Some(chunk) =
1226 hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
1227 {
1228 chunk_opt = Some(chunk);
1229 }
1230 }
1231 } else {
1232 for (column_idx, watermark) in useful_state_clean_columns {
1234 if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
1235 scalar
1236 .default_cmp(&watermark.val.as_scalar_ref_impl())
1237 .is_lt()
1238 }) {
1239 need_state_clean = true;
1240 break;
1241 }
1242 }
1243 }
1244 if append_only_optimize {
1248 assert_matches!(JOIN_OP, JoinOp::Insert);
1249 assert!(append_only_matched_row.is_none());
1252 *append_only_matched_row = Some(matched_row);
1253 } else if need_state_clean {
1254 matched_rows_to_clean.push(matched_row);
1258 }
1259
1260 chunk_opt
1261 }
1262
1263 #[inline]
1268 async fn check_join_condition(
1269 row: impl Row,
1270 side_update_start_pos: usize,
1271 matched_row: impl Row,
1272 side_match_start_pos: usize,
1273 join_condition: &Option<NonStrictExpression>,
1274 ) -> bool {
1275 if let Some(join_condition) = join_condition {
1276 let new_row = Self::row_concat(
1277 row,
1278 side_update_start_pos,
1279 matched_row,
1280 side_match_start_pos,
1281 );
1282 join_condition
1283 .eval_row_infallible(&new_row)
1284 .await
1285 .map(|s| *s.as_bool())
1286 .unwrap_or(false)
1287 } else {
1288 true
1289 }
1290 }
1291}
1292
1293#[cfg(test)]
1294mod tests {
1295 use std::sync::atomic::AtomicU64;
1296
1297 use pretty_assertions::assert_eq;
1298 use risingwave_common::array::*;
1299 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1300 use risingwave_common::hash::{Key64, Key128};
1301 use risingwave_common::util::epoch::test_epoch;
1302 use risingwave_common::util::sort_util::OrderType;
1303 use risingwave_storage::memory::MemoryStateStore;
1304
1305 use super::*;
1306 use crate::common::table::test_utils::gen_pbtable;
1307 use crate::executor::test_utils::expr::build_from_pretty;
1308 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1309
1310 async fn create_in_memory_state_table(
1311 mem_state: MemoryStateStore,
1312 data_types: &[DataType],
1313 order_types: &[OrderType],
1314 pk_indices: &[usize],
1315 table_id: u32,
1316 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1317 let column_descs = data_types
1318 .iter()
1319 .enumerate()
1320 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1321 .collect_vec();
1322 let state_table = StateTable::from_table_catalog(
1323 &gen_pbtable(
1324 TableId::new(table_id),
1325 column_descs,
1326 order_types.to_vec(),
1327 pk_indices.to_vec(),
1328 0,
1329 ),
1330 mem_state.clone(),
1331 None,
1332 )
1333 .await;
1334
1335 let mut degree_table_column_descs = vec![];
1337 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1338 degree_table_column_descs.push(ColumnDesc::unnamed(
1339 ColumnId::new(pk_id as i32),
1340 data_types[*idx].clone(),
1341 ))
1342 });
1343 degree_table_column_descs.push(ColumnDesc::unnamed(
1344 ColumnId::new(pk_indices.len() as i32),
1345 DataType::Int64,
1346 ));
1347 let degree_state_table = StateTable::from_table_catalog(
1348 &gen_pbtable(
1349 TableId::new(table_id + 1),
1350 degree_table_column_descs,
1351 order_types.to_vec(),
1352 pk_indices.to_vec(),
1353 0,
1354 ),
1355 mem_state,
1356 None,
1357 )
1358 .await;
1359 (state_table, degree_state_table)
1360 }
1361
1362 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1363 build_from_pretty(
1364 condition_text
1365 .as_deref()
1366 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1367 )
1368 }
1369
1370 async fn create_executor<const T: JoinTypePrimitive>(
1371 with_condition: bool,
1372 null_safe: bool,
1373 condition_text: Option<String>,
1374 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1375 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1376 let schema = Schema {
1377 fields: vec![
1378 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1380 ],
1381 };
1382 let (tx_l, source_l) = MockSource::channel();
1383 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1384 let (tx_r, source_r) = MockSource::channel();
1385 let source_r = source_r.into_executor(schema, vec![1]);
1386 let params_l = JoinParams::new(vec![0], vec![1]);
1387 let params_r = JoinParams::new(vec![0], vec![1]);
1388 let cond = with_condition.then(|| create_cond(condition_text));
1389
1390 let mem_state = MemoryStateStore::new();
1391
1392 let (state_l, degree_state_l) = create_in_memory_state_table(
1393 mem_state.clone(),
1394 &[DataType::Int64, DataType::Int64],
1395 &[OrderType::ascending(), OrderType::ascending()],
1396 &[0, 1],
1397 0,
1398 )
1399 .await;
1400
1401 let (state_r, degree_state_r) = create_in_memory_state_table(
1402 mem_state,
1403 &[DataType::Int64, DataType::Int64],
1404 &[OrderType::ascending(), OrderType::ascending()],
1405 &[0, 1],
1406 2,
1407 )
1408 .await;
1409
1410 let schema = match T {
1411 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1412 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1413 _ => [source_l.schema().fields(), source_r.schema().fields()]
1414 .concat()
1415 .into_iter()
1416 .collect(),
1417 };
1418 let schema_len = schema.len();
1419 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1420
1421 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T>::new(
1422 ActorContext::for_test(123),
1423 info,
1424 source_l,
1425 source_r,
1426 params_l,
1427 params_r,
1428 vec![null_safe],
1429 (0..schema_len).collect_vec(),
1430 cond,
1431 inequality_pairs,
1432 state_l,
1433 degree_state_l,
1434 state_r,
1435 degree_state_r,
1436 Arc::new(AtomicU64::new(0)),
1437 false,
1438 Arc::new(StreamingMetrics::unused()),
1439 1024,
1440 2048,
1441 );
1442 (tx_l, tx_r, executor.boxed().execute())
1443 }
1444
1445 async fn create_classical_executor<const T: JoinTypePrimitive>(
1446 with_condition: bool,
1447 null_safe: bool,
1448 condition_text: Option<String>,
1449 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1450 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1451 }
1452
1453 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1454 with_condition: bool,
1455 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1456 let schema = Schema {
1457 fields: vec![
1458 Field::unnamed(DataType::Int64),
1459 Field::unnamed(DataType::Int64),
1460 Field::unnamed(DataType::Int64),
1461 ],
1462 };
1463 let (tx_l, source_l) = MockSource::channel();
1464 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1465 let (tx_r, source_r) = MockSource::channel();
1466 let source_r = source_r.into_executor(schema, vec![0]);
1467 let params_l = JoinParams::new(vec![0, 1], vec![]);
1468 let params_r = JoinParams::new(vec![0, 1], vec![]);
1469 let cond = with_condition.then(|| create_cond(None));
1470
1471 let mem_state = MemoryStateStore::new();
1472
1473 let (state_l, degree_state_l) = create_in_memory_state_table(
1474 mem_state.clone(),
1475 &[DataType::Int64, DataType::Int64, DataType::Int64],
1476 &[
1477 OrderType::ascending(),
1478 OrderType::ascending(),
1479 OrderType::ascending(),
1480 ],
1481 &[0, 1, 0],
1482 0,
1483 )
1484 .await;
1485
1486 let (state_r, degree_state_r) = create_in_memory_state_table(
1487 mem_state,
1488 &[DataType::Int64, DataType::Int64, DataType::Int64],
1489 &[
1490 OrderType::ascending(),
1491 OrderType::ascending(),
1492 OrderType::ascending(),
1493 ],
1494 &[0, 1, 1],
1495 1,
1496 )
1497 .await;
1498
1499 let schema = match T {
1500 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1501 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1502 _ => [source_l.schema().fields(), source_r.schema().fields()]
1503 .concat()
1504 .into_iter()
1505 .collect(),
1506 };
1507 let schema_len = schema.len();
1508 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1509
1510 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T>::new(
1511 ActorContext::for_test(123),
1512 info,
1513 source_l,
1514 source_r,
1515 params_l,
1516 params_r,
1517 vec![false],
1518 (0..schema_len).collect_vec(),
1519 cond,
1520 vec![],
1521 state_l,
1522 degree_state_l,
1523 state_r,
1524 degree_state_r,
1525 Arc::new(AtomicU64::new(0)),
1526 true,
1527 Arc::new(StreamingMetrics::unused()),
1528 1024,
1529 2048,
1530 );
1531 (tx_l, tx_r, executor.boxed().execute())
1532 }
1533
1534 #[tokio::test]
1535 async fn test_interval_join() -> StreamExecutorResult<()> {
1536 let chunk_l1 = StreamChunk::from_pretty(
1537 " I I
1538 + 1 4
1539 + 2 3
1540 + 2 5
1541 + 3 6",
1542 );
1543 let chunk_l2 = StreamChunk::from_pretty(
1544 " I I
1545 + 3 8
1546 - 3 8",
1547 );
1548 let chunk_r1 = StreamChunk::from_pretty(
1549 " I I
1550 + 2 6
1551 + 4 8
1552 + 6 9",
1553 );
1554 let chunk_r2 = StreamChunk::from_pretty(
1555 " I I
1556 + 2 3
1557 + 6 11",
1558 );
1559 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1560 true,
1561 false,
1562 Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1563 vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1564 )
1565 .await;
1566
1567 tx_l.push_barrier(test_epoch(1), false);
1569 tx_r.push_barrier(test_epoch(1), false);
1570 hash_join.next_unwrap_ready_barrier()?;
1571
1572 tx_l.push_chunk(chunk_l1);
1574 hash_join.next_unwrap_pending();
1575
1576 tx_l.push_barrier(test_epoch(2), false);
1578 tx_r.push_barrier(test_epoch(2), false);
1579 hash_join.next_unwrap_ready_barrier()?;
1580
1581 tx_l.push_chunk(chunk_l2);
1583 hash_join.next_unwrap_pending();
1584
1585 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1586 hash_join.next_unwrap_pending();
1587
1588 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1589 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1590 assert_eq!(
1591 output_watermark,
1592 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1593 );
1594 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1595 assert_eq!(
1596 output_watermark,
1597 Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1598 );
1599
1600 tx_r.push_chunk(chunk_r1);
1602 let chunk = hash_join.next_unwrap_ready_chunk()?;
1603 assert_eq!(
1605 chunk,
1606 StreamChunk::from_pretty(
1607 " I I I I
1608 + 2 5 2 6"
1609 )
1610 );
1611
1612 tx_r.push_chunk(chunk_r2);
1614 hash_join.next_unwrap_pending();
1617
1618 Ok(())
1619 }
1620
1621 #[tokio::test]
1622 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1623 let chunk_l1 = StreamChunk::from_pretty(
1624 " I I
1625 + 1 4
1626 + 2 5
1627 + 3 6",
1628 );
1629 let chunk_l2 = StreamChunk::from_pretty(
1630 " I I
1631 + 3 8
1632 - 3 8",
1633 );
1634 let chunk_r1 = StreamChunk::from_pretty(
1635 " I I
1636 + 2 7
1637 + 4 8
1638 + 6 9",
1639 );
1640 let chunk_r2 = StreamChunk::from_pretty(
1641 " I I
1642 + 3 10
1643 + 6 11",
1644 );
1645 let (mut tx_l, mut tx_r, mut hash_join) =
1646 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1647
1648 tx_l.push_barrier(test_epoch(1), false);
1650 tx_r.push_barrier(test_epoch(1), false);
1651 hash_join.next_unwrap_ready_barrier()?;
1652
1653 tx_l.push_chunk(chunk_l1);
1655 hash_join.next_unwrap_pending();
1656
1657 tx_l.push_barrier(test_epoch(2), false);
1659 tx_r.push_barrier(test_epoch(2), false);
1660 hash_join.next_unwrap_ready_barrier()?;
1661
1662 tx_l.push_chunk(chunk_l2);
1664 hash_join.next_unwrap_pending();
1665
1666 tx_r.push_chunk(chunk_r1);
1668 let chunk = hash_join.next_unwrap_ready_chunk()?;
1669 assert_eq!(
1670 chunk,
1671 StreamChunk::from_pretty(
1672 " I I I I
1673 + 2 5 2 7"
1674 )
1675 );
1676
1677 tx_r.push_chunk(chunk_r2);
1679 let chunk = hash_join.next_unwrap_ready_chunk()?;
1680 assert_eq!(
1681 chunk,
1682 StreamChunk::from_pretty(
1683 " I I I I
1684 + 3 6 3 10"
1685 )
1686 );
1687
1688 Ok(())
1689 }
1690
1691 #[tokio::test]
1692 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1693 let chunk_l1 = StreamChunk::from_pretty(
1694 " I I
1695 + 1 4
1696 + 2 5
1697 + . 6",
1698 );
1699 let chunk_l2 = StreamChunk::from_pretty(
1700 " I I
1701 + . 8
1702 - . 8",
1703 );
1704 let chunk_r1 = StreamChunk::from_pretty(
1705 " I I
1706 + 2 7
1707 + 4 8
1708 + 6 9",
1709 );
1710 let chunk_r2 = StreamChunk::from_pretty(
1711 " I I
1712 + . 10
1713 + 6 11",
1714 );
1715 let (mut tx_l, mut tx_r, mut hash_join) =
1716 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1717
1718 tx_l.push_barrier(test_epoch(1), false);
1720 tx_r.push_barrier(test_epoch(1), false);
1721 hash_join.next_unwrap_ready_barrier()?;
1722
1723 tx_l.push_chunk(chunk_l1);
1725 hash_join.next_unwrap_pending();
1726
1727 tx_l.push_barrier(test_epoch(2), false);
1729 tx_r.push_barrier(test_epoch(2), false);
1730 hash_join.next_unwrap_ready_barrier()?;
1731
1732 tx_l.push_chunk(chunk_l2);
1734 hash_join.next_unwrap_pending();
1735
1736 tx_r.push_chunk(chunk_r1);
1738 let chunk = hash_join.next_unwrap_ready_chunk()?;
1739 assert_eq!(
1740 chunk,
1741 StreamChunk::from_pretty(
1742 " I I I I
1743 + 2 5 2 7"
1744 )
1745 );
1746
1747 tx_r.push_chunk(chunk_r2);
1749 let chunk = hash_join.next_unwrap_ready_chunk()?;
1750 assert_eq!(
1751 chunk,
1752 StreamChunk::from_pretty(
1753 " I I I I
1754 + . 6 . 10"
1755 )
1756 );
1757
1758 Ok(())
1759 }
1760
1761 #[tokio::test]
1762 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1763 let chunk_l1 = StreamChunk::from_pretty(
1764 " I I
1765 + 1 4
1766 + 2 5
1767 + 3 6",
1768 );
1769 let chunk_l2 = StreamChunk::from_pretty(
1770 " I I
1771 + 3 8
1772 - 3 8",
1773 );
1774 let chunk_r1 = StreamChunk::from_pretty(
1775 " I I
1776 + 2 7
1777 + 4 8
1778 + 6 9",
1779 );
1780 let chunk_r2 = StreamChunk::from_pretty(
1781 " I I
1782 + 3 10
1783 + 6 11",
1784 );
1785 let chunk_l3 = StreamChunk::from_pretty(
1786 " I I
1787 + 6 10",
1788 );
1789 let chunk_r3 = StreamChunk::from_pretty(
1790 " I I
1791 - 6 11",
1792 );
1793 let chunk_r4 = StreamChunk::from_pretty(
1794 " I I
1795 - 6 9",
1796 );
1797 let (mut tx_l, mut tx_r, mut hash_join) =
1798 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1799
1800 tx_l.push_barrier(test_epoch(1), false);
1802 tx_r.push_barrier(test_epoch(1), false);
1803 hash_join.next_unwrap_ready_barrier()?;
1804
1805 tx_l.push_chunk(chunk_l1);
1807 hash_join.next_unwrap_pending();
1808
1809 tx_l.push_barrier(test_epoch(2), false);
1811 tx_r.push_barrier(test_epoch(2), false);
1812 hash_join.next_unwrap_ready_barrier()?;
1813
1814 tx_l.push_chunk(chunk_l2);
1816 hash_join.next_unwrap_pending();
1817
1818 tx_r.push_chunk(chunk_r1);
1820 let chunk = hash_join.next_unwrap_ready_chunk()?;
1821 assert_eq!(
1822 chunk,
1823 StreamChunk::from_pretty(
1824 " I I
1825 + 2 5"
1826 )
1827 );
1828
1829 tx_r.push_chunk(chunk_r2);
1831 let chunk = hash_join.next_unwrap_ready_chunk()?;
1832 assert_eq!(
1833 chunk,
1834 StreamChunk::from_pretty(
1835 " I I
1836 + 3 6"
1837 )
1838 );
1839
1840 tx_l.push_chunk(chunk_l3);
1842 let chunk = hash_join.next_unwrap_ready_chunk()?;
1843 assert_eq!(
1844 chunk,
1845 StreamChunk::from_pretty(
1846 " I I
1847 + 6 10"
1848 )
1849 );
1850
1851 tx_r.push_chunk(chunk_r3);
1854 hash_join.next_unwrap_pending();
1855
1856 tx_r.push_chunk(chunk_r4);
1859 let chunk = hash_join.next_unwrap_ready_chunk()?;
1860 assert_eq!(
1861 chunk,
1862 StreamChunk::from_pretty(
1863 " I I
1864 - 6 10"
1865 )
1866 );
1867
1868 Ok(())
1869 }
1870
1871 #[tokio::test]
1872 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1873 let chunk_l1 = StreamChunk::from_pretty(
1874 " I I
1875 + 1 4
1876 + 2 5
1877 + . 6",
1878 );
1879 let chunk_l2 = StreamChunk::from_pretty(
1880 " I I
1881 + . 8
1882 - . 8",
1883 );
1884 let chunk_r1 = StreamChunk::from_pretty(
1885 " I I
1886 + 2 7
1887 + 4 8
1888 + 6 9",
1889 );
1890 let chunk_r2 = StreamChunk::from_pretty(
1891 " I I
1892 + . 10
1893 + 6 11",
1894 );
1895 let chunk_l3 = StreamChunk::from_pretty(
1896 " I I
1897 + 6 10",
1898 );
1899 let chunk_r3 = StreamChunk::from_pretty(
1900 " I I
1901 - 6 11",
1902 );
1903 let chunk_r4 = StreamChunk::from_pretty(
1904 " I I
1905 - 6 9",
1906 );
1907 let (mut tx_l, mut tx_r, mut hash_join) =
1908 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1909
1910 tx_l.push_barrier(test_epoch(1), false);
1912 tx_r.push_barrier(test_epoch(1), false);
1913 hash_join.next_unwrap_ready_barrier()?;
1914
1915 tx_l.push_chunk(chunk_l1);
1917 hash_join.next_unwrap_pending();
1918
1919 tx_l.push_barrier(test_epoch(2), false);
1921 tx_r.push_barrier(test_epoch(2), false);
1922 hash_join.next_unwrap_ready_barrier()?;
1923
1924 tx_l.push_chunk(chunk_l2);
1926 hash_join.next_unwrap_pending();
1927
1928 tx_r.push_chunk(chunk_r1);
1930 let chunk = hash_join.next_unwrap_ready_chunk()?;
1931 assert_eq!(
1932 chunk,
1933 StreamChunk::from_pretty(
1934 " I I
1935 + 2 5"
1936 )
1937 );
1938
1939 tx_r.push_chunk(chunk_r2);
1941 let chunk = hash_join.next_unwrap_ready_chunk()?;
1942 assert_eq!(
1943 chunk,
1944 StreamChunk::from_pretty(
1945 " I I
1946 + . 6"
1947 )
1948 );
1949
1950 tx_l.push_chunk(chunk_l3);
1952 let chunk = hash_join.next_unwrap_ready_chunk()?;
1953 assert_eq!(
1954 chunk,
1955 StreamChunk::from_pretty(
1956 " I I
1957 + 6 10"
1958 )
1959 );
1960
1961 tx_r.push_chunk(chunk_r3);
1964 hash_join.next_unwrap_pending();
1965
1966 tx_r.push_chunk(chunk_r4);
1969 let chunk = hash_join.next_unwrap_ready_chunk()?;
1970 assert_eq!(
1971 chunk,
1972 StreamChunk::from_pretty(
1973 " I I
1974 - 6 10"
1975 )
1976 );
1977
1978 Ok(())
1979 }
1980
1981 #[tokio::test]
1982 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1983 let chunk_l1 = StreamChunk::from_pretty(
1984 " I I I
1985 + 1 4 1
1986 + 2 5 2
1987 + 3 6 3",
1988 );
1989 let chunk_l2 = StreamChunk::from_pretty(
1990 " I I I
1991 + 4 9 4
1992 + 5 10 5",
1993 );
1994 let chunk_r1 = StreamChunk::from_pretty(
1995 " I I I
1996 + 2 5 1
1997 + 4 9 2
1998 + 6 9 3",
1999 );
2000 let chunk_r2 = StreamChunk::from_pretty(
2001 " I I I
2002 + 1 4 4
2003 + 3 6 5",
2004 );
2005
2006 let (mut tx_l, mut tx_r, mut hash_join) =
2007 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2008
2009 tx_l.push_barrier(test_epoch(1), false);
2011 tx_r.push_barrier(test_epoch(1), false);
2012 hash_join.next_unwrap_ready_barrier()?;
2013
2014 tx_l.push_chunk(chunk_l1);
2016 hash_join.next_unwrap_pending();
2017
2018 tx_l.push_barrier(test_epoch(2), false);
2020 tx_r.push_barrier(test_epoch(2), false);
2021 hash_join.next_unwrap_ready_barrier()?;
2022
2023 tx_l.push_chunk(chunk_l2);
2025 hash_join.next_unwrap_pending();
2026
2027 tx_r.push_chunk(chunk_r1);
2029 let chunk = hash_join.next_unwrap_ready_chunk()?;
2030 assert_eq!(
2031 chunk,
2032 StreamChunk::from_pretty(
2033 " I I I I I I
2034 + 2 5 2 2 5 1
2035 + 4 9 4 4 9 2"
2036 )
2037 );
2038
2039 tx_r.push_chunk(chunk_r2);
2041 let chunk = hash_join.next_unwrap_ready_chunk()?;
2042 assert_eq!(
2043 chunk,
2044 StreamChunk::from_pretty(
2045 " I I I I I I
2046 + 1 4 1 1 4 4
2047 + 3 6 3 3 6 5"
2048 )
2049 );
2050
2051 Ok(())
2052 }
2053
2054 #[tokio::test]
2055 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2056 let chunk_l1 = StreamChunk::from_pretty(
2057 " I I I
2058 + 1 4 1
2059 + 2 5 2
2060 + 3 6 3",
2061 );
2062 let chunk_l2 = StreamChunk::from_pretty(
2063 " I I I
2064 + 4 9 4
2065 + 5 10 5",
2066 );
2067 let chunk_r1 = StreamChunk::from_pretty(
2068 " I I I
2069 + 2 5 1
2070 + 4 9 2
2071 + 6 9 3",
2072 );
2073 let chunk_r2 = StreamChunk::from_pretty(
2074 " I I I
2075 + 1 4 4
2076 + 3 6 5",
2077 );
2078
2079 let (mut tx_l, mut tx_r, mut hash_join) =
2080 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2081
2082 tx_l.push_barrier(test_epoch(1), false);
2084 tx_r.push_barrier(test_epoch(1), false);
2085 hash_join.next_unwrap_ready_barrier()?;
2086
2087 tx_l.push_chunk(chunk_l1);
2089 hash_join.next_unwrap_pending();
2090
2091 tx_l.push_barrier(test_epoch(2), false);
2093 tx_r.push_barrier(test_epoch(2), false);
2094 hash_join.next_unwrap_ready_barrier()?;
2095
2096 tx_l.push_chunk(chunk_l2);
2098 hash_join.next_unwrap_pending();
2099
2100 tx_r.push_chunk(chunk_r1);
2102 let chunk = hash_join.next_unwrap_ready_chunk()?;
2103 assert_eq!(
2104 chunk,
2105 StreamChunk::from_pretty(
2106 " I I I
2107 + 2 5 2
2108 + 4 9 4"
2109 )
2110 );
2111
2112 tx_r.push_chunk(chunk_r2);
2114 let chunk = hash_join.next_unwrap_ready_chunk()?;
2115 assert_eq!(
2116 chunk,
2117 StreamChunk::from_pretty(
2118 " I I I
2119 + 1 4 1
2120 + 3 6 3"
2121 )
2122 );
2123
2124 Ok(())
2125 }
2126
2127 #[tokio::test]
2128 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2129 let chunk_l1 = StreamChunk::from_pretty(
2130 " I I I
2131 + 1 4 1
2132 + 2 5 2
2133 + 3 6 3",
2134 );
2135 let chunk_l2 = StreamChunk::from_pretty(
2136 " I I I
2137 + 4 9 4
2138 + 5 10 5",
2139 );
2140 let chunk_r1 = StreamChunk::from_pretty(
2141 " I I I
2142 + 2 5 1
2143 + 4 9 2
2144 + 6 9 3",
2145 );
2146 let chunk_r2 = StreamChunk::from_pretty(
2147 " I I I
2148 + 1 4 4
2149 + 3 6 5",
2150 );
2151
2152 let (mut tx_l, mut tx_r, mut hash_join) =
2153 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2154
2155 tx_l.push_barrier(test_epoch(1), false);
2157 tx_r.push_barrier(test_epoch(1), false);
2158 hash_join.next_unwrap_ready_barrier()?;
2159
2160 tx_l.push_chunk(chunk_l1);
2162 hash_join.next_unwrap_pending();
2163
2164 tx_l.push_barrier(test_epoch(2), false);
2166 tx_r.push_barrier(test_epoch(2), false);
2167 hash_join.next_unwrap_ready_barrier()?;
2168
2169 tx_l.push_chunk(chunk_l2);
2171 hash_join.next_unwrap_pending();
2172
2173 tx_r.push_chunk(chunk_r1);
2175 let chunk = hash_join.next_unwrap_ready_chunk()?;
2176 assert_eq!(
2177 chunk,
2178 StreamChunk::from_pretty(
2179 " I I I
2180 + 2 5 1
2181 + 4 9 2"
2182 )
2183 );
2184
2185 tx_r.push_chunk(chunk_r2);
2187 let chunk = hash_join.next_unwrap_ready_chunk()?;
2188 assert_eq!(
2189 chunk,
2190 StreamChunk::from_pretty(
2191 " I I I
2192 + 1 4 4
2193 + 3 6 5"
2194 )
2195 );
2196
2197 Ok(())
2198 }
2199
2200 #[tokio::test]
2201 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2202 let chunk_r1 = StreamChunk::from_pretty(
2203 " I I
2204 + 1 4
2205 + 2 5
2206 + 3 6",
2207 );
2208 let chunk_r2 = StreamChunk::from_pretty(
2209 " I I
2210 + 3 8
2211 - 3 8",
2212 );
2213 let chunk_l1 = StreamChunk::from_pretty(
2214 " I I
2215 + 2 7
2216 + 4 8
2217 + 6 9",
2218 );
2219 let chunk_l2 = StreamChunk::from_pretty(
2220 " I I
2221 + 3 10
2222 + 6 11",
2223 );
2224 let chunk_r3 = StreamChunk::from_pretty(
2225 " I I
2226 + 6 10",
2227 );
2228 let chunk_l3 = StreamChunk::from_pretty(
2229 " I I
2230 - 6 11",
2231 );
2232 let chunk_l4 = StreamChunk::from_pretty(
2233 " I I
2234 - 6 9",
2235 );
2236 let (mut tx_l, mut tx_r, mut hash_join) =
2237 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2238
2239 tx_l.push_barrier(test_epoch(1), false);
2241 tx_r.push_barrier(test_epoch(1), false);
2242 hash_join.next_unwrap_ready_barrier()?;
2243
2244 tx_r.push_chunk(chunk_r1);
2246 hash_join.next_unwrap_pending();
2247
2248 tx_l.push_barrier(test_epoch(2), false);
2250 tx_r.push_barrier(test_epoch(2), false);
2251 hash_join.next_unwrap_ready_barrier()?;
2252
2253 tx_r.push_chunk(chunk_r2);
2255 hash_join.next_unwrap_pending();
2256
2257 tx_l.push_chunk(chunk_l1);
2259 let chunk = hash_join.next_unwrap_ready_chunk()?;
2260 assert_eq!(
2261 chunk,
2262 StreamChunk::from_pretty(
2263 " I I
2264 + 2 5"
2265 )
2266 );
2267
2268 tx_l.push_chunk(chunk_l2);
2270 let chunk = hash_join.next_unwrap_ready_chunk()?;
2271 assert_eq!(
2272 chunk,
2273 StreamChunk::from_pretty(
2274 " I I
2275 + 3 6"
2276 )
2277 );
2278
2279 tx_r.push_chunk(chunk_r3);
2281 let chunk = hash_join.next_unwrap_ready_chunk()?;
2282 assert_eq!(
2283 chunk,
2284 StreamChunk::from_pretty(
2285 " I I
2286 + 6 10"
2287 )
2288 );
2289
2290 tx_l.push_chunk(chunk_l3);
2293 hash_join.next_unwrap_pending();
2294
2295 tx_l.push_chunk(chunk_l4);
2298 let chunk = hash_join.next_unwrap_ready_chunk()?;
2299 assert_eq!(
2300 chunk,
2301 StreamChunk::from_pretty(
2302 " I I
2303 - 6 10"
2304 )
2305 );
2306
2307 Ok(())
2308 }
2309
2310 #[tokio::test]
2311 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2312 let chunk_l1 = StreamChunk::from_pretty(
2313 " I I
2314 + 1 4
2315 + 2 5
2316 + 3 6",
2317 );
2318 let chunk_l2 = StreamChunk::from_pretty(
2319 " I I
2320 + 3 8
2321 - 3 8",
2322 );
2323 let chunk_r1 = StreamChunk::from_pretty(
2324 " I I
2325 + 2 7
2326 + 4 8
2327 + 6 9",
2328 );
2329 let chunk_r2 = StreamChunk::from_pretty(
2330 " I I
2331 + 3 10
2332 + 6 11
2333 + 1 2
2334 + 1 3",
2335 );
2336 let chunk_l3 = StreamChunk::from_pretty(
2337 " I I
2338 + 9 10",
2339 );
2340 let chunk_r3 = StreamChunk::from_pretty(
2341 " I I
2342 - 1 2",
2343 );
2344 let chunk_r4 = StreamChunk::from_pretty(
2345 " I I
2346 - 1 3",
2347 );
2348 let (mut tx_l, mut tx_r, mut hash_join) =
2349 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2350
2351 tx_l.push_barrier(test_epoch(1), false);
2353 tx_r.push_barrier(test_epoch(1), false);
2354 hash_join.next_unwrap_ready_barrier()?;
2355
2356 tx_l.push_chunk(chunk_l1);
2358 let chunk = hash_join.next_unwrap_ready_chunk()?;
2359 assert_eq!(
2360 chunk,
2361 StreamChunk::from_pretty(
2362 " I I
2363 + 1 4
2364 + 2 5
2365 + 3 6",
2366 )
2367 );
2368
2369 tx_l.push_barrier(test_epoch(2), false);
2371 tx_r.push_barrier(test_epoch(2), false);
2372 hash_join.next_unwrap_ready_barrier()?;
2373
2374 tx_l.push_chunk(chunk_l2);
2376 let chunk = hash_join.next_unwrap_ready_chunk()?;
2377 assert_eq!(
2378 chunk,
2379 StreamChunk::from_pretty(
2380 " I I
2381 + 3 8
2382 - 3 8",
2383 )
2384 );
2385
2386 tx_r.push_chunk(chunk_r1);
2388 let chunk = hash_join.next_unwrap_ready_chunk()?;
2389 assert_eq!(
2390 chunk,
2391 StreamChunk::from_pretty(
2392 " I I
2393 - 2 5"
2394 )
2395 );
2396
2397 tx_r.push_chunk(chunk_r2);
2399 let chunk = hash_join.next_unwrap_ready_chunk()?;
2400 assert_eq!(
2401 chunk,
2402 StreamChunk::from_pretty(
2403 " I I
2404 - 3 6
2405 - 1 4"
2406 )
2407 );
2408
2409 tx_l.push_chunk(chunk_l3);
2411 let chunk = hash_join.next_unwrap_ready_chunk()?;
2412 assert_eq!(
2413 chunk,
2414 StreamChunk::from_pretty(
2415 " I I
2416 + 9 10"
2417 )
2418 );
2419
2420 tx_r.push_chunk(chunk_r3);
2423 hash_join.next_unwrap_pending();
2424
2425 tx_r.push_chunk(chunk_r4);
2428 let chunk = hash_join.next_unwrap_ready_chunk()?;
2429 assert_eq!(
2430 chunk,
2431 StreamChunk::from_pretty(
2432 " I I
2433 + 1 4"
2434 )
2435 );
2436
2437 Ok(())
2438 }
2439
2440 #[tokio::test]
2441 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2442 let chunk_r1 = StreamChunk::from_pretty(
2443 " I I
2444 + 1 4
2445 + 2 5
2446 + 3 6",
2447 );
2448 let chunk_r2 = StreamChunk::from_pretty(
2449 " I I
2450 + 3 8
2451 - 3 8",
2452 );
2453 let chunk_l1 = StreamChunk::from_pretty(
2454 " I I
2455 + 2 7
2456 + 4 8
2457 + 6 9",
2458 );
2459 let chunk_l2 = StreamChunk::from_pretty(
2460 " I I
2461 + 3 10
2462 + 6 11
2463 + 1 2
2464 + 1 3",
2465 );
2466 let chunk_r3 = StreamChunk::from_pretty(
2467 " I I
2468 + 9 10",
2469 );
2470 let chunk_l3 = StreamChunk::from_pretty(
2471 " I I
2472 - 1 2",
2473 );
2474 let chunk_l4 = StreamChunk::from_pretty(
2475 " I I
2476 - 1 3",
2477 );
2478 let (mut tx_r, mut tx_l, mut hash_join) =
2479 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2480
2481 tx_r.push_barrier(test_epoch(1), false);
2483 tx_l.push_barrier(test_epoch(1), false);
2484 hash_join.next_unwrap_ready_barrier()?;
2485
2486 tx_r.push_chunk(chunk_r1);
2488 let chunk = hash_join.next_unwrap_ready_chunk()?;
2489 assert_eq!(
2490 chunk,
2491 StreamChunk::from_pretty(
2492 " I I
2493 + 1 4
2494 + 2 5
2495 + 3 6",
2496 )
2497 );
2498
2499 tx_r.push_barrier(test_epoch(2), false);
2501 tx_l.push_barrier(test_epoch(2), false);
2502 hash_join.next_unwrap_ready_barrier()?;
2503
2504 tx_r.push_chunk(chunk_r2);
2506 let chunk = hash_join.next_unwrap_ready_chunk()?;
2507 assert_eq!(
2508 chunk,
2509 StreamChunk::from_pretty(
2510 " I I
2511 + 3 8
2512 - 3 8",
2513 )
2514 );
2515
2516 tx_l.push_chunk(chunk_l1);
2518 let chunk = hash_join.next_unwrap_ready_chunk()?;
2519 assert_eq!(
2520 chunk,
2521 StreamChunk::from_pretty(
2522 " I I
2523 - 2 5"
2524 )
2525 );
2526
2527 tx_l.push_chunk(chunk_l2);
2529 let chunk = hash_join.next_unwrap_ready_chunk()?;
2530 assert_eq!(
2531 chunk,
2532 StreamChunk::from_pretty(
2533 " I I
2534 - 3 6
2535 - 1 4"
2536 )
2537 );
2538
2539 tx_r.push_chunk(chunk_r3);
2541 let chunk = hash_join.next_unwrap_ready_chunk()?;
2542 assert_eq!(
2543 chunk,
2544 StreamChunk::from_pretty(
2545 " I I
2546 + 9 10"
2547 )
2548 );
2549
2550 tx_l.push_chunk(chunk_l3);
2553 hash_join.next_unwrap_pending();
2554
2555 tx_l.push_chunk(chunk_l4);
2558 let chunk = hash_join.next_unwrap_ready_chunk()?;
2559 assert_eq!(
2560 chunk,
2561 StreamChunk::from_pretty(
2562 " I I
2563 + 1 4"
2564 )
2565 );
2566
2567 Ok(())
2568 }
2569
2570 #[tokio::test]
2571 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2572 let chunk_l1 = StreamChunk::from_pretty(
2573 " I I
2574 + 1 4
2575 + 2 5
2576 + 3 6",
2577 );
2578 let chunk_l2 = StreamChunk::from_pretty(
2579 " I I
2580 + 6 8
2581 + 3 8",
2582 );
2583 let chunk_r1 = StreamChunk::from_pretty(
2584 " I I
2585 + 2 7
2586 + 4 8
2587 + 6 9",
2588 );
2589 let chunk_r2 = StreamChunk::from_pretty(
2590 " I I
2591 + 3 10
2592 + 6 11",
2593 );
2594 let (mut tx_l, mut tx_r, mut hash_join) =
2595 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2596
2597 tx_l.push_barrier(test_epoch(1), false);
2599 tx_r.push_barrier(test_epoch(1), false);
2600 hash_join.next_unwrap_ready_barrier()?;
2601
2602 tx_l.push_chunk(chunk_l1);
2604 hash_join.next_unwrap_pending();
2605
2606 tx_l.push_barrier(test_epoch(2), false);
2608
2609 tx_l.push_chunk(chunk_l2);
2611
2612 tx_r.push_chunk(chunk_r1);
2614
2615 let chunk = hash_join.next_unwrap_ready_chunk()?;
2617 assert_eq!(
2618 chunk,
2619 StreamChunk::from_pretty(
2620 " I I I I
2621 + 2 5 2 7"
2622 )
2623 );
2624
2625 tx_r.push_barrier(test_epoch(2), false);
2627
2628 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2630 assert!(matches!(
2631 hash_join.next_unwrap_ready_barrier()?,
2632 Barrier {
2633 epoch,
2634 mutation: None,
2635 ..
2636 } if epoch == expected_epoch
2637 ));
2638
2639 let chunk = hash_join.next_unwrap_ready_chunk()?;
2641 assert_eq!(
2642 chunk,
2643 StreamChunk::from_pretty(
2644 " I I I I
2645 + 6 8 6 9"
2646 )
2647 );
2648
2649 tx_r.push_chunk(chunk_r2);
2651 let chunk = hash_join.next_unwrap_ready_chunk()?;
2652 assert_eq!(
2653 chunk,
2654 StreamChunk::from_pretty(
2655 " I I I I
2656 + 3 6 3 10
2657 + 3 8 3 10
2658 + 6 8 6 11"
2659 )
2660 );
2661
2662 Ok(())
2663 }
2664
2665 #[tokio::test]
2666 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2667 let chunk_l1 = StreamChunk::from_pretty(
2668 " I I
2669 + 1 4
2670 + 2 .
2671 + 3 .",
2672 );
2673 let chunk_l2 = StreamChunk::from_pretty(
2674 " I I
2675 + 6 .
2676 + 3 8",
2677 );
2678 let chunk_r1 = StreamChunk::from_pretty(
2679 " I I
2680 + 2 7
2681 + 4 8
2682 + 6 9",
2683 );
2684 let chunk_r2 = StreamChunk::from_pretty(
2685 " I I
2686 + 3 10
2687 + 6 11",
2688 );
2689 let (mut tx_l, mut tx_r, mut hash_join) =
2690 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2691
2692 tx_l.push_barrier(test_epoch(1), false);
2694 tx_r.push_barrier(test_epoch(1), false);
2695 hash_join.next_unwrap_ready_barrier()?;
2696
2697 tx_l.push_chunk(chunk_l1);
2699 hash_join.next_unwrap_pending();
2700
2701 tx_l.push_barrier(test_epoch(2), false);
2703
2704 tx_l.push_chunk(chunk_l2);
2706
2707 tx_r.push_chunk(chunk_r1);
2709
2710 let chunk = hash_join.next_unwrap_ready_chunk()?;
2712 assert_eq!(
2713 chunk,
2714 StreamChunk::from_pretty(
2715 " I I I I
2716 + 2 . 2 7"
2717 )
2718 );
2719
2720 tx_r.push_barrier(test_epoch(2), false);
2722
2723 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2725 assert!(matches!(
2726 hash_join.next_unwrap_ready_barrier()?,
2727 Barrier {
2728 epoch,
2729 mutation: None,
2730 ..
2731 } if epoch == expected_epoch
2732 ));
2733
2734 let chunk = hash_join.next_unwrap_ready_chunk()?;
2736 assert_eq!(
2737 chunk,
2738 StreamChunk::from_pretty(
2739 " I I I I
2740 + 6 . 6 9"
2741 )
2742 );
2743
2744 tx_r.push_chunk(chunk_r2);
2746 let chunk = hash_join.next_unwrap_ready_chunk()?;
2747 assert_eq!(
2748 chunk,
2749 StreamChunk::from_pretty(
2750 " I I I I
2751 + 3 8 3 10
2752 + 3 . 3 10
2753 + 6 . 6 11"
2754 )
2755 );
2756
2757 Ok(())
2758 }
2759
2760 #[tokio::test]
2761 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2762 let chunk_l1 = StreamChunk::from_pretty(
2763 " I I
2764 + 1 4
2765 + 2 5
2766 + 3 6",
2767 );
2768 let chunk_l2 = StreamChunk::from_pretty(
2769 " I I
2770 + 3 8
2771 - 3 8",
2772 );
2773 let chunk_r1 = StreamChunk::from_pretty(
2774 " I I
2775 + 2 7
2776 + 4 8
2777 + 6 9",
2778 );
2779 let chunk_r2 = StreamChunk::from_pretty(
2780 " I I
2781 + 3 10
2782 + 6 11",
2783 );
2784 let (mut tx_l, mut tx_r, mut hash_join) =
2785 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2786
2787 tx_l.push_barrier(test_epoch(1), false);
2789 tx_r.push_barrier(test_epoch(1), false);
2790 hash_join.next_unwrap_ready_barrier()?;
2791
2792 tx_l.push_chunk(chunk_l1);
2794 let chunk = hash_join.next_unwrap_ready_chunk()?;
2795 assert_eq!(
2796 chunk,
2797 StreamChunk::from_pretty(
2798 " I I I I
2799 + 1 4 . .
2800 + 2 5 . .
2801 + 3 6 . ."
2802 )
2803 );
2804
2805 tx_l.push_chunk(chunk_l2);
2807 let chunk = hash_join.next_unwrap_ready_chunk()?;
2808 assert_eq!(
2809 chunk,
2810 StreamChunk::from_pretty(
2811 " I I I I
2812 + 3 8 . .
2813 - 3 8 . ."
2814 )
2815 );
2816
2817 tx_r.push_chunk(chunk_r1);
2819 let chunk = hash_join.next_unwrap_ready_chunk()?;
2820 assert_eq!(
2821 chunk,
2822 StreamChunk::from_pretty(
2823 " I I I I
2824 U- 2 5 . .
2825 U+ 2 5 2 7"
2826 )
2827 );
2828
2829 tx_r.push_chunk(chunk_r2);
2831 let chunk = hash_join.next_unwrap_ready_chunk()?;
2832 assert_eq!(
2833 chunk,
2834 StreamChunk::from_pretty(
2835 " I I I I
2836 U- 3 6 . .
2837 U+ 3 6 3 10"
2838 )
2839 );
2840
2841 Ok(())
2842 }
2843
2844 #[tokio::test]
2845 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2846 let chunk_l1 = StreamChunk::from_pretty(
2847 " I I
2848 + 1 4
2849 + 2 5
2850 + . 6",
2851 );
2852 let chunk_l2 = StreamChunk::from_pretty(
2853 " I I
2854 + . 8
2855 - . 8",
2856 );
2857 let chunk_r1 = StreamChunk::from_pretty(
2858 " I I
2859 + 2 7
2860 + 4 8
2861 + 6 9",
2862 );
2863 let chunk_r2 = StreamChunk::from_pretty(
2864 " I I
2865 + . 10
2866 + 6 11",
2867 );
2868 let (mut tx_l, mut tx_r, mut hash_join) =
2869 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2870
2871 tx_l.push_barrier(test_epoch(1), false);
2873 tx_r.push_barrier(test_epoch(1), false);
2874 hash_join.next_unwrap_ready_barrier()?;
2875
2876 tx_l.push_chunk(chunk_l1);
2878 let chunk = hash_join.next_unwrap_ready_chunk()?;
2879 assert_eq!(
2880 chunk,
2881 StreamChunk::from_pretty(
2882 " I I I I
2883 + 1 4 . .
2884 + 2 5 . .
2885 + . 6 . ."
2886 )
2887 );
2888
2889 tx_l.push_chunk(chunk_l2);
2891 let chunk = hash_join.next_unwrap_ready_chunk()?;
2892 assert_eq!(
2893 chunk,
2894 StreamChunk::from_pretty(
2895 " I I I I
2896 + . 8 . .
2897 - . 8 . ."
2898 )
2899 );
2900
2901 tx_r.push_chunk(chunk_r1);
2903 let chunk = hash_join.next_unwrap_ready_chunk()?;
2904 assert_eq!(
2905 chunk,
2906 StreamChunk::from_pretty(
2907 " I I I I
2908 U- 2 5 . .
2909 U+ 2 5 2 7"
2910 )
2911 );
2912
2913 tx_r.push_chunk(chunk_r2);
2915 let chunk = hash_join.next_unwrap_ready_chunk()?;
2916 assert_eq!(
2917 chunk,
2918 StreamChunk::from_pretty(
2919 " I I I I
2920 U- . 6 . .
2921 U+ . 6 . 10"
2922 )
2923 );
2924
2925 Ok(())
2926 }
2927
2928 #[tokio::test]
2929 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2930 let chunk_l1 = StreamChunk::from_pretty(
2931 " I I
2932 + 1 4
2933 + 2 5
2934 + 3 6",
2935 );
2936 let chunk_l2 = StreamChunk::from_pretty(
2937 " I I
2938 + 3 8
2939 - 3 8",
2940 );
2941 let chunk_r1 = StreamChunk::from_pretty(
2942 " I I
2943 + 2 7
2944 + 4 8
2945 + 6 9",
2946 );
2947 let chunk_r2 = StreamChunk::from_pretty(
2948 " I I
2949 + 5 10
2950 - 5 10",
2951 );
2952 let (mut tx_l, mut tx_r, mut hash_join) =
2953 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2954
2955 tx_l.push_barrier(test_epoch(1), false);
2957 tx_r.push_barrier(test_epoch(1), false);
2958 hash_join.next_unwrap_ready_barrier()?;
2959
2960 tx_l.push_chunk(chunk_l1);
2962 hash_join.next_unwrap_pending();
2963
2964 tx_l.push_chunk(chunk_l2);
2966 hash_join.next_unwrap_pending();
2967
2968 tx_r.push_chunk(chunk_r1);
2970 let chunk = hash_join.next_unwrap_ready_chunk()?;
2971 assert_eq!(
2972 chunk,
2973 StreamChunk::from_pretty(
2974 " I I I I
2975 + 2 5 2 7
2976 + . . 4 8
2977 + . . 6 9"
2978 )
2979 );
2980
2981 tx_r.push_chunk(chunk_r2);
2983 let chunk = hash_join.next_unwrap_ready_chunk()?;
2984 assert_eq!(
2985 chunk,
2986 StreamChunk::from_pretty(
2987 " I I I I
2988 + . . 5 10
2989 - . . 5 10"
2990 )
2991 );
2992
2993 Ok(())
2994 }
2995
2996 #[tokio::test]
2997 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
2998 let chunk_l1 = StreamChunk::from_pretty(
2999 " I I I
3000 + 1 4 1
3001 + 2 5 2
3002 + 3 6 3",
3003 );
3004 let chunk_l2 = StreamChunk::from_pretty(
3005 " I I I
3006 + 4 9 4
3007 + 5 10 5",
3008 );
3009 let chunk_r1 = StreamChunk::from_pretty(
3010 " I I I
3011 + 2 5 1
3012 + 4 9 2
3013 + 6 9 3",
3014 );
3015 let chunk_r2 = StreamChunk::from_pretty(
3016 " I I I
3017 + 1 4 4
3018 + 3 6 5",
3019 );
3020
3021 let (mut tx_l, mut tx_r, mut hash_join) =
3022 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3023
3024 tx_l.push_barrier(test_epoch(1), false);
3026 tx_r.push_barrier(test_epoch(1), false);
3027 hash_join.next_unwrap_ready_barrier()?;
3028
3029 tx_l.push_chunk(chunk_l1);
3031 let chunk = hash_join.next_unwrap_ready_chunk()?;
3032 assert_eq!(
3033 chunk,
3034 StreamChunk::from_pretty(
3035 " I I I I I I
3036 + 1 4 1 . . .
3037 + 2 5 2 . . .
3038 + 3 6 3 . . ."
3039 )
3040 );
3041
3042 tx_l.push_chunk(chunk_l2);
3044 let chunk = hash_join.next_unwrap_ready_chunk()?;
3045 assert_eq!(
3046 chunk,
3047 StreamChunk::from_pretty(
3048 " I I I I I I
3049 + 4 9 4 . . .
3050 + 5 10 5 . . ."
3051 )
3052 );
3053
3054 tx_r.push_chunk(chunk_r1);
3056 let chunk = hash_join.next_unwrap_ready_chunk()?;
3057 assert_eq!(
3058 chunk,
3059 StreamChunk::from_pretty(
3060 " I I I I I I
3061 U- 2 5 2 . . .
3062 U+ 2 5 2 2 5 1
3063 U- 4 9 4 . . .
3064 U+ 4 9 4 4 9 2"
3065 )
3066 );
3067
3068 tx_r.push_chunk(chunk_r2);
3070 let chunk = hash_join.next_unwrap_ready_chunk()?;
3071 assert_eq!(
3072 chunk,
3073 StreamChunk::from_pretty(
3074 " I I I I I I
3075 U- 1 4 1 . . .
3076 U+ 1 4 1 1 4 4
3077 U- 3 6 3 . . .
3078 U+ 3 6 3 3 6 5"
3079 )
3080 );
3081
3082 Ok(())
3083 }
3084
3085 #[tokio::test]
3086 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3087 let chunk_l1 = StreamChunk::from_pretty(
3088 " I I I
3089 + 1 4 1
3090 + 2 5 2
3091 + 3 6 3",
3092 );
3093 let chunk_l2 = StreamChunk::from_pretty(
3094 " I I I
3095 + 4 9 4
3096 + 5 10 5",
3097 );
3098 let chunk_r1 = StreamChunk::from_pretty(
3099 " I I I
3100 + 2 5 1
3101 + 4 9 2
3102 + 6 9 3",
3103 );
3104 let chunk_r2 = StreamChunk::from_pretty(
3105 " I I I
3106 + 1 4 4
3107 + 3 6 5
3108 + 7 7 6",
3109 );
3110
3111 let (mut tx_l, mut tx_r, mut hash_join) =
3112 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3113
3114 tx_l.push_barrier(test_epoch(1), false);
3116 tx_r.push_barrier(test_epoch(1), false);
3117 hash_join.next_unwrap_ready_barrier()?;
3118
3119 tx_l.push_chunk(chunk_l1);
3121 hash_join.next_unwrap_pending();
3122
3123 tx_l.push_chunk(chunk_l2);
3125 hash_join.next_unwrap_pending();
3126
3127 tx_r.push_chunk(chunk_r1);
3129 let chunk = hash_join.next_unwrap_ready_chunk()?;
3130 assert_eq!(
3131 chunk,
3132 StreamChunk::from_pretty(
3133 " I I I I I I
3134 + 2 5 2 2 5 1
3135 + 4 9 4 4 9 2
3136 + . . . 6 9 3"
3137 )
3138 );
3139
3140 tx_r.push_chunk(chunk_r2);
3142 let chunk = hash_join.next_unwrap_ready_chunk()?;
3143 assert_eq!(
3144 chunk,
3145 StreamChunk::from_pretty(
3146 " I I I I I I
3147 + 1 4 1 1 4 4
3148 + 3 6 3 3 6 5
3149 + . . . 7 7 6"
3150 )
3151 );
3152
3153 Ok(())
3154 }
3155
3156 #[tokio::test]
3157 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3158 let chunk_l1 = StreamChunk::from_pretty(
3159 " I I
3160 + 1 4
3161 + 2 5
3162 + 3 6",
3163 );
3164 let chunk_l2 = StreamChunk::from_pretty(
3165 " I I
3166 + 3 8
3167 - 3 8",
3168 );
3169 let chunk_r1 = StreamChunk::from_pretty(
3170 " I I
3171 + 2 7
3172 + 4 8
3173 + 6 9",
3174 );
3175 let chunk_r2 = StreamChunk::from_pretty(
3176 " I I
3177 + 5 10
3178 - 5 10",
3179 );
3180 let (mut tx_l, mut tx_r, mut hash_join) =
3181 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3182
3183 tx_l.push_barrier(test_epoch(1), false);
3185 tx_r.push_barrier(test_epoch(1), false);
3186 hash_join.next_unwrap_ready_barrier()?;
3187
3188 tx_l.push_chunk(chunk_l1);
3190 let chunk = hash_join.next_unwrap_ready_chunk()?;
3191 assert_eq!(
3192 chunk,
3193 StreamChunk::from_pretty(
3194 " I I I I
3195 + 1 4 . .
3196 + 2 5 . .
3197 + 3 6 . ."
3198 )
3199 );
3200
3201 tx_l.push_chunk(chunk_l2);
3203 let chunk = hash_join.next_unwrap_ready_chunk()?;
3204 assert_eq!(
3205 chunk,
3206 StreamChunk::from_pretty(
3207 " I I I I
3208 + 3 8 . .
3209 - 3 8 . ."
3210 )
3211 );
3212
3213 tx_r.push_chunk(chunk_r1);
3215 let chunk = hash_join.next_unwrap_ready_chunk()?;
3216 assert_eq!(
3217 chunk,
3218 StreamChunk::from_pretty(
3219 " I I I I
3220 U- 2 5 . .
3221 U+ 2 5 2 7
3222 + . . 4 8
3223 + . . 6 9"
3224 )
3225 );
3226
3227 tx_r.push_chunk(chunk_r2);
3229 let chunk = hash_join.next_unwrap_ready_chunk()?;
3230 assert_eq!(
3231 chunk,
3232 StreamChunk::from_pretty(
3233 " I I I I
3234 + . . 5 10
3235 - . . 5 10"
3236 )
3237 );
3238
3239 Ok(())
3240 }
3241
3242 #[tokio::test]
3243 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3244 let (mut tx_l, mut tx_r, mut hash_join) =
3245 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3246
3247 tx_l.push_barrier(test_epoch(1), false);
3249 tx_r.push_barrier(test_epoch(1), false);
3250 hash_join.next_unwrap_ready_barrier()?;
3251
3252 tx_l.push_chunk(StreamChunk::from_pretty(
3253 " I I
3254 + 1 1
3255 ",
3256 ));
3257 let chunk = hash_join.next_unwrap_ready_chunk()?;
3258 assert_eq!(
3259 chunk,
3260 StreamChunk::from_pretty(
3261 " I I I I
3262 + 1 1 . ."
3263 )
3264 );
3265
3266 tx_r.push_chunk(StreamChunk::from_pretty(
3267 " I I
3268 + 1 1
3269 ",
3270 ));
3271 let chunk = hash_join.next_unwrap_ready_chunk()?;
3272
3273 assert_eq!(
3274 chunk,
3275 StreamChunk::from_pretty(
3276 " I I I I
3277 U- 1 1 . .
3278 U+ 1 1 1 1"
3279 )
3280 );
3281
3282 tx_l.push_chunk(StreamChunk::from_pretty(
3283 " I I
3284 - 1 1
3285 + 1 2
3286 ",
3287 ));
3288 let chunk = hash_join.next_unwrap_ready_chunk()?;
3289 let chunk = chunk.compact();
3290 assert_eq!(
3291 chunk,
3292 StreamChunk::from_pretty(
3293 " I I I I
3294 - 1 1 1 1
3295 + 1 2 1 1
3296 "
3297 )
3298 );
3299
3300 Ok(())
3301 }
3302
3303 #[tokio::test]
3304 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3305 {
3306 let chunk_l1 = StreamChunk::from_pretty(
3307 " I I
3308 + 1 4
3309 + 2 5
3310 + 3 6
3311 + 3 7",
3312 );
3313 let chunk_l2 = StreamChunk::from_pretty(
3314 " I I
3315 + 3 8
3316 - 3 8
3317 - 1 4", );
3319 let chunk_r1 = StreamChunk::from_pretty(
3320 " I I
3321 + 2 6
3322 + 4 8
3323 + 3 4",
3324 );
3325 let chunk_r2 = StreamChunk::from_pretty(
3326 " I I
3327 + 5 10
3328 - 5 10
3329 + 1 2",
3330 );
3331 let (mut tx_l, mut tx_r, mut hash_join) =
3332 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3333
3334 tx_l.push_barrier(test_epoch(1), false);
3336 tx_r.push_barrier(test_epoch(1), false);
3337 hash_join.next_unwrap_ready_barrier()?;
3338
3339 tx_l.push_chunk(chunk_l1);
3341 let chunk = hash_join.next_unwrap_ready_chunk()?;
3342 assert_eq!(
3343 chunk,
3344 StreamChunk::from_pretty(
3345 " I I I I
3346 + 1 4 . .
3347 + 2 5 . .
3348 + 3 6 . .
3349 + 3 7 . ."
3350 )
3351 );
3352
3353 tx_l.push_chunk(chunk_l2);
3355 let chunk = hash_join.next_unwrap_ready_chunk()?;
3356 assert_eq!(
3357 chunk,
3358 StreamChunk::from_pretty(
3359 " I I I I
3360 + 3 8 . .
3361 - 3 8 . .
3362 - 1 4 . ."
3363 )
3364 );
3365
3366 tx_r.push_chunk(chunk_r1);
3368 let chunk = hash_join.next_unwrap_ready_chunk()?;
3369 assert_eq!(
3370 chunk,
3371 StreamChunk::from_pretty(
3372 " I I I I
3373 U- 2 5 . .
3374 U+ 2 5 2 6
3375 + . . 4 8
3376 + . . 3 4" )
3380 );
3381
3382 tx_r.push_chunk(chunk_r2);
3384 let chunk = hash_join.next_unwrap_ready_chunk()?;
3385 assert_eq!(
3386 chunk,
3387 StreamChunk::from_pretty(
3388 " I I I I
3389 + . . 5 10
3390 - . . 5 10
3391 + . . 1 2" )
3394 );
3395
3396 Ok(())
3397 }
3398
3399 #[tokio::test]
3400 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3401 let chunk_l1 = StreamChunk::from_pretty(
3402 " I I
3403 + 1 4
3404 + 2 10
3405 + 3 6",
3406 );
3407 let chunk_l2 = StreamChunk::from_pretty(
3408 " I I
3409 + 3 8
3410 - 3 8",
3411 );
3412 let chunk_r1 = StreamChunk::from_pretty(
3413 " I I
3414 + 2 7
3415 + 4 8
3416 + 6 9",
3417 );
3418 let chunk_r2 = StreamChunk::from_pretty(
3419 " I I
3420 + 3 10
3421 + 6 11",
3422 );
3423 let (mut tx_l, mut tx_r, mut hash_join) =
3424 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3425
3426 tx_l.push_barrier(test_epoch(1), false);
3428 tx_r.push_barrier(test_epoch(1), false);
3429 hash_join.next_unwrap_ready_barrier()?;
3430
3431 tx_l.push_chunk(chunk_l1);
3433 hash_join.next_unwrap_pending();
3434
3435 tx_l.push_chunk(chunk_l2);
3437 hash_join.next_unwrap_pending();
3438
3439 tx_r.push_chunk(chunk_r1);
3441 hash_join.next_unwrap_pending();
3442
3443 tx_r.push_chunk(chunk_r2);
3445 let chunk = hash_join.next_unwrap_ready_chunk()?;
3446 assert_eq!(
3447 chunk,
3448 StreamChunk::from_pretty(
3449 " I I I I
3450 + 3 6 3 10"
3451 )
3452 );
3453
3454 Ok(())
3455 }
3456
3457 #[tokio::test]
3458 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3459 let (mut tx_l, mut tx_r, mut hash_join) =
3460 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3461
3462 tx_l.push_barrier(test_epoch(1), false);
3464 tx_r.push_barrier(test_epoch(1), false);
3465 hash_join.next_unwrap_ready_barrier()?;
3466
3467 tx_l.push_int64_watermark(0, 100);
3468
3469 tx_l.push_int64_watermark(0, 200);
3470
3471 tx_l.push_barrier(test_epoch(2), false);
3472 tx_r.push_barrier(test_epoch(2), false);
3473 hash_join.next_unwrap_ready_barrier()?;
3474
3475 tx_r.push_int64_watermark(0, 50);
3476
3477 let w1 = hash_join.next().await.unwrap().unwrap();
3478 let w1 = w1.as_watermark().unwrap();
3479
3480 let w2 = hash_join.next().await.unwrap().unwrap();
3481 let w2 = w2.as_watermark().unwrap();
3482
3483 tx_r.push_int64_watermark(0, 100);
3484
3485 let w3 = hash_join.next().await.unwrap().unwrap();
3486 let w3 = w3.as_watermark().unwrap();
3487
3488 let w4 = hash_join.next().await.unwrap().unwrap();
3489 let w4 = w4.as_watermark().unwrap();
3490
3491 assert_eq!(
3492 w1,
3493 &Watermark {
3494 col_idx: 2,
3495 data_type: DataType::Int64,
3496 val: ScalarImpl::Int64(50)
3497 }
3498 );
3499
3500 assert_eq!(
3501 w2,
3502 &Watermark {
3503 col_idx: 0,
3504 data_type: DataType::Int64,
3505 val: ScalarImpl::Int64(50)
3506 }
3507 );
3508
3509 assert_eq!(
3510 w3,
3511 &Watermark {
3512 col_idx: 2,
3513 data_type: DataType::Int64,
3514 val: ScalarImpl::Int64(100)
3515 }
3516 );
3517
3518 assert_eq!(
3519 w4,
3520 &Watermark {
3521 col_idx: 0,
3522 data_type: DataType::Int64,
3523 val: ScalarImpl::Int64(100)
3524 }
3525 );
3526
3527 Ok(())
3528 }
3529}