1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::row::RowExt;
26use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_expr::ExprError;
30use risingwave_expr::expr::NonStrictExpression;
31use tokio::time::Instant;
32
33use self::builder::JoinChunkBuilder;
34use super::barrier_align::*;
35use super::join::hash_join::*;
36use super::join::row::{JoinEncoding, JoinRow};
37use super::join::*;
38use super::watermark::*;
39use crate::executor::CachedJoinRow;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::join::hash_join::CacheResult;
42use crate::executor::prelude::*;
43
/// Number of processed input rows between two cache-eviction passes.
///
/// `evict_cache` bumps a per-executor counter for every visible input row and, when the counter
/// reaches this value, evicts both join sides' hash-table caches and resets the counter.
const EVICT_EVERY_N_ROWS: u32 = 16;
46
/// Returns `true` when every index in `vec1` also appears in `vec2`.
///
/// Duplicates and ordering are irrelevant; an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|index| superset.contains(&index))
}
50
/// Column-index parameters describing one input of the hash join.
pub struct JoinParams {
    /// Indices (into this input's schema) of the equi-join key columns.
    pub join_key_indices: Vec<usize>,
    /// Primary-key indices handed to this side's join state; their count also sizes the pk part
    /// of the degree table. NOTE(review): "deduped" presumably means duplicate columns were
    /// removed by the planner — confirm.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key and deduplicated pk indices of one join input.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
66
/// Per-input state and static layout information of one side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Join hash table (cache + state tables) holding this side's rows, keyed by join key.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input.
    all_data_types: Vec<DataType>,
    /// Position of this side's first column in a concatenated (left ++ right) row:
    /// 0 for the left side, the left input's column count for the right side.
    start_pos: usize,
    /// `(input column, output column)` pairs for columns of this side that are emitted.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` indexed by input column (one input column may feed several outputs).
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For each input column: the inequality (band join) conditions it takes part in, as
    /// `(inequality index, need_offset)`. `need_offset` is `true` when the column is the
    /// "required smaller" key, in which case the pair's delta expression is applied to its
    /// watermarks in `handle_watermark`.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that must be non-NULL for a row of this side to match anything (every column
    /// taking part in an inequality).
    non_null_fields: Vec<usize>,
    /// `(column, inequality index)` pairs: state of this side may be cleaned by column `column`
    /// using the watermark of inequality `inequality index`.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether this side maintains a degree table.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
96
97impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("JoinSide")
100 .field("join_key_indices", &self.join_key_indices)
101 .field("col_types", &self.all_data_types)
102 .field("start_pos", &self.start_pos)
103 .field("i2o_mapping", &self.i2o_mapping)
104 .field("need_degree_table", &self.need_degree_table)
105 .finish()
106 }
107}
108
impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
    /// Whether this side holds state that has not been flushed yet.
    ///
    /// WARNING: not implemented — always panics. Do not call until it is implemented.
    fn is_dirty(&self) -> bool {
        unimplemented!()
    }

    /// Placeholder for dropping this side's in-memory cache.
    ///
    /// It currently only checks its precondition (which panics, since `is_dirty` is
    /// unimplemented) and clears nothing; it is not called anywhere (`dead_code` is expected).
    #[expect(dead_code)]
    fn clear_cache(&mut self) {
        assert!(
            !self.is_dirty(),
            "cannot clear cache while states of hash join are dirty"
        );

    }

    /// Initializes this side's join hash table with the first barrier's epoch.
    pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.ht.init(epoch).await
    }
}
130
/// Streaming hash join of two inputs with join type `T`.
///
/// Each side keeps its rows in a join hash table; every incoming row is probed against the
/// other side's table and recorded in its own.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input; moved out (`take()`n) when the executor starts streaming.
    input_l: Option<Executor>,
    /// Right input; moved out (`take()`n) when the executor starts streaming.
    input_r: Option<Executor>,
    /// Data types of the output columns, after applying `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// Left join side.
    side_l: JoinSide<K, S, E>,
    /// Right join side.
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join condition checked for every candidate match.
    cond: Option<NonStrictExpression>,
    /// Per inequality (band join) condition: the output columns that watermarks derived from it
    /// are emitted on, and an optional delta expression applied to watermarks of the
    /// "required smaller" key.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest watermark selected for each inequality condition (`None` until one arrives).
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Set when both inputs are append-only and each side's pk is contained in its join key.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Maximum number of rows in an output chunk.
    chunk_size: usize,
    /// Rows processed since the last cache eviction (see `EVICT_EVERY_N_ROWS`).
    cnt_rows_received: u32,

    /// Watermark buffers aligning left and right watermarks. Keys are join-key positions, or
    /// `join_key_indices.len() + inequality index` for inequality conditions.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when one input row yields more output rows than this.
    high_join_amplification_threshold: usize,

    /// Maximum number of rows cached for a single join key after a cache miss.
    entry_state_max_rows: usize,
}
175
176impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
177 for HashJoinExecutor<K, S, T, E>
178{
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("HashJoinExecutor")
181 .field("join_type", &T)
182 .field("input_left", &self.input_l.as_ref().unwrap().identity())
183 .field("input_right", &self.input_r.as_ref().unwrap().identity())
184 .field("side_l", &self.side_l)
185 .field("side_r", &self.side_r)
186 .field("pk_indices", &self.info.pk_indices)
187 .field("schema", &self.info.schema)
188 .field("actual_output_data_types", &self.actual_output_data_types)
189 .finish()
190 }
191}
192
193impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
194 for HashJoinExecutor<K, S, T, E>
195{
196 fn execute(self: Box<Self>) -> BoxedMessageStream {
197 self.into_stream().boxed()
198 }
199}
200
/// Everything `eq_join_oneside` needs to join one input chunk, borrowed from the executor.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S, E>,
    side_r: &'a mut JoinSide<K, S, E>,
    /// Output column types used by the chunk builder.
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest watermark per inequality condition, used to pick state-cleaning columns.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    /// Shared cache-eviction counter (see `evict_cache`).
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
215
216impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
217 HashJoinExecutor<K, S, T, E>
218{
    /// Creates a hash join executor that takes its per-join-key cache limit from the streaming
    /// config.
    ///
    /// Equivalent to [`Self::new_with_cache_size`] with `entry_state_max_rows = None`, which
    /// falls back to `streaming_config.developer.hash_join_entry_state_max_rows`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // No override: use the configured per-key cache limit.
            None,
        )
    }
264
    /// Creates a hash join executor, optionally overriding the per-join-key cache limit.
    ///
    /// * `params_l` / `params_r`: join-key and deduped pk indices of each input.
    /// * `null_safe`: per join-key column, whether a NULL may match. It is turned into the
    ///   `null_matched` bitmap shared by both hash tables.
    /// * `output_indices`: indices, into the left ++ right row (or the single kept side for
    ///   semi/anti joins), of the columns to emit.
    /// * `inequality_pairs`: `(key_required_larger, key_required_smaller, clean_state,
    ///   delta_expression)`. The two keys are column indices into the concatenated
    ///   left ++ right input.
    /// * `entry_state_max_rows`: `None` falls back to
    ///   `streaming_config.developer.hash_join_entry_state_max_rows`.
    ///
    /// # Panics
    /// If the join-key data types of the two sides differ.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // Use the configured limit unless the caller overrides it.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Right-side columns start after all left columns in a concatenated row.
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side; every other join type outputs left ++ right.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Types of the columns actually emitted, in `output_indices` order.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Full input row types of each side.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree-table rows are laid out as [join key columns..., deduped pk columns...],
        // judging from the index ranges below.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // If a side's pk is contained in its join key, rows with equal join keys share a pk,
        // so that side holds at most one row per join key.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must hash join keys of the same types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A side only needs a degree table if the join type uses its degrees and the opposite
        // side can hold more than one row per join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Input-to-output column mappings; the side dropped by a semi/anti join maps nothing.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        // Translate inequality pairs (indices into left ++ right) into per-side tables:
        // - `*2inequality_index[col]` lists `(inequality index, need_offset)` for each input
        //   column. `need_offset` is true for the "required smaller" key.
        // - `*_state_clean_columns` lists `(column, inequality index)` on the side of the
        //   "required larger" key when `clean_state` is set.
        // - Each pair keeps the output columns of its "required larger" key, where derived
        //   watermarks are emitted, together with its delta expression.
        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    // The larger key is on the left (and the smaller on the right) iff its index
                    // is the lower one.
                    let output_indices = if key_required_larger < key_required_smaller {
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Every column taking part in an inequality must be non-NULL for its row to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // The append-only optimization disables inequality-based state cleaning and the
        // non-NULL pre-filter. NOTE(review): the rationale is not visible in this file —
        // confirm before relying on it.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right-side columns follow all left-side columns in concatenated rows.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
528
    /// Main loop: barrier-aligns both inputs and dispatches every aligned message.
    ///
    /// * The first message must be a barrier. It is yielded first; both join sides are then
    ///   initialized with its epoch.
    /// * A data chunk from either side is joined against the other side's state, then a
    ///   best-effort flush (`try_flush_data`) runs.
    /// * Watermarks go through `handle_watermark`, which may emit derived watermarks.
    /// * A barrier flushes both sides at its epoch and is forwarded. Post-commit work with the
    ///   barrier's vnode bitmap update then runs on both sides, and the cache gauges are refreshed.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // Inputs are moved out of their `Option`s; they are not available afterwards.
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        // State is initialized only after the first barrier has been forwarded.
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Metric handles are resolved once here rather than per message.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time spent waiting for the next aligned message (restarted at the end of the loop).
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Join time for this chunk. The clock is paused around each `yield`, so
                    // time spent suspended downstream is not counted.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as the left branch.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides at this epoch. The returned handles finish the commit
                    // after the barrier has been yielded.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Hand the vnode bitmap update (if any) to both sides' post-commit handles.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // Only the left handle's result is inspected: `Some(true)` resets every
                    // buffered and inequality watermark. NOTE(review): presumably this signals
                    // that cached state may be stale after a vnode change — confirm in
                    // `JoinHashMapPostCommit`.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Refresh the cached-entry gauges of both sides.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
703
704 async fn flush_data(
705 &mut self,
706 epoch: EpochPair,
707 ) -> StreamExecutorResult<(
708 JoinHashMapPostCommit<'_, K, S, E>,
709 JoinHashMapPostCommit<'_, K, S, E>,
710 )> {
711 let left = self.side_l.ht.flush(epoch).await?;
714 let right = self.side_r.ht.flush(epoch).await?;
715 Ok((left, right))
716 }
717
718 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
719 self.side_l.ht.try_flush().await?;
722 self.side_r.ht.try_flush().await?;
723 Ok(())
724 }
725
726 fn evict_cache(
728 side_update: &mut JoinSide<K, S, E>,
729 side_match: &mut JoinSide<K, S, E>,
730 cnt_rows_received: &mut u32,
731 ) {
732 *cnt_rows_received += 1;
733 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
734 side_update.ht.evict();
735 side_match.ht.evict();
736 *cnt_rows_received = 0;
737 }
738 }
739
740 async fn handle_watermark(
741 &mut self,
742 side: SideTypePrimitive,
743 watermark: Watermark,
744 ) -> StreamExecutorResult<Vec<Watermark>> {
745 let (side_update, side_match) = if side == SideType::Left {
746 (&mut self.side_l, &mut self.side_r)
747 } else {
748 (&mut self.side_r, &mut self.side_l)
749 };
750
751 if side_update.join_key_indices[0] == watermark.col_idx {
753 side_match.ht.update_watermark(watermark.val.clone());
754 }
755
756 let wm_in_jk = side_update
758 .join_key_indices
759 .iter()
760 .positions(|idx| *idx == watermark.col_idx);
761 let mut watermarks_to_emit = vec![];
762 for idx in wm_in_jk {
763 let buffers = self
764 .watermark_buffers
765 .entry(idx)
766 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
767 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
768 let empty_indices = vec![];
769 let output_indices = side_update
770 .i2o_mapping_indexed
771 .get_vec(&side_update.join_key_indices[idx])
772 .unwrap_or(&empty_indices)
773 .iter()
774 .chain(
775 side_match
776 .i2o_mapping_indexed
777 .get_vec(&side_match.join_key_indices[idx])
778 .unwrap_or(&empty_indices),
779 );
780 for output_idx in output_indices {
781 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
782 }
783 };
784 }
785 for (inequality_index, need_offset) in
786 &side_update.input2inequality_index[watermark.col_idx]
787 {
788 let buffers = self
789 .watermark_buffers
790 .entry(side_update.join_key_indices.len() + inequality_index)
791 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
792 let mut input_watermark = watermark.clone();
793 if *need_offset
794 && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
795 {
796 #[allow(clippy::disallowed_methods)]
798 let eval_result = delta_expression
799 .inner()
800 .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
801 .await;
802 match eval_result {
803 Ok(value) => input_watermark.val = value.unwrap(),
804 Err(err) => {
805 if !matches!(err, ExprError::NumericOutOfRange) {
806 self.ctx.on_compute_error(err, &self.info.identity);
807 }
808 continue;
809 }
810 }
811 };
812 if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
813 for output_idx in &self.inequality_pairs[*inequality_index].0 {
814 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
815 }
816 self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
817 }
818 }
819 Ok(watermarks_to_emit)
820 }
821
822 fn row_concat(
823 row_update: impl Row,
824 update_start_pos: usize,
825 row_matched: impl Row,
826 matched_start_pos: usize,
827 ) -> OwnedRow {
828 let mut new_row = vec![None; row_update.len() + row_matched.len()];
829
830 for (i, datum_ref) in row_update.iter().enumerate() {
831 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
832 }
833 for (i, datum_ref) in row_matched.iter().enumerate() {
834 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
835 }
836 OwnedRow::new(new_row)
837 }
838
    /// Joins a chunk from the left input against the right side's state.
    fn eq_join_left(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Left }>(args)
    }
845
    /// Joins a chunk from the right input against the left side's state.
    fn eq_join_right(
        args: EqJoinArgs<'_, K, S, E>,
    ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
        Self::eq_join_oneside::<{ SideType::Right }>(args)
    }
852
    /// Joins every visible row of `args.chunk`, which arrived on side `SIDE`, against the
    /// opposite side's state and yields the resulting output chunks.
    ///
    /// Per row it:
    /// 1. ticks the cache-eviction counter;
    /// 2. looks up the opposite side's rows for the join key, skipping the lookup if the row
    ///    can never match;
    /// 3. delegates matching and state maintenance to `handle_match_rows`;
    /// 4. records an amplification metric and logs a warning above
    ///    `high_join_amplification_threshold`.
    ///
    /// Output rows are buffered across input rows; the remainder is yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        // `side_update` owns the incoming rows; `side_match` is probed for matches.
        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Match-side state-cleaning columns whose inequality already has a watermark,
        // paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        // One builder for the whole chunk: output rows accumulate across input rows.
        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // `None` entries are holes (presumably rows hidden by the visibility bitmap).
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // NOTE(review): soundness of `datum_at_unchecked` relies on `non_null_fields`
                // holding valid column indices of this side's rows. They are positions within
                // `input2inequality_index`, which `new_with_cache_size` sizes to this side's
                // input schema.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // A key can only match if its NULLs appear only in null-safe columns.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    // The row cannot match anything, so the state lookup is skipped.
                    CacheResult::NeverMatch
                }
            };
            // NOTE(review): this sums the cardinality of chunks flushed while handling this row.
            // The builder buffers rows across input rows, so the sum may include earlier rows'
            // output and omit rows still buffered. It approximates this row's match count.
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Update ops are handled like their plain insert/delete counterparts.
            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Emit whatever is still buffered in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
984
    /// Matches one update-side `row` against the match side's rows for `key` and maintains the
    /// join state of both sides.
    ///
    /// * `CacheResult::NeverMatch`: the row is only passed to `forward_if_not_matched` and no
    ///   state is touched (the row is not stored).
    /// * `CacheResult::Hit`: the cached rows taken out by `take_state_opt` are matched. They are
    ///   then put back with `update_state`.
    /// * `CacheResult::Miss`: matched rows are streamed from storage while a fresh cache entry is
    ///   refilled. The entry is cached only if the key has at most `entry_state_max_rows` rows.
    ///
    /// Afterwards the row is forwarded according to its degree (matches satisfying the join
    /// condition). Rows collected for cleaning are then deleted from the match side. Finally the
    /// row is inserted into / deleted from the update side with that degree. Under
    /// `append_only_optimize` a matched row is instead deleted from the match side, and the
    /// update row is not stored.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        // Remembered before `cached_lookup_result` is consumed by the match below.
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh cache entry, refilled only on a cache miss.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows that satisfied the join condition (updated by
        // `handle_match_row`).
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Match against the cached rows; they are returned to the cache below.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Cache refill. `<=` lets up to `entry_state_max_rows + 1` rows in. Past the
                    // limit the count stops at `max + 1`, which marks the entry as too large to
                    // cache (checked after matching).
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        // Forward the update row itself depending on whether anything matched.
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the cached rows back on a hit. On a miss, cache the refilled entry only if it
        // holds every row for the key.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Remove match-side rows selected for state cleaning.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // Append-only optimization: delete the matched row from the match side and do not
        // store the update row.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Record the update row (with its degree) in its own side's state.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1166
    /// Handles one candidate pair: the incoming `update_row` and a stored `matched_row` from
    /// the opposite side that shares its join key.
    ///
    /// Steps:
    /// 1. Evaluate the non-equi join condition `cond` on the concatenated pair.
    /// 2. If it holds:
    ///    - increment `*update_row_degree`;
    ///    - for `JoinOp::Insert`, emit the joined row *before* touching degrees;
    ///    - if `match_degree_table` is present, update the matched row's degree in the degree
    ///      table and in its cache entry;
    ///    - for `JoinOp::Delete`, emit the joined row *after* the degree update.
    ///
    ///    No joined row is emitted here when `forward_exactly_once(T, SIDE)` is true.
    /// 3. If it does not hold, flag the matched row for state cleaning when any column in
    ///    `useful_state_clean_columns` holds a value below that column's watermark.
    /// 4. With `append_only_optimize`, hand the mapped matched row back through
    ///    `append_only_matched_row`. Otherwise, push it to `matched_rows_to_clean` if it
    ///    was flagged.
    ///
    /// `R` is the row type of `matched_row`. `RO` is the row type stored in the output
    /// collections; `map_output` produces it.
    ///
    /// `MATCHED_ROWS_FROM_CACHE`: when true, `matched_row_cache_ref` is unwrapped
    /// unconditionally on a degree update, so callers must pass `Some` in that case.
    ///
    /// Returns the chunk handed back by `with_match`, if any. Presumably the builder returns
    /// one once its buffer fills up (TODO confirm against `JoinChunkBuilder`).
    ///
    /// # Panics
    /// - If `append_only_optimize` is set and `JOIN_OP` is not `JoinOp::Insert`.
    /// - If `append_only_optimize` is set and `append_only_matched_row` is already `Some`.
    /// - If a degree update is required, `MATCHED_ROWS_FROM_CACHE` is true and
    ///   `matched_row_cache_ref` is `None`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // row type of `matched_row`
        RO: Row, // row type stored in `append_only_matched_row` / `matched_rows_to_clean`
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // Evaluated for every candidate pair; with `cond == None` this is always true.
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            *update_row_degree += 1;
            // Insert output is built from `matched_row` *before* `update_degree` mutates it
            // below. Do not reorder. Presumably the builder's output depends on the
            // matched row's degree (TODO confirm).
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Keep the cached copy's degree in sync with the degree table.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Delete output is built *after* the degree update above. Do not reorder.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // The pair produces no output. Check whether the matched row is already below
            // one of the state-cleaning watermarks; a NULL datum never triggers cleaning.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            // At most one stored row is expected to match per update row in this mode,
            // presumably because the join key covers the pk on both sides (TODO confirm).
            // The matched row is handed back regardless of whether the condition held.
            assert_matches!(JOIN_OP, JoinOp::Insert);
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `else` ensures `matched_row` is moved by `map` at most once.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1278
1279 #[inline]
1284 async fn check_join_condition(
1285 row: impl Row,
1286 side_update_start_pos: usize,
1287 matched_row: impl Row,
1288 side_match_start_pos: usize,
1289 join_condition: &Option<NonStrictExpression>,
1290 ) -> bool {
1291 if let Some(join_condition) = join_condition {
1292 let new_row = Self::row_concat(
1293 row,
1294 side_update_start_pos,
1295 matched_row,
1296 side_match_start_pos,
1297 );
1298 join_condition
1299 .eval_row_infallible(&new_row)
1300 .await
1301 .map(|s| *s.as_bool())
1302 .unwrap_or(false)
1303 } else {
1304 true
1305 }
1306 }
1307}
1308
1309#[cfg(test)]
1310mod tests {
1311 use std::sync::atomic::AtomicU64;
1312
1313 use pretty_assertions::assert_eq;
1314 use risingwave_common::array::*;
1315 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1316 use risingwave_common::hash::{Key64, Key128};
1317 use risingwave_common::util::epoch::test_epoch;
1318 use risingwave_common::util::sort_util::OrderType;
1319 use risingwave_storage::memory::MemoryStateStore;
1320
1321 use super::*;
1322 use crate::common::table::test_utils::gen_pbtable;
1323 use crate::executor::MemoryEncoding;
1324 use crate::executor::test_utils::expr::build_from_pretty;
1325 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1326
1327 async fn create_in_memory_state_table(
1328 mem_state: MemoryStateStore,
1329 data_types: &[DataType],
1330 order_types: &[OrderType],
1331 pk_indices: &[usize],
1332 table_id: u32,
1333 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1334 let column_descs = data_types
1335 .iter()
1336 .enumerate()
1337 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1338 .collect_vec();
1339 let state_table = StateTable::from_table_catalog(
1340 &gen_pbtable(
1341 TableId::new(table_id),
1342 column_descs,
1343 order_types.to_vec(),
1344 pk_indices.to_vec(),
1345 0,
1346 ),
1347 mem_state.clone(),
1348 None,
1349 )
1350 .await;
1351
1352 let mut degree_table_column_descs = vec![];
1354 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1355 degree_table_column_descs.push(ColumnDesc::unnamed(
1356 ColumnId::new(pk_id as i32),
1357 data_types[*idx].clone(),
1358 ))
1359 });
1360 degree_table_column_descs.push(ColumnDesc::unnamed(
1361 ColumnId::new(pk_indices.len() as i32),
1362 DataType::Int64,
1363 ));
1364 let degree_state_table = StateTable::from_table_catalog(
1365 &gen_pbtable(
1366 TableId::new(table_id + 1),
1367 degree_table_column_descs,
1368 order_types.to_vec(),
1369 pk_indices.to_vec(),
1370 0,
1371 ),
1372 mem_state,
1373 None,
1374 )
1375 .await;
1376 (state_table, degree_state_table)
1377 }
1378
1379 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1380 build_from_pretty(
1381 condition_text
1382 .as_deref()
1383 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1384 )
1385 }
1386
1387 async fn create_executor<const T: JoinTypePrimitive>(
1388 with_condition: bool,
1389 null_safe: bool,
1390 condition_text: Option<String>,
1391 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1392 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1393 let schema = Schema {
1394 fields: vec![
1395 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1397 ],
1398 };
1399 let (tx_l, source_l) = MockSource::channel();
1400 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1401 let (tx_r, source_r) = MockSource::channel();
1402 let source_r = source_r.into_executor(schema, vec![1]);
1403 let params_l = JoinParams::new(vec![0], vec![1]);
1404 let params_r = JoinParams::new(vec![0], vec![1]);
1405 let cond = with_condition.then(|| create_cond(condition_text));
1406
1407 let mem_state = MemoryStateStore::new();
1408
1409 let (state_l, degree_state_l) = create_in_memory_state_table(
1410 mem_state.clone(),
1411 &[DataType::Int64, DataType::Int64],
1412 &[OrderType::ascending(), OrderType::ascending()],
1413 &[0, 1],
1414 0,
1415 )
1416 .await;
1417
1418 let (state_r, degree_state_r) = create_in_memory_state_table(
1419 mem_state,
1420 &[DataType::Int64, DataType::Int64],
1421 &[OrderType::ascending(), OrderType::ascending()],
1422 &[0, 1],
1423 2,
1424 )
1425 .await;
1426
1427 let schema = match T {
1428 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1429 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1430 _ => [source_l.schema().fields(), source_r.schema().fields()]
1431 .concat()
1432 .into_iter()
1433 .collect(),
1434 };
1435 let schema_len = schema.len();
1436 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1437
1438 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1439 ActorContext::for_test(123),
1440 info,
1441 source_l,
1442 source_r,
1443 params_l,
1444 params_r,
1445 vec![null_safe],
1446 (0..schema_len).collect_vec(),
1447 cond,
1448 inequality_pairs,
1449 state_l,
1450 degree_state_l,
1451 state_r,
1452 degree_state_r,
1453 Arc::new(AtomicU64::new(0)),
1454 false,
1455 Arc::new(StreamingMetrics::unused()),
1456 1024,
1457 2048,
1458 );
1459 (tx_l, tx_r, executor.boxed().execute())
1460 }
1461
1462 async fn create_classical_executor<const T: JoinTypePrimitive>(
1463 with_condition: bool,
1464 null_safe: bool,
1465 condition_text: Option<String>,
1466 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1467 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1468 }
1469
1470 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1471 with_condition: bool,
1472 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1473 let schema = Schema {
1474 fields: vec![
1475 Field::unnamed(DataType::Int64),
1476 Field::unnamed(DataType::Int64),
1477 Field::unnamed(DataType::Int64),
1478 ],
1479 };
1480 let (tx_l, source_l) = MockSource::channel();
1481 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1482 let (tx_r, source_r) = MockSource::channel();
1483 let source_r = source_r.into_executor(schema, vec![0]);
1484 let params_l = JoinParams::new(vec![0, 1], vec![]);
1485 let params_r = JoinParams::new(vec![0, 1], vec![]);
1486 let cond = with_condition.then(|| create_cond(None));
1487
1488 let mem_state = MemoryStateStore::new();
1489
1490 let (state_l, degree_state_l) = create_in_memory_state_table(
1491 mem_state.clone(),
1492 &[DataType::Int64, DataType::Int64, DataType::Int64],
1493 &[
1494 OrderType::ascending(),
1495 OrderType::ascending(),
1496 OrderType::ascending(),
1497 ],
1498 &[0, 1, 0],
1499 0,
1500 )
1501 .await;
1502
1503 let (state_r, degree_state_r) = create_in_memory_state_table(
1504 mem_state,
1505 &[DataType::Int64, DataType::Int64, DataType::Int64],
1506 &[
1507 OrderType::ascending(),
1508 OrderType::ascending(),
1509 OrderType::ascending(),
1510 ],
1511 &[0, 1, 1],
1512 1,
1513 )
1514 .await;
1515
1516 let schema = match T {
1517 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1518 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1519 _ => [source_l.schema().fields(), source_r.schema().fields()]
1520 .concat()
1521 .into_iter()
1522 .collect(),
1523 };
1524 let schema_len = schema.len();
1525 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1526
1527 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1528 ActorContext::for_test(123),
1529 info,
1530 source_l,
1531 source_r,
1532 params_l,
1533 params_r,
1534 vec![false],
1535 (0..schema_len).collect_vec(),
1536 cond,
1537 vec![],
1538 state_l,
1539 degree_state_l,
1540 state_r,
1541 degree_state_r,
1542 Arc::new(AtomicU64::new(0)),
1543 true,
1544 Arc::new(StreamingMetrics::unused()),
1545 1024,
1546 2048,
1547 );
1548 (tx_l, tx_r, executor.boxed().execute())
1549 }
1550
    /// Interval join: an inner join on column 0 whose condition keeps pairs with
    /// `|left.1 - right.1| < 2`. The two inequality pairs let watermarks on column 1 produce
    /// output watermarks and drive cleaning of left-side state.
    #[tokio::test]
    async fn test_interval_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 3
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 6
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 2 3
             + 6 11",
        );
        // Condition: `$1 > $3 - 2 AND $3 > $1 - 2`, i.e. `|$1 - $3| < 2` on the concatenated row.
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
        )
        .await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty, so the first left chunk produces nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Insert then delete of `3 8` on the left; still nothing to join with.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // A watermark from only one side does not yet produce an output watermark.
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        // After right watermark 6 on column 1, watermarks are expected on output columns 1
        // (value 4) and 3 (value 6).
        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
        );
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
        );

        // Right `2 6` joins left `2 5` (|5 - 6| < 2). Left `2 3` fails the condition
        // (|3 - 6| = 3) and is expected to be cleaned from state at this point. Presumably
        // its column 1 is below the state-cleaning watermark (TODO confirm).
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 6"
            )
        );

        // Right `2 3` would join left `2 3` (|3 - 3| < 2) had that row survived. It does not
        // join left `2 5` (|5 - 3| = 2), and right `6 11` has no left key 6. Pending output
        // therefore confirms that left `2 3` was cleaned.
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        Ok(())
    }
1637
    /// Plain inner join on column 0 with no extra condition. Each right row joins every live
    /// left row with the same key.
    #[tokio::test]
    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty: no output.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `3 8` is inserted and deleted again; still no right rows.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Only key 2 has a left row.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // Key 3 matches only `3 6` (`3 8` was deleted); key 6 has no left row.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I  I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
1707
    /// Inner join with a null-safe join key: a NULL key on one side matches a NULL key on
    /// the other.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty: no output.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `. 8` is inserted and deleted again.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Only key 2 has a left row.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // The NULL key matches the surviving left NULL row `. 6` because the key is null-safe.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I  I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1777
    /// Left semi join: a left row is emitted once when it gains its first match and is
    /// retracted only when it loses its last match.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: nothing qualifies.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `3 8` is inserted and deleted again.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Left `2 5` gains its first match.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // Left `3 6` gains its first match; right `6 11` has no left row yet.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // Left `6 10` matches two right rows but is emitted only once.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // Removing one of the two matches keeps `6 10` in the output.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // Removing the last match retracts it.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1887
    /// Left semi join with a null-safe join key. This is the same scenario as the plain
    /// left-semi test, except that key 3 is replaced by NULL and NULL keys match each other.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: nothing qualifies.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `. 8` is inserted and deleted again.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Left `2 5` gains its first match.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // The NULL right key matches left `. 6` (null-safe); `6 11` has no left row yet.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        // Left `6 10` matches two right rows but is emitted only once.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // One match remains: no change.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // The last match is gone: retract.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1997
    /// Inner join built by `create_append_only_executor`, joining on columns 0 and 1 of
    /// three-column inputs.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty: no output.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Keys (2, 5) and (4, 9) match; (6, 9) has no left row.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2"
            )
        );

        // Keys (1, 4) and (3, 6) match.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2070
    /// Left semi join built by `create_append_only_executor`. Output rows are the left rows
    /// whose (column 0, column 1) key appears on the right.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: nothing qualifies.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Left rows with keys (2, 5) and (4, 9) gain a match.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        // Left rows with keys (1, 4) and (3, 6) gain a match.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2143
    /// Right semi join built by `create_append_only_executor`. Output rows are the right rows
    /// whose (column 0, column 1) key appears on the left.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows alone never produce right-semi output.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Right rows with keys (2, 5) and (4, 9) are matched; (6, 9) is not.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        // Right rows with keys (1, 4) and (3, 6) are matched.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2216
    /// Right semi join: the mirror of the left-semi test, with the inputs' roles swapped.
    /// Output rows come from the right side.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No left rows yet: nothing qualifies.
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `3 8` is inserted and deleted again on the right.
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        // Right `2 5` gains its first match.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // Right `3 6` gains its first match; left `6 11` has no right row yet.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // Right `6 10` matches two left rows but is emitted only once.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // One match remains: no change.
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // The last match is gone: retract.
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2326
    /// Left anti join: a left row is emitted while it has no match, retracted when it gains
    /// its first match, and re-emitted when it loses its last match.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty, so every left row is unmatched and emitted.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 1 4
                 + 2 5
                 + 3 6",
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Insert + delete of `3 8`. Both rows are expected with the trailing `D` marker,
        // presumably meaning "not visible" in the pretty format (TODO confirm).
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // Left `2 5` gains its first match and is retracted.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // Left `3 6` and `1 4` gain matches (the two right key-1 rows retract `1 4` only once).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // A new left row with no match is emitted immediately.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // `1 4` still has one match (`1 3`): no change.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // `1 4` loses its last match and is emitted again.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2456
    /// Anti-join scenario mirroring the left-anti test, with the data names swapped.
    ///
    /// NOTE(review): despite its name, this test builds a `JoinType::LeftAnti` executor and
    /// destructures the returned senders in swapped order. `create_classical_executor`
    /// returns `(left, right, stream)`, so `tx_r` here feeds the executor's *left* input and
    /// `tx_l` its right input. The test therefore re-runs the left-anti logic and does not
    /// exercise the `JoinType::RightAnti` code path.
    #[tokio::test]
    async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            "  I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            "  I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            "  I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            "  I  I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            "  I I
             + 9 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            "  I I
             - 1 2",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            "  I I
             - 1 3",
        );
        // Senders are deliberately bound in swapped order (see the note above).
        let (mut tx_r, mut tx_l, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // Initial barrier on both inputs.
        tx_r.push_barrier(test_epoch(1), false);
        tx_l.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The other input is empty, so every row is unmatched and emitted.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 1 4
                 + 2 5
                 + 3 6",
            )
        );

        tx_r.push_barrier(test_epoch(2), false);
        tx_l.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Insert + delete of `3 8`. Both rows are expected with the trailing `D` marker,
        // presumably meaning "not visible" in the pretty format (TODO confirm).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                "  I I
                 + 3 8 D
                 - 3 8 D",
            )
        );

        // `2 5` gains its first match and is retracted.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // `3 6` and `1 4` gain matches.
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // A new unmatched row is emitted immediately.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // `1 4` still has one match: no change.
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // `1 4` loses its last match and is emitted again.
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2586
2587 #[tokio::test]
2588 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2589 let chunk_l1 = StreamChunk::from_pretty(
2590 " I I
2591 + 1 4
2592 + 2 5
2593 + 3 6",
2594 );
2595 let chunk_l2 = StreamChunk::from_pretty(
2596 " I I
2597 + 6 8
2598 + 3 8",
2599 );
2600 let chunk_r1 = StreamChunk::from_pretty(
2601 " I I
2602 + 2 7
2603 + 4 8
2604 + 6 9",
2605 );
2606 let chunk_r2 = StreamChunk::from_pretty(
2607 " I I
2608 + 3 10
2609 + 6 11",
2610 );
2611 let (mut tx_l, mut tx_r, mut hash_join) =
2612 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2613
2614 tx_l.push_barrier(test_epoch(1), false);
2616 tx_r.push_barrier(test_epoch(1), false);
2617 hash_join.next_unwrap_ready_barrier()?;
2618
2619 tx_l.push_chunk(chunk_l1);
2621 hash_join.next_unwrap_pending();
2622
2623 tx_l.push_barrier(test_epoch(2), false);
2625
2626 tx_l.push_chunk(chunk_l2);
2628
2629 tx_r.push_chunk(chunk_r1);
2631
2632 let chunk = hash_join.next_unwrap_ready_chunk()?;
2634 assert_eq!(
2635 chunk,
2636 StreamChunk::from_pretty(
2637 " I I I I
2638 + 2 5 2 7"
2639 )
2640 );
2641
2642 tx_r.push_barrier(test_epoch(2), false);
2644
2645 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2647 assert!(matches!(
2648 hash_join.next_unwrap_ready_barrier()?,
2649 Barrier {
2650 epoch,
2651 mutation: None,
2652 ..
2653 } if epoch == expected_epoch
2654 ));
2655
2656 let chunk = hash_join.next_unwrap_ready_chunk()?;
2658 assert_eq!(
2659 chunk,
2660 StreamChunk::from_pretty(
2661 " I I I I
2662 + 6 8 6 9"
2663 )
2664 );
2665
2666 tx_r.push_chunk(chunk_r2);
2668 let chunk = hash_join.next_unwrap_ready_chunk()?;
2669 assert_eq!(
2670 chunk,
2671 StreamChunk::from_pretty(
2672 " I I I I
2673 + 3 6 3 10
2674 + 3 8 3 10
2675 + 6 8 6 11"
2676 )
2677 );
2678
2679 Ok(())
2680 }
2681
2682 #[tokio::test]
2683 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2684 let chunk_l1 = StreamChunk::from_pretty(
2685 " I I
2686 + 1 4
2687 + 2 .
2688 + 3 .",
2689 );
2690 let chunk_l2 = StreamChunk::from_pretty(
2691 " I I
2692 + 6 .
2693 + 3 8",
2694 );
2695 let chunk_r1 = StreamChunk::from_pretty(
2696 " I I
2697 + 2 7
2698 + 4 8
2699 + 6 9",
2700 );
2701 let chunk_r2 = StreamChunk::from_pretty(
2702 " I I
2703 + 3 10
2704 + 6 11",
2705 );
2706 let (mut tx_l, mut tx_r, mut hash_join) =
2707 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2708
2709 tx_l.push_barrier(test_epoch(1), false);
2711 tx_r.push_barrier(test_epoch(1), false);
2712 hash_join.next_unwrap_ready_barrier()?;
2713
2714 tx_l.push_chunk(chunk_l1);
2716 hash_join.next_unwrap_pending();
2717
2718 tx_l.push_barrier(test_epoch(2), false);
2720
2721 tx_l.push_chunk(chunk_l2);
2723
2724 tx_r.push_chunk(chunk_r1);
2726
2727 let chunk = hash_join.next_unwrap_ready_chunk()?;
2729 assert_eq!(
2730 chunk,
2731 StreamChunk::from_pretty(
2732 " I I I I
2733 + 2 . 2 7"
2734 )
2735 );
2736
2737 tx_r.push_barrier(test_epoch(2), false);
2739
2740 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2742 assert!(matches!(
2743 hash_join.next_unwrap_ready_barrier()?,
2744 Barrier {
2745 epoch,
2746 mutation: None,
2747 ..
2748 } if epoch == expected_epoch
2749 ));
2750
2751 let chunk = hash_join.next_unwrap_ready_chunk()?;
2753 assert_eq!(
2754 chunk,
2755 StreamChunk::from_pretty(
2756 " I I I I
2757 + 6 . 6 9"
2758 )
2759 );
2760
2761 tx_r.push_chunk(chunk_r2);
2763 let chunk = hash_join.next_unwrap_ready_chunk()?;
2764 assert_eq!(
2765 chunk,
2766 StreamChunk::from_pretty(
2767 " I I I I
2768 + 3 8 3 10
2769 + 3 . 3 10
2770 + 6 . 6 11"
2771 )
2772 );
2773
2774 Ok(())
2775 }
2776
2777 #[tokio::test]
2778 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2779 let chunk_l1 = StreamChunk::from_pretty(
2780 " I I
2781 + 1 4
2782 + 2 5
2783 + 3 6",
2784 );
2785 let chunk_l2 = StreamChunk::from_pretty(
2786 " I I
2787 + 3 8
2788 - 3 8",
2789 );
2790 let chunk_r1 = StreamChunk::from_pretty(
2791 " I I
2792 + 2 7
2793 + 4 8
2794 + 6 9",
2795 );
2796 let chunk_r2 = StreamChunk::from_pretty(
2797 " I I
2798 + 3 10
2799 + 6 11",
2800 );
2801 let (mut tx_l, mut tx_r, mut hash_join) =
2802 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2803
2804 tx_l.push_barrier(test_epoch(1), false);
2806 tx_r.push_barrier(test_epoch(1), false);
2807 hash_join.next_unwrap_ready_barrier()?;
2808
2809 tx_l.push_chunk(chunk_l1);
2811 let chunk = hash_join.next_unwrap_ready_chunk()?;
2812 assert_eq!(
2813 chunk,
2814 StreamChunk::from_pretty(
2815 " I I I I
2816 + 1 4 . .
2817 + 2 5 . .
2818 + 3 6 . ."
2819 )
2820 );
2821
2822 tx_l.push_chunk(chunk_l2);
2824 let chunk = hash_join.next_unwrap_ready_chunk()?;
2825 assert_eq!(
2826 chunk,
2827 StreamChunk::from_pretty(
2828 " I I I I
2829 + 3 8 . . D
2830 - 3 8 . . D"
2831 )
2832 );
2833
2834 tx_r.push_chunk(chunk_r1);
2836 let chunk = hash_join.next_unwrap_ready_chunk()?;
2837 assert_eq!(
2838 chunk,
2839 StreamChunk::from_pretty(
2840 " I I I I
2841 - 2 5 . .
2842 + 2 5 2 7"
2843 )
2844 );
2845
2846 tx_r.push_chunk(chunk_r2);
2848 let chunk = hash_join.next_unwrap_ready_chunk()?;
2849 assert_eq!(
2850 chunk,
2851 StreamChunk::from_pretty(
2852 " I I I I
2853 - 3 6 . .
2854 + 3 6 3 10"
2855 )
2856 );
2857
2858 Ok(())
2859 }
2860
2861 #[tokio::test]
2862 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2863 let chunk_l1 = StreamChunk::from_pretty(
2864 " I I
2865 + 1 4
2866 + 2 5
2867 + . 6",
2868 );
2869 let chunk_l2 = StreamChunk::from_pretty(
2870 " I I
2871 + . 8
2872 - . 8",
2873 );
2874 let chunk_r1 = StreamChunk::from_pretty(
2875 " I I
2876 + 2 7
2877 + 4 8
2878 + 6 9",
2879 );
2880 let chunk_r2 = StreamChunk::from_pretty(
2881 " I I
2882 + . 10
2883 + 6 11",
2884 );
2885 let (mut tx_l, mut tx_r, mut hash_join) =
2886 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2887
2888 tx_l.push_barrier(test_epoch(1), false);
2890 tx_r.push_barrier(test_epoch(1), false);
2891 hash_join.next_unwrap_ready_barrier()?;
2892
2893 tx_l.push_chunk(chunk_l1);
2895 let chunk = hash_join.next_unwrap_ready_chunk()?;
2896 assert_eq!(
2897 chunk,
2898 StreamChunk::from_pretty(
2899 " I I I I
2900 + 1 4 . .
2901 + 2 5 . .
2902 + . 6 . ."
2903 )
2904 );
2905
2906 tx_l.push_chunk(chunk_l2);
2908 let chunk = hash_join.next_unwrap_ready_chunk()?;
2909 assert_eq!(
2910 chunk,
2911 StreamChunk::from_pretty(
2912 " I I I I
2913 + . 8 . . D
2914 - . 8 . . D"
2915 )
2916 );
2917
2918 tx_r.push_chunk(chunk_r1);
2920 let chunk = hash_join.next_unwrap_ready_chunk()?;
2921 assert_eq!(
2922 chunk,
2923 StreamChunk::from_pretty(
2924 " I I I I
2925 - 2 5 . .
2926 + 2 5 2 7"
2927 )
2928 );
2929
2930 tx_r.push_chunk(chunk_r2);
2932 let chunk = hash_join.next_unwrap_ready_chunk()?;
2933 assert_eq!(
2934 chunk,
2935 StreamChunk::from_pretty(
2936 " I I I I
2937 - . 6 . .
2938 + . 6 . 10"
2939 )
2940 );
2941
2942 Ok(())
2943 }
2944
2945 #[tokio::test]
2946 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2947 let chunk_l1 = StreamChunk::from_pretty(
2948 " I I
2949 + 1 4
2950 + 2 5
2951 + 3 6",
2952 );
2953 let chunk_l2 = StreamChunk::from_pretty(
2954 " I I
2955 + 3 8
2956 - 3 8",
2957 );
2958 let chunk_r1 = StreamChunk::from_pretty(
2959 " I I
2960 + 2 7
2961 + 4 8
2962 + 6 9",
2963 );
2964 let chunk_r2 = StreamChunk::from_pretty(
2965 " I I
2966 + 5 10
2967 - 5 10",
2968 );
2969 let (mut tx_l, mut tx_r, mut hash_join) =
2970 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2971
2972 tx_l.push_barrier(test_epoch(1), false);
2974 tx_r.push_barrier(test_epoch(1), false);
2975 hash_join.next_unwrap_ready_barrier()?;
2976
2977 tx_l.push_chunk(chunk_l1);
2979 hash_join.next_unwrap_pending();
2980
2981 tx_l.push_chunk(chunk_l2);
2983 hash_join.next_unwrap_pending();
2984
2985 tx_r.push_chunk(chunk_r1);
2987 let chunk = hash_join.next_unwrap_ready_chunk()?;
2988 assert_eq!(
2989 chunk,
2990 StreamChunk::from_pretty(
2991 " I I I I
2992 + 2 5 2 7
2993 + . . 4 8
2994 + . . 6 9"
2995 )
2996 );
2997
2998 tx_r.push_chunk(chunk_r2);
3000 let chunk = hash_join.next_unwrap_ready_chunk()?;
3001 assert_eq!(
3002 chunk,
3003 StreamChunk::from_pretty(
3004 " I I I I
3005 + . . 5 10 D
3006 - . . 5 10 D"
3007 )
3008 );
3009
3010 Ok(())
3011 }
3012
3013 #[tokio::test]
3014 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3015 let chunk_l1 = StreamChunk::from_pretty(
3016 " I I I
3017 + 1 4 1
3018 + 2 5 2
3019 + 3 6 3",
3020 );
3021 let chunk_l2 = StreamChunk::from_pretty(
3022 " I I I
3023 + 4 9 4
3024 + 5 10 5",
3025 );
3026 let chunk_r1 = StreamChunk::from_pretty(
3027 " I I I
3028 + 2 5 1
3029 + 4 9 2
3030 + 6 9 3",
3031 );
3032 let chunk_r2 = StreamChunk::from_pretty(
3033 " I I I
3034 + 1 4 4
3035 + 3 6 5",
3036 );
3037
3038 let (mut tx_l, mut tx_r, mut hash_join) =
3039 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3040
3041 tx_l.push_barrier(test_epoch(1), false);
3043 tx_r.push_barrier(test_epoch(1), false);
3044 hash_join.next_unwrap_ready_barrier()?;
3045
3046 tx_l.push_chunk(chunk_l1);
3048 let chunk = hash_join.next_unwrap_ready_chunk()?;
3049 assert_eq!(
3050 chunk,
3051 StreamChunk::from_pretty(
3052 " I I I I I I
3053 + 1 4 1 . . .
3054 + 2 5 2 . . .
3055 + 3 6 3 . . ."
3056 )
3057 );
3058
3059 tx_l.push_chunk(chunk_l2);
3061 let chunk = hash_join.next_unwrap_ready_chunk()?;
3062 assert_eq!(
3063 chunk,
3064 StreamChunk::from_pretty(
3065 " I I I I I I
3066 + 4 9 4 . . .
3067 + 5 10 5 . . ."
3068 )
3069 );
3070
3071 tx_r.push_chunk(chunk_r1);
3073 let chunk = hash_join.next_unwrap_ready_chunk()?;
3074 assert_eq!(
3075 chunk,
3076 StreamChunk::from_pretty(
3077 " I I I I I I
3078 - 2 5 2 . . .
3079 + 2 5 2 2 5 1
3080 - 4 9 4 . . .
3081 + 4 9 4 4 9 2"
3082 )
3083 );
3084
3085 tx_r.push_chunk(chunk_r2);
3087 let chunk = hash_join.next_unwrap_ready_chunk()?;
3088 assert_eq!(
3089 chunk,
3090 StreamChunk::from_pretty(
3091 " I I I I I I
3092 - 1 4 1 . . .
3093 + 1 4 1 1 4 4
3094 - 3 6 3 . . .
3095 + 3 6 3 3 6 5"
3096 )
3097 );
3098
3099 Ok(())
3100 }
3101
3102 #[tokio::test]
3103 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3104 let chunk_l1 = StreamChunk::from_pretty(
3105 " I I I
3106 + 1 4 1
3107 + 2 5 2
3108 + 3 6 3",
3109 );
3110 let chunk_l2 = StreamChunk::from_pretty(
3111 " I I I
3112 + 4 9 4
3113 + 5 10 5",
3114 );
3115 let chunk_r1 = StreamChunk::from_pretty(
3116 " I I I
3117 + 2 5 1
3118 + 4 9 2
3119 + 6 9 3",
3120 );
3121 let chunk_r2 = StreamChunk::from_pretty(
3122 " I I I
3123 + 1 4 4
3124 + 3 6 5
3125 + 7 7 6",
3126 );
3127
3128 let (mut tx_l, mut tx_r, mut hash_join) =
3129 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3130
3131 tx_l.push_barrier(test_epoch(1), false);
3133 tx_r.push_barrier(test_epoch(1), false);
3134 hash_join.next_unwrap_ready_barrier()?;
3135
3136 tx_l.push_chunk(chunk_l1);
3138 hash_join.next_unwrap_pending();
3139
3140 tx_l.push_chunk(chunk_l2);
3142 hash_join.next_unwrap_pending();
3143
3144 tx_r.push_chunk(chunk_r1);
3146 let chunk = hash_join.next_unwrap_ready_chunk()?;
3147 assert_eq!(
3148 chunk,
3149 StreamChunk::from_pretty(
3150 " I I I I I I
3151 + 2 5 2 2 5 1
3152 + 4 9 4 4 9 2
3153 + . . . 6 9 3"
3154 )
3155 );
3156
3157 tx_r.push_chunk(chunk_r2);
3159 let chunk = hash_join.next_unwrap_ready_chunk()?;
3160 assert_eq!(
3161 chunk,
3162 StreamChunk::from_pretty(
3163 " I I I I I I
3164 + 1 4 1 1 4 4
3165 + 3 6 3 3 6 5
3166 + . . . 7 7 6"
3167 )
3168 );
3169
3170 Ok(())
3171 }
3172
3173 #[tokio::test]
3174 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3175 let chunk_l1 = StreamChunk::from_pretty(
3176 " I I
3177 + 1 4
3178 + 2 5
3179 + 3 6",
3180 );
3181 let chunk_l2 = StreamChunk::from_pretty(
3182 " I I
3183 + 3 8
3184 - 3 8",
3185 );
3186 let chunk_r1 = StreamChunk::from_pretty(
3187 " I I
3188 + 2 7
3189 + 4 8
3190 + 6 9",
3191 );
3192 let chunk_r2 = StreamChunk::from_pretty(
3193 " I I
3194 + 5 10
3195 - 5 10",
3196 );
3197 let (mut tx_l, mut tx_r, mut hash_join) =
3198 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3199
3200 tx_l.push_barrier(test_epoch(1), false);
3202 tx_r.push_barrier(test_epoch(1), false);
3203 hash_join.next_unwrap_ready_barrier()?;
3204
3205 tx_l.push_chunk(chunk_l1);
3207 let chunk = hash_join.next_unwrap_ready_chunk()?;
3208 assert_eq!(
3209 chunk,
3210 StreamChunk::from_pretty(
3211 " I I I I
3212 + 1 4 . .
3213 + 2 5 . .
3214 + 3 6 . ."
3215 )
3216 );
3217
3218 tx_l.push_chunk(chunk_l2);
3220 let chunk = hash_join.next_unwrap_ready_chunk()?;
3221 assert_eq!(
3222 chunk,
3223 StreamChunk::from_pretty(
3224 " I I I I
3225 + 3 8 . . D
3226 - 3 8 . . D"
3227 )
3228 );
3229
3230 tx_r.push_chunk(chunk_r1);
3232 let chunk = hash_join.next_unwrap_ready_chunk()?;
3233 assert_eq!(
3234 chunk,
3235 StreamChunk::from_pretty(
3236 " I I I I
3237 - 2 5 . .
3238 + 2 5 2 7
3239 + . . 4 8
3240 + . . 6 9"
3241 )
3242 );
3243
3244 tx_r.push_chunk(chunk_r2);
3246 let chunk = hash_join.next_unwrap_ready_chunk()?;
3247 assert_eq!(
3248 chunk,
3249 StreamChunk::from_pretty(
3250 " I I I I
3251 + . . 5 10 D
3252 - . . 5 10 D"
3253 )
3254 );
3255
3256 Ok(())
3257 }
3258
3259 #[tokio::test]
3260 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3261 let (mut tx_l, mut tx_r, mut hash_join) =
3262 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3263
3264 tx_l.push_barrier(test_epoch(1), false);
3266 tx_r.push_barrier(test_epoch(1), false);
3267 hash_join.next_unwrap_ready_barrier()?;
3268
3269 tx_l.push_chunk(StreamChunk::from_pretty(
3270 " I I
3271 + 1 1
3272 ",
3273 ));
3274 let chunk = hash_join.next_unwrap_ready_chunk()?;
3275 assert_eq!(
3276 chunk,
3277 StreamChunk::from_pretty(
3278 " I I I I
3279 + 1 1 . ."
3280 )
3281 );
3282
3283 tx_r.push_chunk(StreamChunk::from_pretty(
3284 " I I
3285 + 1 1
3286 ",
3287 ));
3288 let chunk = hash_join.next_unwrap_ready_chunk()?;
3289
3290 assert_eq!(
3291 chunk,
3292 StreamChunk::from_pretty(
3293 " I I I I
3294 - 1 1 . .
3295 + 1 1 1 1"
3296 )
3297 );
3298
3299 tx_l.push_chunk(StreamChunk::from_pretty(
3300 " I I
3301 - 1 1
3302 + 1 2
3303 ",
3304 ));
3305 let chunk = hash_join.next_unwrap_ready_chunk()?;
3306 let chunk = chunk.compact();
3307 assert_eq!(
3308 chunk,
3309 StreamChunk::from_pretty(
3310 " I I I I
3311 - 1 1 1 1
3312 + 1 2 1 1
3313 "
3314 )
3315 );
3316
3317 Ok(())
3318 }
3319
3320 #[tokio::test]
3321 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3322 {
3323 let chunk_l1 = StreamChunk::from_pretty(
3324 " I I
3325 + 1 4
3326 + 2 5
3327 + 3 6
3328 + 3 7",
3329 );
3330 let chunk_l2 = StreamChunk::from_pretty(
3331 " I I
3332 + 3 8
3333 - 3 8
3334 - 1 4", );
3336 let chunk_r1 = StreamChunk::from_pretty(
3337 " I I
3338 + 2 6
3339 + 4 8
3340 + 3 4",
3341 );
3342 let chunk_r2 = StreamChunk::from_pretty(
3343 " I I
3344 + 5 10
3345 - 5 10
3346 + 1 2",
3347 );
3348 let (mut tx_l, mut tx_r, mut hash_join) =
3349 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3350
3351 tx_l.push_barrier(test_epoch(1), false);
3353 tx_r.push_barrier(test_epoch(1), false);
3354 hash_join.next_unwrap_ready_barrier()?;
3355
3356 tx_l.push_chunk(chunk_l1);
3358 let chunk = hash_join.next_unwrap_ready_chunk()?;
3359 assert_eq!(
3360 chunk,
3361 StreamChunk::from_pretty(
3362 " I I I I
3363 + 1 4 . .
3364 + 2 5 . .
3365 + 3 6 . .
3366 + 3 7 . ."
3367 )
3368 );
3369
3370 tx_l.push_chunk(chunk_l2);
3372 let chunk = hash_join.next_unwrap_ready_chunk()?;
3373 assert_eq!(
3374 chunk,
3375 StreamChunk::from_pretty(
3376 " I I I I
3377 + 3 8 . . D
3378 - 3 8 . . D
3379 - 1 4 . ."
3380 )
3381 );
3382
3383 tx_r.push_chunk(chunk_r1);
3385 let chunk = hash_join.next_unwrap_ready_chunk()?;
3386 assert_eq!(
3387 chunk,
3388 StreamChunk::from_pretty(
3389 " I I I I
3390 - 2 5 . .
3391 + 2 5 2 6
3392 + . . 4 8
3393 + . . 3 4" )
3397 );
3398
3399 tx_r.push_chunk(chunk_r2);
3401 let chunk = hash_join.next_unwrap_ready_chunk()?;
3402 assert_eq!(
3403 chunk,
3404 StreamChunk::from_pretty(
3405 " I I I I
3406 + . . 5 10 D
3407 - . . 5 10 D
3408 + . . 1 2" )
3411 );
3412
3413 Ok(())
3414 }
3415
3416 #[tokio::test]
3417 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3418 let chunk_l1 = StreamChunk::from_pretty(
3419 " I I
3420 + 1 4
3421 + 2 10
3422 + 3 6",
3423 );
3424 let chunk_l2 = StreamChunk::from_pretty(
3425 " I I
3426 + 3 8
3427 - 3 8",
3428 );
3429 let chunk_r1 = StreamChunk::from_pretty(
3430 " I I
3431 + 2 7
3432 + 4 8
3433 + 6 9",
3434 );
3435 let chunk_r2 = StreamChunk::from_pretty(
3436 " I I
3437 + 3 10
3438 + 6 11",
3439 );
3440 let (mut tx_l, mut tx_r, mut hash_join) =
3441 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3442
3443 tx_l.push_barrier(test_epoch(1), false);
3445 tx_r.push_barrier(test_epoch(1), false);
3446 hash_join.next_unwrap_ready_barrier()?;
3447
3448 tx_l.push_chunk(chunk_l1);
3450 hash_join.next_unwrap_pending();
3451
3452 tx_l.push_chunk(chunk_l2);
3454 hash_join.next_unwrap_pending();
3455
3456 tx_r.push_chunk(chunk_r1);
3458 hash_join.next_unwrap_pending();
3459
3460 tx_r.push_chunk(chunk_r2);
3462 let chunk = hash_join.next_unwrap_ready_chunk()?;
3463 assert_eq!(
3464 chunk,
3465 StreamChunk::from_pretty(
3466 " I I I I
3467 + 3 6 3 10"
3468 )
3469 );
3470
3471 Ok(())
3472 }
3473
3474 #[tokio::test]
3475 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3476 let (mut tx_l, mut tx_r, mut hash_join) =
3477 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3478
3479 tx_l.push_barrier(test_epoch(1), false);
3481 tx_r.push_barrier(test_epoch(1), false);
3482 hash_join.next_unwrap_ready_barrier()?;
3483
3484 tx_l.push_int64_watermark(0, 100);
3485
3486 tx_l.push_int64_watermark(0, 200);
3487
3488 tx_l.push_barrier(test_epoch(2), false);
3489 tx_r.push_barrier(test_epoch(2), false);
3490 hash_join.next_unwrap_ready_barrier()?;
3491
3492 tx_r.push_int64_watermark(0, 50);
3493
3494 let w1 = hash_join.next().await.unwrap().unwrap();
3495 let w1 = w1.as_watermark().unwrap();
3496
3497 let w2 = hash_join.next().await.unwrap().unwrap();
3498 let w2 = w2.as_watermark().unwrap();
3499
3500 tx_r.push_int64_watermark(0, 100);
3501
3502 let w3 = hash_join.next().await.unwrap().unwrap();
3503 let w3 = w3.as_watermark().unwrap();
3504
3505 let w4 = hash_join.next().await.unwrap().unwrap();
3506 let w4 = w4.as_watermark().unwrap();
3507
3508 assert_eq!(
3509 w1,
3510 &Watermark {
3511 col_idx: 2,
3512 data_type: DataType::Int64,
3513 val: ScalarImpl::Int64(50)
3514 }
3515 );
3516
3517 assert_eq!(
3518 w2,
3519 &Watermark {
3520 col_idx: 0,
3521 data_type: DataType::Int64,
3522 val: ScalarImpl::Int64(50)
3523 }
3524 );
3525
3526 assert_eq!(
3527 w3,
3528 &Watermark {
3529 col_idx: 2,
3530 data_type: DataType::Int64,
3531 val: ScalarImpl::Int64(100)
3532 }
3533 );
3534
3535 assert_eq!(
3536 w4,
3537 &Watermark {
3538 col_idx: 0,
3539 data_type: DataType::Int64,
3540 val: ScalarImpl::Int64(100)
3541 }
3542 );
3543
3544 Ok(())
3545 }
3546}