1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use itertools::Itertools;
21use multimap::MultiMap;
22use risingwave_common::array::Op;
23use risingwave_common::hash::{HashKey, NullBitmap};
24use risingwave_common::row::RowExt;
25use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::ExprError;
29use risingwave_expr::expr::NonStrictExpression;
30use tokio::time::Instant;
31
32use self::builder::JoinChunkBuilder;
33use super::barrier_align::*;
34use super::join::hash_join::*;
35use super::join::row::{JoinEncoding, JoinRow};
36use super::join::*;
37use super::watermark::*;
38use crate::executor::CachedJoinRow;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::join::hash_join::CacheResult;
41use crate::executor::prelude::*;
42
/// Number of processed input rows between two cache-eviction passes.
///
/// `evict_cache` increments a shared row counter for every visible input row.
/// When the counter reaches this value, it asks both sides' hash tables to
/// evict and resets the counter to zero.
const EVICT_EVERY_N_ROWS: u32 = 16;
45
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Both inputs are treated as sets: order and duplicates are irrelevant, and
/// an empty `vec1` is a subset of anything.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.iter().all(|candidate| superset.contains(candidate))
}
49
/// Per-input parameters handed to the hash join executor's constructor.
pub struct JoinParams {
    /// Indices of the join-key columns in this input's schema.
    pub join_key_indices: Vec<usize>,
    /// Primary-key column indices of this input. Presumably the columns
    /// already covered by the join key are removed ("deduped"); confirm
    /// against the planner.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and deduplicated pk indices of one input.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            deduped_pk_indices,
            join_key_indices,
        }
    }
}
65
/// State and static layout information for one input side of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Cached hash table for this side, backed by its state table and,
    /// when `need_degree_table` is set, a degree table.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join-key columns in this side's input schema.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns inside a concatenated left++right row:
    /// `0` for the left side, the left input's column count for the right side.
    start_pos: usize,
    /// `(input column index, output column index)` pairs for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` indexed by input column (one input column may map to
    /// several output columns).
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column: `(inequality pair index, need_offset)` entries
    /// for the inequality conditions it takes part in. `need_offset` is `true`
    /// when the column is the "smaller" key of the pair; its watermarks are then
    /// shifted by the pair's delta expression in `handle_watermark`.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that must be non-NULL for a row of this side to be able
    /// to match. These are the columns used in inequality conditions; the list
    /// is empty under the append-only optimization.
    non_null_fields: Vec<usize>,
    /// `(input column index, inequality pair index)` pairs whose inequality
    /// watermark may be used to clean this side's state.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether this side keeps a degree table.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
95
96impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("JoinSide")
99 .field("join_key_indices", &self.join_key_indices)
100 .field("col_types", &self.all_data_types)
101 .field("start_pos", &self.start_pos)
102 .field("i2o_mapping", &self.i2o_mapping)
103 .field("need_degree_table", &self.need_degree_table)
104 .finish()
105 }
106}
107
108impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
109 fn is_dirty(&self) -> bool {
111 unimplemented!()
112 }
113
114 #[expect(dead_code)]
115 fn clear_cache(&mut self) {
116 assert!(
117 !self.is_dirty(),
118 "cannot clear cache while states of hash join are dirty"
119 );
120
121 }
124
125 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
126 self.ht.init(epoch).await
127 }
128}
129
/// Streaming hash join of two inputs on equal join keys.
///
/// Supports an optional extra condition (`cond`) and inequality conditions
/// that drive derived watermarks and state cleaning. The join type is the
/// const parameter `T`.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input; `None` once `into_stream` has taken it.
    input_l: Option<Executor>,
    /// Right input; `None` once `into_stream` has taken it.
    input_r: Option<Executor>,
    /// Data types of the output columns, i.e. after applying `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State of the left input.
    side_l: JoinSide<K, S, E>,
    /// State of the right input.
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join condition checked for every key match.
    cond: Option<NonStrictExpression>,
    /// Per inequality condition: the output columns that receive its derived
    /// watermark, and an optional delta expression applied to watermarks of the
    /// "smaller" key.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest selected watermark for each inequality condition. Used for state
    /// cleaning; reset when the post-commit hook reports a cache reset.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Set when both inputs are append-only and each side's pk is contained in
    /// its join key.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Maximum number of rows per output chunk.
    chunk_size: usize,
    /// Rows processed since the last cache eviction (see `evict_cache`).
    cnt_rows_received: u32,

    /// Watermark buffers combining both sides. Keys below
    /// `join_key_indices.len()` are join-key positions; larger keys are
    /// `join_key_indices.len() + inequality_index`.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Output-row count for a single input row above which a
    /// "high_join_amplification" warning is logged.
    high_join_amplification_threshold: usize,

    /// Maximum number of rows per join key that are cached when refilling the
    /// cache after a miss.
    entry_state_max_rows: usize,
}
174
175impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
176 for HashJoinExecutor<K, S, T, E>
177{
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("HashJoinExecutor")
180 .field("join_type", &T)
181 .field("input_left", &self.input_l.as_ref().unwrap().identity())
182 .field("input_right", &self.input_r.as_ref().unwrap().identity())
183 .field("side_l", &self.side_l)
184 .field("side_r", &self.side_r)
185 .field("pk_indices", &self.info.pk_indices)
186 .field("schema", &self.info.schema)
187 .field("actual_output_data_types", &self.actual_output_data_types)
188 .finish()
189 }
190}
191
192impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
193 for HashJoinExecutor<K, S, T, E>
194{
195 fn execute(self: Box<Self>) -> BoxedMessageStream {
196 self.into_stream().boxed()
197 }
198}
199
/// Borrowed executor state plus the input chunk, passed to
/// `eq_join_left` / `eq_join_right` to join one chunk.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    side_l: &'a mut JoinSide<K, S, E>,
    side_r: &'a mut JoinSide<K, S, E>,
    actual_output_data_types: &'a [DataType],
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest watermark per inequality condition, used to select state-clean columns.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
214
215impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
216 HashJoinExecutor<K, S, T, E>
217{
218 #[allow(clippy::too_many_arguments)]
219 pub fn new(
220 ctx: ActorContextRef,
221 info: ExecutorInfo,
222 input_l: Executor,
223 input_r: Executor,
224 params_l: JoinParams,
225 params_r: JoinParams,
226 null_safe: Vec<bool>,
227 output_indices: Vec<usize>,
228 cond: Option<NonStrictExpression>,
229 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
230 state_table_l: StateTable<S>,
231 degree_state_table_l: StateTable<S>,
232 state_table_r: StateTable<S>,
233 degree_state_table_r: StateTable<S>,
234 watermark_epoch: AtomicU64Ref,
235 is_append_only: bool,
236 metrics: Arc<StreamingMetrics>,
237 chunk_size: usize,
238 high_join_amplification_threshold: usize,
239 ) -> Self {
240 Self::new_with_cache_size(
241 ctx,
242 info,
243 input_l,
244 input_r,
245 params_l,
246 params_r,
247 null_safe,
248 output_indices,
249 cond,
250 inequality_pairs,
251 state_table_l,
252 degree_state_table_l,
253 state_table_r,
254 degree_state_table_r,
255 watermark_epoch,
256 is_append_only,
257 metrics,
258 chunk_size,
259 high_join_amplification_threshold,
260 None,
261 )
262 }
263
264 #[allow(clippy::too_many_arguments)]
265 pub fn new_with_cache_size(
266 ctx: ActorContextRef,
267 info: ExecutorInfo,
268 input_l: Executor,
269 input_r: Executor,
270 params_l: JoinParams,
271 params_r: JoinParams,
272 null_safe: Vec<bool>,
273 output_indices: Vec<usize>,
274 cond: Option<NonStrictExpression>,
275 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
276 state_table_l: StateTable<S>,
277 degree_state_table_l: StateTable<S>,
278 state_table_r: StateTable<S>,
279 degree_state_table_r: StateTable<S>,
280 watermark_epoch: AtomicU64Ref,
281 is_append_only: bool,
282 metrics: Arc<StreamingMetrics>,
283 chunk_size: usize,
284 high_join_amplification_threshold: usize,
285 entry_state_max_rows: Option<usize>,
286 ) -> Self {
287 let entry_state_max_rows = match entry_state_max_rows {
288 None => {
289 ctx.streaming_config
290 .developer
291 .hash_join_entry_state_max_rows
292 }
293 Some(entry_state_max_rows) => entry_state_max_rows,
294 };
295 let side_l_column_n = input_l.schema().len();
296
297 let schema_fields = match T {
298 JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
299 JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
300 _ => [
301 input_l.schema().fields.clone(),
302 input_r.schema().fields.clone(),
303 ]
304 .concat(),
305 };
306
307 let original_output_data_types = schema_fields
308 .iter()
309 .map(|field| field.data_type())
310 .collect_vec();
311 let actual_output_data_types = output_indices
312 .iter()
313 .map(|&idx| original_output_data_types[idx].clone())
314 .collect_vec();
315
316 let state_all_data_types_l = input_l.schema().data_types();
318 let state_all_data_types_r = input_r.schema().data_types();
319
320 let state_pk_indices_l = input_l.pk_indices().to_vec();
321 let state_pk_indices_r = input_r.pk_indices().to_vec();
322
323 let state_join_key_indices_l = params_l.join_key_indices;
324 let state_join_key_indices_r = params_r.join_key_indices;
325
326 let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
327 let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();
328
329 let degree_pk_indices_l = (state_join_key_indices_l.len()
330 ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
331 .collect_vec();
332 let degree_pk_indices_r = (state_join_key_indices_r.len()
333 ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
334 .collect_vec();
335
336 let pk_contained_in_jk_l =
338 is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
339 let pk_contained_in_jk_r =
340 is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());
341
342 let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;
344
345 let join_key_data_types_l = state_join_key_indices_l
346 .iter()
347 .map(|idx| state_all_data_types_l[*idx].clone())
348 .collect_vec();
349
350 let join_key_data_types_r = state_join_key_indices_r
351 .iter()
352 .map(|idx| state_all_data_types_r[*idx].clone())
353 .collect_vec();
354
355 assert_eq!(join_key_data_types_l, join_key_data_types_r);
356
357 let null_matched = K::Bitmap::from_bool_vec(null_safe);
358
359 let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
360 let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;
361
362 let degree_state_l = need_degree_table_l.then(|| {
363 TableInner::new(
364 degree_pk_indices_l,
365 degree_join_key_indices_l,
366 degree_state_table_l,
367 )
368 });
369 let degree_state_r = need_degree_table_r.then(|| {
370 TableInner::new(
371 degree_pk_indices_r,
372 degree_join_key_indices_r,
373 degree_state_table_r,
374 )
375 });
376
377 let (left_to_output, right_to_output) = {
378 let (left_len, right_len) = if is_left_semi_or_anti(T) {
379 (state_all_data_types_l.len(), 0usize)
380 } else if is_right_semi_or_anti(T) {
381 (0usize, state_all_data_types_r.len())
382 } else {
383 (state_all_data_types_l.len(), state_all_data_types_r.len())
384 };
385 JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
386 };
387
388 let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
389 let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());
390
391 let left_input_len = input_l.schema().len();
392 let right_input_len = input_r.schema().len();
393 let mut l2inequality_index = vec![vec![]; left_input_len];
394 let mut r2inequality_index = vec![vec![]; right_input_len];
395 let mut l_state_clean_columns = vec![];
396 let mut r_state_clean_columns = vec![];
397 let inequality_pairs = inequality_pairs
398 .into_iter()
399 .enumerate()
400 .map(
401 |(
402 index,
403 (key_required_larger, key_required_smaller, clean_state, delta_expression),
404 )| {
405 let output_indices = if key_required_larger < key_required_smaller {
406 if clean_state {
407 l_state_clean_columns.push((key_required_larger, index));
408 }
409 l2inequality_index[key_required_larger].push((index, false));
410 r2inequality_index[key_required_smaller - left_input_len]
411 .push((index, true));
412 l2o_indexed
413 .get_vec(&key_required_larger)
414 .cloned()
415 .unwrap_or_default()
416 } else {
417 if clean_state {
418 r_state_clean_columns
419 .push((key_required_larger - left_input_len, index));
420 }
421 l2inequality_index[key_required_smaller].push((index, true));
422 r2inequality_index[key_required_larger - left_input_len]
423 .push((index, false));
424 r2o_indexed
425 .get_vec(&(key_required_larger - left_input_len))
426 .cloned()
427 .unwrap_or_default()
428 };
429 (output_indices, delta_expression)
430 },
431 )
432 .collect_vec();
433
434 let mut l_non_null_fields = l2inequality_index
435 .iter()
436 .positions(|inequalities| !inequalities.is_empty())
437 .collect_vec();
438 let mut r_non_null_fields = r2inequality_index
439 .iter()
440 .positions(|inequalities| !inequalities.is_empty())
441 .collect_vec();
442
443 if append_only_optimize {
444 l_state_clean_columns.clear();
445 r_state_clean_columns.clear();
446 l_non_null_fields.clear();
447 r_non_null_fields.clear();
448 }
449
450 let inequality_watermarks = vec![None; inequality_pairs.len()];
451 let watermark_buffers = BTreeMap::new();
452
453 Self {
454 ctx: ctx.clone(),
455 info,
456 input_l: Some(input_l),
457 input_r: Some(input_r),
458 actual_output_data_types,
459 side_l: JoinSide {
460 ht: JoinHashMap::new(
461 watermark_epoch.clone(),
462 join_key_data_types_l,
463 state_join_key_indices_l.clone(),
464 state_all_data_types_l.clone(),
465 state_table_l,
466 params_l.deduped_pk_indices,
467 degree_state_l,
468 null_matched.clone(),
469 pk_contained_in_jk_l,
470 None,
471 metrics.clone(),
472 ctx.id,
473 ctx.fragment_id,
474 "left",
475 ),
476 join_key_indices: state_join_key_indices_l,
477 all_data_types: state_all_data_types_l,
478 i2o_mapping: left_to_output,
479 i2o_mapping_indexed: l2o_indexed,
480 input2inequality_index: l2inequality_index,
481 non_null_fields: l_non_null_fields,
482 state_clean_columns: l_state_clean_columns,
483 start_pos: 0,
484 need_degree_table: need_degree_table_l,
485 _marker: PhantomData,
486 },
487 side_r: JoinSide {
488 ht: JoinHashMap::new(
489 watermark_epoch,
490 join_key_data_types_r,
491 state_join_key_indices_r.clone(),
492 state_all_data_types_r.clone(),
493 state_table_r,
494 params_r.deduped_pk_indices,
495 degree_state_r,
496 null_matched,
497 pk_contained_in_jk_r,
498 None,
499 metrics.clone(),
500 ctx.id,
501 ctx.fragment_id,
502 "right",
503 ),
504 join_key_indices: state_join_key_indices_r,
505 all_data_types: state_all_data_types_r,
506 start_pos: side_l_column_n,
507 i2o_mapping: right_to_output,
508 i2o_mapping_indexed: r2o_indexed,
509 input2inequality_index: r2inequality_index,
510 non_null_fields: r_non_null_fields,
511 state_clean_columns: r_state_clean_columns,
512 need_degree_table: need_degree_table_r,
513 _marker: PhantomData,
514 },
515 cond,
516 inequality_pairs,
517 inequality_watermarks,
518 append_only_optimize,
519 metrics,
520 chunk_size,
521 cnt_rows_received: 0,
522 watermark_buffers,
523 high_join_amplification_threshold,
524 entry_state_max_rows,
525 }
526 }
527
528 #[try_stream(ok = Message, error = StreamExecutorError)]
529 async fn into_stream(mut self) {
530 let input_l = self.input_l.take().unwrap();
531 let input_r = self.input_r.take().unwrap();
532 let aligned_stream = barrier_align(
533 input_l.execute(),
534 input_r.execute(),
535 self.ctx.id,
536 self.ctx.fragment_id,
537 self.metrics.clone(),
538 "Join",
539 );
540 pin_mut!(aligned_stream);
541
542 let actor_id = self.ctx.id;
543
544 let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
545 let first_epoch = barrier.epoch;
546 yield Message::Barrier(barrier);
548 self.side_l.init(first_epoch).await?;
549 self.side_r.init(first_epoch).await?;
550
551 let actor_id_str = self.ctx.id.to_string();
552 let fragment_id_str = self.ctx.fragment_id.to_string();
553
554 let join_actor_input_waiting_duration_ns = self
556 .metrics
557 .join_actor_input_waiting_duration_ns
558 .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
559 let left_join_match_duration_ns = self
560 .metrics
561 .join_match_duration_ns
562 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
563 let right_join_match_duration_ns = self
564 .metrics
565 .join_match_duration_ns
566 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
567
568 let barrier_join_match_duration_ns = self
569 .metrics
570 .join_match_duration_ns
571 .with_guarded_label_values(&[
572 actor_id_str.as_str(),
573 fragment_id_str.as_str(),
574 "barrier",
575 ]);
576
577 let left_join_cached_entry_count = self
578 .metrics
579 .join_cached_entry_count
580 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
581
582 let right_join_cached_entry_count = self
583 .metrics
584 .join_cached_entry_count
585 .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);
586
587 let mut start_time = Instant::now();
588
589 while let Some(msg) = aligned_stream
590 .next()
591 .instrument_await("hash_join_barrier_align")
592 .await
593 {
594 join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
595 match msg? {
596 AlignedMessage::WatermarkLeft(watermark) => {
597 for watermark_to_emit in
598 self.handle_watermark(SideType::Left, watermark).await?
599 {
600 yield Message::Watermark(watermark_to_emit);
601 }
602 }
603 AlignedMessage::WatermarkRight(watermark) => {
604 for watermark_to_emit in
605 self.handle_watermark(SideType::Right, watermark).await?
606 {
607 yield Message::Watermark(watermark_to_emit);
608 }
609 }
610 AlignedMessage::Left(chunk) => {
611 let mut left_time = Duration::from_nanos(0);
612 let mut left_start_time = Instant::now();
613 #[for_await]
614 for chunk in Self::eq_join_left(EqJoinArgs {
615 ctx: &self.ctx,
616 side_l: &mut self.side_l,
617 side_r: &mut self.side_r,
618 actual_output_data_types: &self.actual_output_data_types,
619 cond: &mut self.cond,
620 inequality_watermarks: &self.inequality_watermarks,
621 chunk,
622 append_only_optimize: self.append_only_optimize,
623 chunk_size: self.chunk_size,
624 cnt_rows_received: &mut self.cnt_rows_received,
625 high_join_amplification_threshold: self.high_join_amplification_threshold,
626 entry_state_max_rows: self.entry_state_max_rows,
627 }) {
628 left_time += left_start_time.elapsed();
629 yield Message::Chunk(chunk?);
630 left_start_time = Instant::now();
631 }
632 left_time += left_start_time.elapsed();
633 left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
634 self.try_flush_data().await?;
635 }
636 AlignedMessage::Right(chunk) => {
637 let mut right_time = Duration::from_nanos(0);
638 let mut right_start_time = Instant::now();
639 #[for_await]
640 for chunk in Self::eq_join_right(EqJoinArgs {
641 ctx: &self.ctx,
642 side_l: &mut self.side_l,
643 side_r: &mut self.side_r,
644 actual_output_data_types: &self.actual_output_data_types,
645 cond: &mut self.cond,
646 inequality_watermarks: &self.inequality_watermarks,
647 chunk,
648 append_only_optimize: self.append_only_optimize,
649 chunk_size: self.chunk_size,
650 cnt_rows_received: &mut self.cnt_rows_received,
651 high_join_amplification_threshold: self.high_join_amplification_threshold,
652 entry_state_max_rows: self.entry_state_max_rows,
653 }) {
654 right_time += right_start_time.elapsed();
655 yield Message::Chunk(chunk?);
656 right_start_time = Instant::now();
657 }
658 right_time += right_start_time.elapsed();
659 right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
660 self.try_flush_data().await?;
661 }
662 AlignedMessage::Barrier(barrier) => {
663 let barrier_start_time = Instant::now();
664 let (left_post_commit, right_post_commit) =
665 self.flush_data(barrier.epoch).await?;
666
667 let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
668
669 barrier_join_match_duration_ns
672 .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
673 yield Message::Barrier(barrier);
674
675 right_post_commit
677 .post_yield_barrier(update_vnode_bitmap.clone())
678 .await?;
679 if left_post_commit
680 .post_yield_barrier(update_vnode_bitmap)
681 .await?
682 .unwrap_or(false)
683 {
684 self.watermark_buffers
685 .values_mut()
686 .for_each(|buffers| buffers.clear());
687 self.inequality_watermarks.fill(None);
688 }
689
690 for (join_cached_entry_count, ht) in [
692 (&left_join_cached_entry_count, &self.side_l.ht),
693 (&right_join_cached_entry_count, &self.side_r.ht),
694 ] {
695 join_cached_entry_count.set(ht.entry_count() as i64);
696 }
697 }
698 }
699 start_time = Instant::now();
700 }
701 }
702
703 async fn flush_data(
704 &mut self,
705 epoch: EpochPair,
706 ) -> StreamExecutorResult<(
707 JoinHashMapPostCommit<'_, K, S, E>,
708 JoinHashMapPostCommit<'_, K, S, E>,
709 )> {
710 let left = self.side_l.ht.flush(epoch).await?;
713 let right = self.side_r.ht.flush(epoch).await?;
714 Ok((left, right))
715 }
716
717 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
718 self.side_l.ht.try_flush().await?;
721 self.side_r.ht.try_flush().await?;
722 Ok(())
723 }
724
725 fn evict_cache(
727 side_update: &mut JoinSide<K, S, E>,
728 side_match: &mut JoinSide<K, S, E>,
729 cnt_rows_received: &mut u32,
730 ) {
731 *cnt_rows_received += 1;
732 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
733 side_update.ht.evict();
734 side_match.ht.evict();
735 *cnt_rows_received = 0;
736 }
737 }
738
739 async fn handle_watermark(
740 &mut self,
741 side: SideTypePrimitive,
742 watermark: Watermark,
743 ) -> StreamExecutorResult<Vec<Watermark>> {
744 let (side_update, side_match) = if side == SideType::Left {
745 (&mut self.side_l, &mut self.side_r)
746 } else {
747 (&mut self.side_r, &mut self.side_l)
748 };
749
750 if side_update.join_key_indices[0] == watermark.col_idx {
752 side_match.ht.update_watermark(watermark.val.clone());
753 }
754
755 let wm_in_jk = side_update
757 .join_key_indices
758 .iter()
759 .positions(|idx| *idx == watermark.col_idx);
760 let mut watermarks_to_emit = vec![];
761 for idx in wm_in_jk {
762 let buffers = self
763 .watermark_buffers
764 .entry(idx)
765 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
766 if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
767 let empty_indices = vec![];
768 let output_indices = side_update
769 .i2o_mapping_indexed
770 .get_vec(&side_update.join_key_indices[idx])
771 .unwrap_or(&empty_indices)
772 .iter()
773 .chain(
774 side_match
775 .i2o_mapping_indexed
776 .get_vec(&side_match.join_key_indices[idx])
777 .unwrap_or(&empty_indices),
778 );
779 for output_idx in output_indices {
780 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
781 }
782 };
783 }
784 for (inequality_index, need_offset) in
785 &side_update.input2inequality_index[watermark.col_idx]
786 {
787 let buffers = self
788 .watermark_buffers
789 .entry(side_update.join_key_indices.len() + inequality_index)
790 .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
791 let mut input_watermark = watermark.clone();
792 if *need_offset
793 && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
794 {
795 #[allow(clippy::disallowed_methods)]
797 let eval_result = delta_expression
798 .inner()
799 .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
800 .await;
801 match eval_result {
802 Ok(value) => input_watermark.val = value.unwrap(),
803 Err(err) => {
804 if !matches!(err, ExprError::NumericOutOfRange) {
805 self.ctx.on_compute_error(err, &self.info.identity);
806 }
807 continue;
808 }
809 }
810 };
811 if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
812 for output_idx in &self.inequality_pairs[*inequality_index].0 {
813 watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
814 }
815 self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
816 }
817 }
818 Ok(watermarks_to_emit)
819 }
820
821 fn row_concat(
822 row_update: impl Row,
823 update_start_pos: usize,
824 row_matched: impl Row,
825 matched_start_pos: usize,
826 ) -> OwnedRow {
827 let mut new_row = vec![None; row_update.len() + row_matched.len()];
828
829 for (i, datum_ref) in row_update.iter().enumerate() {
830 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
831 }
832 for (i, datum_ref) in row_matched.iter().enumerate() {
833 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
834 }
835 OwnedRow::new(new_row)
836 }
837
838 fn eq_join_left(
840 args: EqJoinArgs<'_, K, S, E>,
841 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
842 Self::eq_join_oneside::<{ SideType::Left }>(args)
843 }
844
845 fn eq_join_right(
847 args: EqJoinArgs<'_, K, S, E>,
848 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
849 Self::eq_join_oneside::<{ SideType::Right }>(args)
850 }
851
852 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
853 async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
854 let EqJoinArgs {
855 ctx,
856 side_l,
857 side_r,
858 actual_output_data_types,
859 cond,
860 inequality_watermarks,
861 chunk,
862 append_only_optimize,
863 chunk_size,
864 cnt_rows_received,
865 high_join_amplification_threshold,
866 entry_state_max_rows,
867 ..
868 } = args;
869
870 let (side_update, side_match) = if SIDE == SideType::Left {
871 (side_l, side_r)
872 } else {
873 (side_r, side_l)
874 };
875
876 let useful_state_clean_columns = side_match
877 .state_clean_columns
878 .iter()
879 .filter_map(|(column_idx, inequality_index)| {
880 inequality_watermarks[*inequality_index]
881 .as_ref()
882 .map(|watermark| (*column_idx, watermark))
883 })
884 .collect_vec();
885
886 let mut hashjoin_chunk_builder =
887 JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
888 chunk_size,
889 actual_output_data_types.to_vec(),
890 side_update.i2o_mapping.clone(),
891 side_match.i2o_mapping.clone(),
892 ));
893
894 let join_matched_join_keys = ctx
895 .streaming_metrics
896 .join_matched_join_keys
897 .with_guarded_label_values(&[
898 &ctx.id.to_string(),
899 &ctx.fragment_id.to_string(),
900 &side_update.ht.table_id().to_string(),
901 ]);
902
903 let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
904 for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
905 let Some((op, row)) = r else {
906 continue;
907 };
908 Self::evict_cache(side_update, side_match, cnt_rows_received);
909
910 let cache_lookup_result = {
911 let probe_non_null_requirement_satisfied = side_update
912 .non_null_fields
913 .iter()
914 .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
915 let build_non_null_requirement_satisfied =
916 key.null_bitmap().is_subset(side_match.ht.null_matched());
917 if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
918 side_match.ht.take_state_opt(key)
919 } else {
920 CacheResult::NeverMatch
921 }
922 };
923 let mut total_matches = 0;
924
925 macro_rules! match_rows {
926 ($op:ident) => {
927 Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
928 cache_lookup_result,
929 row,
930 key,
931 &mut hashjoin_chunk_builder,
932 side_match,
933 side_update,
934 &useful_state_clean_columns,
935 cond,
936 append_only_optimize,
937 entry_state_max_rows,
938 )
939 };
940 }
941
942 match op {
943 Op::Insert | Op::UpdateInsert =>
944 {
945 #[for_await]
946 for chunk in match_rows!(Insert) {
947 let chunk = chunk?;
948 total_matches += chunk.cardinality();
949 yield chunk;
950 }
951 }
952 Op::Delete | Op::UpdateDelete =>
953 {
954 #[for_await]
955 for chunk in match_rows!(Delete) {
956 let chunk = chunk?;
957 total_matches += chunk.cardinality();
958 yield chunk;
959 }
960 }
961 };
962
963 join_matched_join_keys.observe(total_matches as _);
964 if total_matches > high_join_amplification_threshold {
965 let join_key_data_types = side_update.ht.join_key_data_types();
966 let key = key.deserialize(join_key_data_types)?;
967 tracing::warn!(target: "high_join_amplification",
968 matched_rows_len = total_matches,
969 update_table_id = side_update.ht.table_id(),
970 match_table_id = side_match.ht.table_id(),
971 join_key = ?key,
972 actor_id = ctx.id,
973 fragment_id = ctx.fragment_id,
974 "large rows matched for join key"
975 );
976 }
977 }
978 if let Some(chunk) = hashjoin_chunk_builder.take() {
980 yield chunk;
981 }
982 }
983
984 #[allow(clippy::too_many_arguments)]
993 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
994 async fn handle_match_rows<
995 'a,
996 const SIDE: SideTypePrimitive,
997 const JOIN_OP: JoinOpPrimitive,
998 >(
999 cached_lookup_result: CacheResult<E>,
1000 row: RowRef<'a>,
1001 key: &'a K,
1002 hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
1003 side_match: &'a mut JoinSide<K, S, E>,
1004 side_update: &'a mut JoinSide<K, S, E>,
1005 useful_state_clean_columns: &'a [(usize, &'a Watermark)],
1006 cond: &'a mut Option<NonStrictExpression>,
1007 append_only_optimize: bool,
1008 entry_state_max_rows: usize,
1009 ) {
1010 let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
1011 let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
1012 let mut entry_state_count = 0;
1013
1014 let mut degree = 0;
1015 let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
1016 let mut matched_rows_to_clean = vec![];
1017
1018 macro_rules! match_row {
1019 (
1020 $match_order_key_indices:expr,
1021 $degree_table:expr,
1022 $matched_row:expr,
1023 $matched_row_ref:expr,
1024 $from_cache:literal
1025 ) => {
1026 Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
1027 row,
1028 $matched_row,
1029 $matched_row_ref,
1030 hashjoin_chunk_builder,
1031 $match_order_key_indices,
1032 $degree_table,
1033 side_update.start_pos,
1034 side_match.start_pos,
1035 cond,
1036 &mut degree,
1037 useful_state_clean_columns,
1038 append_only_optimize,
1039 &mut append_only_matched_row,
1040 &mut matched_rows_to_clean,
1041 )
1042 };
1043 }
1044
1045 let entry_state = match cached_lookup_result {
1046 CacheResult::NeverMatch => {
1047 let op = match JOIN_OP {
1048 JoinOp::Insert => Op::Insert,
1049 JoinOp::Delete => Op::Delete,
1050 };
1051 if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1052 yield chunk;
1053 }
1054 return Ok(());
1055 }
1056 CacheResult::Hit(mut cached_rows) => {
1057 let (match_order_key_indices, match_degree_state) =
1058 side_match.ht.get_degree_state_mut_ref();
1059 for (matched_row_ref, matched_row) in
1061 cached_rows.values_mut(&side_match.all_data_types)
1062 {
1063 let matched_row = matched_row?;
1064 if let Some(chunk) = match_row!(
1065 match_order_key_indices,
1066 match_degree_state,
1067 matched_row,
1068 Some(matched_row_ref),
1069 true
1070 )
1071 .await
1072 {
1073 yield chunk;
1074 }
1075 }
1076
1077 cached_rows
1078 }
1079 CacheResult::Miss => {
1080 let (matched_rows, match_order_key_indices, degree_table) = side_match
1082 .ht
1083 .fetch_matched_rows_and_get_degree_table_ref(key)
1084 .await?;
1085
1086 #[for_await]
1087 for matched_row in matched_rows {
1088 let (encoded_pk, matched_row) = matched_row?;
1089
1090 let mut matched_row_ref = None;
1091
1092 if entry_state_count <= entry_state_max_rows {
1094 let row_ref = entry_state
1095 .insert(encoded_pk, E::encode(&matched_row), None) .with_context(|| format!("row: {}", row.display(),))?;
1097 matched_row_ref = Some(row_ref);
1098 entry_state_count += 1;
1099 }
1100 if let Some(chunk) = match_row!(
1101 match_order_key_indices,
1102 degree_table,
1103 matched_row,
1104 matched_row_ref,
1105 false
1106 )
1107 .await
1108 {
1109 yield chunk;
1110 }
1111 }
1112 Box::new(entry_state)
1113 }
1114 };
1115
1116 let op = match JOIN_OP {
1118 JoinOp::Insert => Op::Insert,
1119 JoinOp::Delete => Op::Delete,
1120 };
1121 if degree == 0 {
1122 if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
1123 yield chunk;
1124 }
1125 } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
1126 {
1127 yield chunk;
1128 }
1129
1130 if cache_hit || entry_state_count <= entry_state_max_rows {
1132 side_match.ht.update_state(key, entry_state);
1133 }
1134
1135 for matched_row in matched_rows_to_clean {
1137 side_match.ht.delete_handle_degree(key, matched_row)?;
1138 }
1139
1140 if append_only_optimize && let Some(row) = append_only_matched_row {
1142 assert_matches!(JOIN_OP, JoinOp::Insert);
1143 side_match.ht.delete_handle_degree(key, row)?;
1144 return Ok(());
1145 }
1146
1147 match JOIN_OP {
1149 JoinOp::Insert => {
1150 side_update
1151 .ht
1152 .insert_handle_degree(key, JoinRow::new(row, degree))?;
1153 }
1154 JoinOp::Delete => {
1155 side_update
1156 .ht
1157 .delete_handle_degree(key, JoinRow::new(row, degree))?;
1158 }
1159 }
1160 }
1161
    /// Handles one candidate row from the match side for `update_row`.
    ///
    /// Evaluates `cond` on the pair (via `Self::check_join_condition`).
    /// - If it holds: increments `update_row_degree`, updates the matched row's
    ///   degree in `match_degree_table` (and in the cached entry behind
    ///   `matched_row_cache_ref`, if present), and may ask the chunk builder for a
    ///   joined output row.
    /// - If it does not hold: checks `useful_state_clean_columns`. If any of those
    ///   columns of the matched row is below its watermark, the row is queued in
    ///   `matched_rows_to_clean`.
    ///
    /// When `append_only_optimize` is set, the matched row is handed back through
    /// `append_only_matched_row` instead, regardless of the condition result. In this
    /// file the caller then deletes that row from the match side.
    ///
    /// Returns the chunk the builder produced for this match, if any.
    // The caller's per-key probing state is threaded through as explicit parameters.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<OwnedRow>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            *update_row_degree += 1;
            // Insert: build the output BEFORE the matched row's degree is bumped below.
            // Delete (further down): build it AFTER the degree is decremented. Assuming
            // `update_degree` adjusts `matched_row.degree` in place, the builder sees the
            // degree without this update row's contribution in both cases. Confirm
            // against `with_match`.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Keep the cached copy's degree in sync with the state table.
                // The `unwrap` relies on callers passing `Some(..)` whenever
                // `MATCHED_ROWS_FROM_CACHE` is true (the cache-hit path in this file passes
                // `Some(matched_row_ref)`).
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // A non-matching row is scheduled for clean-up when any state-clean column
            // is strictly below its watermark (a NULL datum never triggers clean-up).
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            // Only inserts are expected here, and at most one matched row per update row.
            assert_matches!(JOIN_OP, JoinOp::Insert);
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row);
        } else if need_state_clean {
            // `else if`: `matched_row` may already have been moved by the branch above.
            matched_rows_to_clean.push(matched_row);
        }

        chunk_opt
    }
1270
1271 #[inline]
1276 async fn check_join_condition(
1277 row: impl Row,
1278 side_update_start_pos: usize,
1279 matched_row: impl Row,
1280 side_match_start_pos: usize,
1281 join_condition: &Option<NonStrictExpression>,
1282 ) -> bool {
1283 if let Some(join_condition) = join_condition {
1284 let new_row = Self::row_concat(
1285 row,
1286 side_update_start_pos,
1287 matched_row,
1288 side_match_start_pos,
1289 );
1290 join_condition
1291 .eval_row_infallible(&new_row)
1292 .await
1293 .map(|s| *s.as_bool())
1294 .unwrap_or(false)
1295 } else {
1296 true
1297 }
1298 }
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303 use std::sync::atomic::AtomicU64;
1304
1305 use pretty_assertions::assert_eq;
1306 use risingwave_common::array::*;
1307 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1308 use risingwave_common::hash::{Key64, Key128};
1309 use risingwave_common::util::epoch::test_epoch;
1310 use risingwave_common::util::sort_util::OrderType;
1311 use risingwave_storage::memory::MemoryStateStore;
1312
1313 use super::*;
1314 use crate::common::table::test_utils::gen_pbtable;
1315 use crate::executor::MemoryEncoding;
1316 use crate::executor::test_utils::expr::build_from_pretty;
1317 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1318
1319 async fn create_in_memory_state_table(
1320 mem_state: MemoryStateStore,
1321 data_types: &[DataType],
1322 order_types: &[OrderType],
1323 pk_indices: &[usize],
1324 table_id: u32,
1325 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1326 let column_descs = data_types
1327 .iter()
1328 .enumerate()
1329 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1330 .collect_vec();
1331 let state_table = StateTable::from_table_catalog(
1332 &gen_pbtable(
1333 TableId::new(table_id),
1334 column_descs,
1335 order_types.to_vec(),
1336 pk_indices.to_vec(),
1337 0,
1338 ),
1339 mem_state.clone(),
1340 None,
1341 )
1342 .await;
1343
1344 let mut degree_table_column_descs = vec![];
1346 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1347 degree_table_column_descs.push(ColumnDesc::unnamed(
1348 ColumnId::new(pk_id as i32),
1349 data_types[*idx].clone(),
1350 ))
1351 });
1352 degree_table_column_descs.push(ColumnDesc::unnamed(
1353 ColumnId::new(pk_indices.len() as i32),
1354 DataType::Int64,
1355 ));
1356 let degree_state_table = StateTable::from_table_catalog(
1357 &gen_pbtable(
1358 TableId::new(table_id + 1),
1359 degree_table_column_descs,
1360 order_types.to_vec(),
1361 pk_indices.to_vec(),
1362 0,
1363 ),
1364 mem_state,
1365 None,
1366 )
1367 .await;
1368 (state_table, degree_state_table)
1369 }
1370
1371 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1372 build_from_pretty(
1373 condition_text
1374 .as_deref()
1375 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1376 )
1377 }
1378
1379 async fn create_executor<const T: JoinTypePrimitive>(
1380 with_condition: bool,
1381 null_safe: bool,
1382 condition_text: Option<String>,
1383 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1384 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1385 let schema = Schema {
1386 fields: vec![
1387 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1389 ],
1390 };
1391 let (tx_l, source_l) = MockSource::channel();
1392 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1393 let (tx_r, source_r) = MockSource::channel();
1394 let source_r = source_r.into_executor(schema, vec![1]);
1395 let params_l = JoinParams::new(vec![0], vec![1]);
1396 let params_r = JoinParams::new(vec![0], vec![1]);
1397 let cond = with_condition.then(|| create_cond(condition_text));
1398
1399 let mem_state = MemoryStateStore::new();
1400
1401 let (state_l, degree_state_l) = create_in_memory_state_table(
1402 mem_state.clone(),
1403 &[DataType::Int64, DataType::Int64],
1404 &[OrderType::ascending(), OrderType::ascending()],
1405 &[0, 1],
1406 0,
1407 )
1408 .await;
1409
1410 let (state_r, degree_state_r) = create_in_memory_state_table(
1411 mem_state,
1412 &[DataType::Int64, DataType::Int64],
1413 &[OrderType::ascending(), OrderType::ascending()],
1414 &[0, 1],
1415 2,
1416 )
1417 .await;
1418
1419 let schema = match T {
1420 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1421 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1422 _ => [source_l.schema().fields(), source_r.schema().fields()]
1423 .concat()
1424 .into_iter()
1425 .collect(),
1426 };
1427 let schema_len = schema.len();
1428 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1429
1430 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1431 ActorContext::for_test(123),
1432 info,
1433 source_l,
1434 source_r,
1435 params_l,
1436 params_r,
1437 vec![null_safe],
1438 (0..schema_len).collect_vec(),
1439 cond,
1440 inequality_pairs,
1441 state_l,
1442 degree_state_l,
1443 state_r,
1444 degree_state_r,
1445 Arc::new(AtomicU64::new(0)),
1446 false,
1447 Arc::new(StreamingMetrics::unused()),
1448 1024,
1449 2048,
1450 );
1451 (tx_l, tx_r, executor.boxed().execute())
1452 }
1453
1454 async fn create_classical_executor<const T: JoinTypePrimitive>(
1455 with_condition: bool,
1456 null_safe: bool,
1457 condition_text: Option<String>,
1458 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1459 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1460 }
1461
1462 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1463 with_condition: bool,
1464 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1465 let schema = Schema {
1466 fields: vec![
1467 Field::unnamed(DataType::Int64),
1468 Field::unnamed(DataType::Int64),
1469 Field::unnamed(DataType::Int64),
1470 ],
1471 };
1472 let (tx_l, source_l) = MockSource::channel();
1473 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1474 let (tx_r, source_r) = MockSource::channel();
1475 let source_r = source_r.into_executor(schema, vec![0]);
1476 let params_l = JoinParams::new(vec![0, 1], vec![]);
1477 let params_r = JoinParams::new(vec![0, 1], vec![]);
1478 let cond = with_condition.then(|| create_cond(None));
1479
1480 let mem_state = MemoryStateStore::new();
1481
1482 let (state_l, degree_state_l) = create_in_memory_state_table(
1483 mem_state.clone(),
1484 &[DataType::Int64, DataType::Int64, DataType::Int64],
1485 &[
1486 OrderType::ascending(),
1487 OrderType::ascending(),
1488 OrderType::ascending(),
1489 ],
1490 &[0, 1, 0],
1491 0,
1492 )
1493 .await;
1494
1495 let (state_r, degree_state_r) = create_in_memory_state_table(
1496 mem_state,
1497 &[DataType::Int64, DataType::Int64, DataType::Int64],
1498 &[
1499 OrderType::ascending(),
1500 OrderType::ascending(),
1501 OrderType::ascending(),
1502 ],
1503 &[0, 1, 1],
1504 1,
1505 )
1506 .await;
1507
1508 let schema = match T {
1509 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1510 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1511 _ => [source_l.schema().fields(), source_r.schema().fields()]
1512 .concat()
1513 .into_iter()
1514 .collect(),
1515 };
1516 let schema_len = schema.len();
1517 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1518
1519 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1520 ActorContext::for_test(123),
1521 info,
1522 source_l,
1523 source_r,
1524 params_l,
1525 params_r,
1526 vec![false],
1527 (0..schema_len).collect_vec(),
1528 cond,
1529 vec![],
1530 state_l,
1531 degree_state_l,
1532 state_r,
1533 degree_state_r,
1534 Arc::new(AtomicU64::new(0)),
1535 true,
1536 Arc::new(StreamingMetrics::unused()),
1537 1024,
1538 2048,
1539 );
1540 (tx_l, tx_r, executor.boxed().execute())
1541 }
1542
    /// Interval join: an inner join on column 0 with the band condition
    /// `$1 > $3 - 2 AND $3 > $1 - 2`, plus two inequality pairs linking output
    /// columns 1 and 3. Checks the output watermarks and that only in-band rows join.
    #[tokio::test]
    async fn test_interval_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 3
            + 2 5
            + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + 3 8
            - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 6
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + 2 3
            + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
        )
        .await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Only the left side has data, so rows are stored and nothing is emitted.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // A left watermark alone produces no output.
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        // Once the right watermark (6) arrives, watermarks are emitted for output
        // columns 1 and 3. NOTE(review): the 4 on column 1 looks like 6 shifted by the
        // `- 2` inequality expression; confirm against the watermark derivation.
        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
        );
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
        );

        // For right row `2 6`, left `2 3` fails `3 > 6 - 2`, while left `2 5` satisfies
        // both bounds. Keys 4 and 6 have no left rows.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 6"
            )
        );

        // No output: `2 5` vs `2 3` fails `3 > 5 - 2`. Left `2 3` would pass the band,
        // presumably it was state-cleaned as below the watermark; verify.
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        Ok(())
    }
1629
    /// Plain inner join on column 0 with no extra condition. Left rows are only
    /// buffered in state; output appears when right rows with the same key arrive.
    #[tokio::test]
    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + 3 8
            - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + 3 10
            + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `3 8` is inserted and deleted in one chunk, so it must never join later.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Only key 2 exists on both sides.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // Key 3 joins only with `3 6`; key 6 has no left row.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
1699
    /// Inner join with `null_safe = true`: a NULL join key matches another NULL key,
    /// so `. 6` joins with `. 10`.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + . 8
            - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + . 10
            + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // `. 8` is inserted and deleted in one chunk, so only `. 6` remains under NULL.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // The NULL key on the right matches the NULL key on the left.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1769
    /// Left semi join: a left row is emitted once when it gains its first match and
    /// retracted only when its last matching right row is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + 3 8
            - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + 3 10
            + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
            + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
            - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
            - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // A new left row that already has matches (`6 9`, `6 11`) is emitted immediately.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // `6 10` still matches `6 9`, so nothing is retracted yet.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // The last match is gone, so the left row is retracted.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1879
    /// Left semi join with `null_safe = true`: like the plain left semi test, but a
    /// NULL left key (`. 6`) gains a match from a NULL right key (`. 10`).
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + . 8
            - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + . 10
            + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
            + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
            - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
            - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // NULL matches NULL under null-safe equality.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // `6 10` still matches `6 9`.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // Last match removed, so the left row is retracted.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1989
    /// Append-only inner join on columns `(0, 1)` (see `create_append_only_executor`).
    /// Each right row joins the left row that shares its first two columns.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
            + 1 4 1
            + 2 5 2
            + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
            + 4 9 4
            + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
            + 2 5 1
            + 4 9 2
            + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
            + 1 4 4
            + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Keys (2, 5) and (4, 9) exist on the left; (6, 9) does not.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2062
    /// Append-only left semi join on columns `(0, 1)`: each left row is emitted once
    /// a right row with the same `(0, 1)` values arrives.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
            + 1 4 1
            + 2 5 2
            + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
            + 4 9 4
            + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
            + 2 5 1
            + 4 9 2
            + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
            + 1 4 4
            + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Output is the matching LEFT rows only.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2135
    /// Append-only right semi join on columns `(0, 1)`: each right row is emitted when
    /// it arrives if a left row with the same `(0, 1)` values already exists.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
            + 1 4 1
            + 2 5 2
            + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
            + 4 9 4
            + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
            + 2 5 1
            + 4 9 2
            + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
            + 1 4 4
            + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Output is the matching RIGHT rows only; `6 9 3` has no left partner.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2208
    /// Right semi join (mirror of the left semi test): a right row is emitted once when
    /// it gains its first left match and retracted when its last left match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + 3 8
            - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + 3 10
            + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
            + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
            - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            " I I
            - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // A new right row that already has left matches is emitted immediately.
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // `6 10` still matches `6 9`.
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // Last match removed, so the right row is retracted.
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2318
    /// Left anti join: a left row is emitted while it has no right match, retracted
    /// when its first match arrives, and re-emitted when its last match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
            + 3 8
            - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
            + 2 7
            + 4 8
            + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
            + 3 10
            + 6 11
            + 1 2
            + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
            + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
            - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
            - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // The right side is empty, so every left row is passed through.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 8
                - 3 8",
            )
        );

        // `2 5` gains its first match, so it is retracted.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // `3 6` and `1 4` gain their first match; the second key-1 row adds nothing.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // `1 4` is still matched by `1 3`.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // Last key-1 match removed, so `1 4` reappears.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2448
2449 #[tokio::test]
2450 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2451 let chunk_r1 = StreamChunk::from_pretty(
2452 " I I
2453 + 1 4
2454 + 2 5
2455 + 3 6",
2456 );
2457 let chunk_r2 = StreamChunk::from_pretty(
2458 " I I
2459 + 3 8
2460 - 3 8",
2461 );
2462 let chunk_l1 = StreamChunk::from_pretty(
2463 " I I
2464 + 2 7
2465 + 4 8
2466 + 6 9",
2467 );
2468 let chunk_l2 = StreamChunk::from_pretty(
2469 " I I
2470 + 3 10
2471 + 6 11
2472 + 1 2
2473 + 1 3",
2474 );
2475 let chunk_r3 = StreamChunk::from_pretty(
2476 " I I
2477 + 9 10",
2478 );
2479 let chunk_l3 = StreamChunk::from_pretty(
2480 " I I
2481 - 1 2",
2482 );
2483 let chunk_l4 = StreamChunk::from_pretty(
2484 " I I
2485 - 1 3",
2486 );
2487 let (mut tx_r, mut tx_l, mut hash_join) =
2488 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2489
2490 tx_r.push_barrier(test_epoch(1), false);
2492 tx_l.push_barrier(test_epoch(1), false);
2493 hash_join.next_unwrap_ready_barrier()?;
2494
2495 tx_r.push_chunk(chunk_r1);
2497 let chunk = hash_join.next_unwrap_ready_chunk()?;
2498 assert_eq!(
2499 chunk,
2500 StreamChunk::from_pretty(
2501 " I I
2502 + 1 4
2503 + 2 5
2504 + 3 6",
2505 )
2506 );
2507
2508 tx_r.push_barrier(test_epoch(2), false);
2510 tx_l.push_barrier(test_epoch(2), false);
2511 hash_join.next_unwrap_ready_barrier()?;
2512
2513 tx_r.push_chunk(chunk_r2);
2515 let chunk = hash_join.next_unwrap_ready_chunk()?;
2516 assert_eq!(
2517 chunk,
2518 StreamChunk::from_pretty(
2519 " I I
2520 + 3 8
2521 - 3 8",
2522 )
2523 );
2524
2525 tx_l.push_chunk(chunk_l1);
2527 let chunk = hash_join.next_unwrap_ready_chunk()?;
2528 assert_eq!(
2529 chunk,
2530 StreamChunk::from_pretty(
2531 " I I
2532 - 2 5"
2533 )
2534 );
2535
2536 tx_l.push_chunk(chunk_l2);
2538 let chunk = hash_join.next_unwrap_ready_chunk()?;
2539 assert_eq!(
2540 chunk,
2541 StreamChunk::from_pretty(
2542 " I I
2543 - 3 6
2544 - 1 4"
2545 )
2546 );
2547
2548 tx_r.push_chunk(chunk_r3);
2550 let chunk = hash_join.next_unwrap_ready_chunk()?;
2551 assert_eq!(
2552 chunk,
2553 StreamChunk::from_pretty(
2554 " I I
2555 + 9 10"
2556 )
2557 );
2558
2559 tx_l.push_chunk(chunk_l3);
2562 hash_join.next_unwrap_pending();
2563
2564 tx_l.push_chunk(chunk_l4);
2567 let chunk = hash_join.next_unwrap_ready_chunk()?;
2568 assert_eq!(
2569 chunk,
2570 StreamChunk::from_pretty(
2571 " I I
2572 + 1 4"
2573 )
2574 );
2575
2576 Ok(())
2577 }
2578
2579 #[tokio::test]
2580 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2581 let chunk_l1 = StreamChunk::from_pretty(
2582 " I I
2583 + 1 4
2584 + 2 5
2585 + 3 6",
2586 );
2587 let chunk_l2 = StreamChunk::from_pretty(
2588 " I I
2589 + 6 8
2590 + 3 8",
2591 );
2592 let chunk_r1 = StreamChunk::from_pretty(
2593 " I I
2594 + 2 7
2595 + 4 8
2596 + 6 9",
2597 );
2598 let chunk_r2 = StreamChunk::from_pretty(
2599 " I I
2600 + 3 10
2601 + 6 11",
2602 );
2603 let (mut tx_l, mut tx_r, mut hash_join) =
2604 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2605
2606 tx_l.push_barrier(test_epoch(1), false);
2608 tx_r.push_barrier(test_epoch(1), false);
2609 hash_join.next_unwrap_ready_barrier()?;
2610
2611 tx_l.push_chunk(chunk_l1);
2613 hash_join.next_unwrap_pending();
2614
2615 tx_l.push_barrier(test_epoch(2), false);
2617
2618 tx_l.push_chunk(chunk_l2);
2620
2621 tx_r.push_chunk(chunk_r1);
2623
2624 let chunk = hash_join.next_unwrap_ready_chunk()?;
2626 assert_eq!(
2627 chunk,
2628 StreamChunk::from_pretty(
2629 " I I I I
2630 + 2 5 2 7"
2631 )
2632 );
2633
2634 tx_r.push_barrier(test_epoch(2), false);
2636
2637 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2639 assert!(matches!(
2640 hash_join.next_unwrap_ready_barrier()?,
2641 Barrier {
2642 epoch,
2643 mutation: None,
2644 ..
2645 } if epoch == expected_epoch
2646 ));
2647
2648 let chunk = hash_join.next_unwrap_ready_chunk()?;
2650 assert_eq!(
2651 chunk,
2652 StreamChunk::from_pretty(
2653 " I I I I
2654 + 6 8 6 9"
2655 )
2656 );
2657
2658 tx_r.push_chunk(chunk_r2);
2660 let chunk = hash_join.next_unwrap_ready_chunk()?;
2661 assert_eq!(
2662 chunk,
2663 StreamChunk::from_pretty(
2664 " I I I I
2665 + 3 6 3 10
2666 + 3 8 3 10
2667 + 6 8 6 11"
2668 )
2669 );
2670
2671 Ok(())
2672 }
2673
2674 #[tokio::test]
2675 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2676 let chunk_l1 = StreamChunk::from_pretty(
2677 " I I
2678 + 1 4
2679 + 2 .
2680 + 3 .",
2681 );
2682 let chunk_l2 = StreamChunk::from_pretty(
2683 " I I
2684 + 6 .
2685 + 3 8",
2686 );
2687 let chunk_r1 = StreamChunk::from_pretty(
2688 " I I
2689 + 2 7
2690 + 4 8
2691 + 6 9",
2692 );
2693 let chunk_r2 = StreamChunk::from_pretty(
2694 " I I
2695 + 3 10
2696 + 6 11",
2697 );
2698 let (mut tx_l, mut tx_r, mut hash_join) =
2699 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2700
2701 tx_l.push_barrier(test_epoch(1), false);
2703 tx_r.push_barrier(test_epoch(1), false);
2704 hash_join.next_unwrap_ready_barrier()?;
2705
2706 tx_l.push_chunk(chunk_l1);
2708 hash_join.next_unwrap_pending();
2709
2710 tx_l.push_barrier(test_epoch(2), false);
2712
2713 tx_l.push_chunk(chunk_l2);
2715
2716 tx_r.push_chunk(chunk_r1);
2718
2719 let chunk = hash_join.next_unwrap_ready_chunk()?;
2721 assert_eq!(
2722 chunk,
2723 StreamChunk::from_pretty(
2724 " I I I I
2725 + 2 . 2 7"
2726 )
2727 );
2728
2729 tx_r.push_barrier(test_epoch(2), false);
2731
2732 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2734 assert!(matches!(
2735 hash_join.next_unwrap_ready_barrier()?,
2736 Barrier {
2737 epoch,
2738 mutation: None,
2739 ..
2740 } if epoch == expected_epoch
2741 ));
2742
2743 let chunk = hash_join.next_unwrap_ready_chunk()?;
2745 assert_eq!(
2746 chunk,
2747 StreamChunk::from_pretty(
2748 " I I I I
2749 + 6 . 6 9"
2750 )
2751 );
2752
2753 tx_r.push_chunk(chunk_r2);
2755 let chunk = hash_join.next_unwrap_ready_chunk()?;
2756 assert_eq!(
2757 chunk,
2758 StreamChunk::from_pretty(
2759 " I I I I
2760 + 3 8 3 10
2761 + 3 . 3 10
2762 + 6 . 6 11"
2763 )
2764 );
2765
2766 Ok(())
2767 }
2768
2769 #[tokio::test]
2770 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2771 let chunk_l1 = StreamChunk::from_pretty(
2772 " I I
2773 + 1 4
2774 + 2 5
2775 + 3 6",
2776 );
2777 let chunk_l2 = StreamChunk::from_pretty(
2778 " I I
2779 + 3 8
2780 - 3 8",
2781 );
2782 let chunk_r1 = StreamChunk::from_pretty(
2783 " I I
2784 + 2 7
2785 + 4 8
2786 + 6 9",
2787 );
2788 let chunk_r2 = StreamChunk::from_pretty(
2789 " I I
2790 + 3 10
2791 + 6 11",
2792 );
2793 let (mut tx_l, mut tx_r, mut hash_join) =
2794 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2795
2796 tx_l.push_barrier(test_epoch(1), false);
2798 tx_r.push_barrier(test_epoch(1), false);
2799 hash_join.next_unwrap_ready_barrier()?;
2800
2801 tx_l.push_chunk(chunk_l1);
2803 let chunk = hash_join.next_unwrap_ready_chunk()?;
2804 assert_eq!(
2805 chunk,
2806 StreamChunk::from_pretty(
2807 " I I I I
2808 + 1 4 . .
2809 + 2 5 . .
2810 + 3 6 . ."
2811 )
2812 );
2813
2814 tx_l.push_chunk(chunk_l2);
2816 let chunk = hash_join.next_unwrap_ready_chunk()?;
2817 assert_eq!(
2818 chunk,
2819 StreamChunk::from_pretty(
2820 " I I I I
2821 + 3 8 . .
2822 - 3 8 . ."
2823 )
2824 );
2825
2826 tx_r.push_chunk(chunk_r1);
2828 let chunk = hash_join.next_unwrap_ready_chunk()?;
2829 assert_eq!(
2830 chunk,
2831 StreamChunk::from_pretty(
2832 " I I I I
2833 U- 2 5 . .
2834 U+ 2 5 2 7"
2835 )
2836 );
2837
2838 tx_r.push_chunk(chunk_r2);
2840 let chunk = hash_join.next_unwrap_ready_chunk()?;
2841 assert_eq!(
2842 chunk,
2843 StreamChunk::from_pretty(
2844 " I I I I
2845 U- 3 6 . .
2846 U+ 3 6 3 10"
2847 )
2848 );
2849
2850 Ok(())
2851 }
2852
2853 #[tokio::test]
2854 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2855 let chunk_l1 = StreamChunk::from_pretty(
2856 " I I
2857 + 1 4
2858 + 2 5
2859 + . 6",
2860 );
2861 let chunk_l2 = StreamChunk::from_pretty(
2862 " I I
2863 + . 8
2864 - . 8",
2865 );
2866 let chunk_r1 = StreamChunk::from_pretty(
2867 " I I
2868 + 2 7
2869 + 4 8
2870 + 6 9",
2871 );
2872 let chunk_r2 = StreamChunk::from_pretty(
2873 " I I
2874 + . 10
2875 + 6 11",
2876 );
2877 let (mut tx_l, mut tx_r, mut hash_join) =
2878 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2879
2880 tx_l.push_barrier(test_epoch(1), false);
2882 tx_r.push_barrier(test_epoch(1), false);
2883 hash_join.next_unwrap_ready_barrier()?;
2884
2885 tx_l.push_chunk(chunk_l1);
2887 let chunk = hash_join.next_unwrap_ready_chunk()?;
2888 assert_eq!(
2889 chunk,
2890 StreamChunk::from_pretty(
2891 " I I I I
2892 + 1 4 . .
2893 + 2 5 . .
2894 + . 6 . ."
2895 )
2896 );
2897
2898 tx_l.push_chunk(chunk_l2);
2900 let chunk = hash_join.next_unwrap_ready_chunk()?;
2901 assert_eq!(
2902 chunk,
2903 StreamChunk::from_pretty(
2904 " I I I I
2905 + . 8 . .
2906 - . 8 . ."
2907 )
2908 );
2909
2910 tx_r.push_chunk(chunk_r1);
2912 let chunk = hash_join.next_unwrap_ready_chunk()?;
2913 assert_eq!(
2914 chunk,
2915 StreamChunk::from_pretty(
2916 " I I I I
2917 U- 2 5 . .
2918 U+ 2 5 2 7"
2919 )
2920 );
2921
2922 tx_r.push_chunk(chunk_r2);
2924 let chunk = hash_join.next_unwrap_ready_chunk()?;
2925 assert_eq!(
2926 chunk,
2927 StreamChunk::from_pretty(
2928 " I I I I
2929 U- . 6 . .
2930 U+ . 6 . 10"
2931 )
2932 );
2933
2934 Ok(())
2935 }
2936
2937 #[tokio::test]
2938 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2939 let chunk_l1 = StreamChunk::from_pretty(
2940 " I I
2941 + 1 4
2942 + 2 5
2943 + 3 6",
2944 );
2945 let chunk_l2 = StreamChunk::from_pretty(
2946 " I I
2947 + 3 8
2948 - 3 8",
2949 );
2950 let chunk_r1 = StreamChunk::from_pretty(
2951 " I I
2952 + 2 7
2953 + 4 8
2954 + 6 9",
2955 );
2956 let chunk_r2 = StreamChunk::from_pretty(
2957 " I I
2958 + 5 10
2959 - 5 10",
2960 );
2961 let (mut tx_l, mut tx_r, mut hash_join) =
2962 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2963
2964 tx_l.push_barrier(test_epoch(1), false);
2966 tx_r.push_barrier(test_epoch(1), false);
2967 hash_join.next_unwrap_ready_barrier()?;
2968
2969 tx_l.push_chunk(chunk_l1);
2971 hash_join.next_unwrap_pending();
2972
2973 tx_l.push_chunk(chunk_l2);
2975 hash_join.next_unwrap_pending();
2976
2977 tx_r.push_chunk(chunk_r1);
2979 let chunk = hash_join.next_unwrap_ready_chunk()?;
2980 assert_eq!(
2981 chunk,
2982 StreamChunk::from_pretty(
2983 " I I I I
2984 + 2 5 2 7
2985 + . . 4 8
2986 + . . 6 9"
2987 )
2988 );
2989
2990 tx_r.push_chunk(chunk_r2);
2992 let chunk = hash_join.next_unwrap_ready_chunk()?;
2993 assert_eq!(
2994 chunk,
2995 StreamChunk::from_pretty(
2996 " I I I I
2997 + . . 5 10
2998 - . . 5 10"
2999 )
3000 );
3001
3002 Ok(())
3003 }
3004
3005 #[tokio::test]
3006 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3007 let chunk_l1 = StreamChunk::from_pretty(
3008 " I I I
3009 + 1 4 1
3010 + 2 5 2
3011 + 3 6 3",
3012 );
3013 let chunk_l2 = StreamChunk::from_pretty(
3014 " I I I
3015 + 4 9 4
3016 + 5 10 5",
3017 );
3018 let chunk_r1 = StreamChunk::from_pretty(
3019 " I I I
3020 + 2 5 1
3021 + 4 9 2
3022 + 6 9 3",
3023 );
3024 let chunk_r2 = StreamChunk::from_pretty(
3025 " I I I
3026 + 1 4 4
3027 + 3 6 5",
3028 );
3029
3030 let (mut tx_l, mut tx_r, mut hash_join) =
3031 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3032
3033 tx_l.push_barrier(test_epoch(1), false);
3035 tx_r.push_barrier(test_epoch(1), false);
3036 hash_join.next_unwrap_ready_barrier()?;
3037
3038 tx_l.push_chunk(chunk_l1);
3040 let chunk = hash_join.next_unwrap_ready_chunk()?;
3041 assert_eq!(
3042 chunk,
3043 StreamChunk::from_pretty(
3044 " I I I I I I
3045 + 1 4 1 . . .
3046 + 2 5 2 . . .
3047 + 3 6 3 . . ."
3048 )
3049 );
3050
3051 tx_l.push_chunk(chunk_l2);
3053 let chunk = hash_join.next_unwrap_ready_chunk()?;
3054 assert_eq!(
3055 chunk,
3056 StreamChunk::from_pretty(
3057 " I I I I I I
3058 + 4 9 4 . . .
3059 + 5 10 5 . . ."
3060 )
3061 );
3062
3063 tx_r.push_chunk(chunk_r1);
3065 let chunk = hash_join.next_unwrap_ready_chunk()?;
3066 assert_eq!(
3067 chunk,
3068 StreamChunk::from_pretty(
3069 " I I I I I I
3070 U- 2 5 2 . . .
3071 U+ 2 5 2 2 5 1
3072 U- 4 9 4 . . .
3073 U+ 4 9 4 4 9 2"
3074 )
3075 );
3076
3077 tx_r.push_chunk(chunk_r2);
3079 let chunk = hash_join.next_unwrap_ready_chunk()?;
3080 assert_eq!(
3081 chunk,
3082 StreamChunk::from_pretty(
3083 " I I I I I I
3084 U- 1 4 1 . . .
3085 U+ 1 4 1 1 4 4
3086 U- 3 6 3 . . .
3087 U+ 3 6 3 3 6 5"
3088 )
3089 );
3090
3091 Ok(())
3092 }
3093
3094 #[tokio::test]
3095 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3096 let chunk_l1 = StreamChunk::from_pretty(
3097 " I I I
3098 + 1 4 1
3099 + 2 5 2
3100 + 3 6 3",
3101 );
3102 let chunk_l2 = StreamChunk::from_pretty(
3103 " I I I
3104 + 4 9 4
3105 + 5 10 5",
3106 );
3107 let chunk_r1 = StreamChunk::from_pretty(
3108 " I I I
3109 + 2 5 1
3110 + 4 9 2
3111 + 6 9 3",
3112 );
3113 let chunk_r2 = StreamChunk::from_pretty(
3114 " I I I
3115 + 1 4 4
3116 + 3 6 5
3117 + 7 7 6",
3118 );
3119
3120 let (mut tx_l, mut tx_r, mut hash_join) =
3121 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3122
3123 tx_l.push_barrier(test_epoch(1), false);
3125 tx_r.push_barrier(test_epoch(1), false);
3126 hash_join.next_unwrap_ready_barrier()?;
3127
3128 tx_l.push_chunk(chunk_l1);
3130 hash_join.next_unwrap_pending();
3131
3132 tx_l.push_chunk(chunk_l2);
3134 hash_join.next_unwrap_pending();
3135
3136 tx_r.push_chunk(chunk_r1);
3138 let chunk = hash_join.next_unwrap_ready_chunk()?;
3139 assert_eq!(
3140 chunk,
3141 StreamChunk::from_pretty(
3142 " I I I I I I
3143 + 2 5 2 2 5 1
3144 + 4 9 4 4 9 2
3145 + . . . 6 9 3"
3146 )
3147 );
3148
3149 tx_r.push_chunk(chunk_r2);
3151 let chunk = hash_join.next_unwrap_ready_chunk()?;
3152 assert_eq!(
3153 chunk,
3154 StreamChunk::from_pretty(
3155 " I I I I I I
3156 + 1 4 1 1 4 4
3157 + 3 6 3 3 6 5
3158 + . . . 7 7 6"
3159 )
3160 );
3161
3162 Ok(())
3163 }
3164
3165 #[tokio::test]
3166 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3167 let chunk_l1 = StreamChunk::from_pretty(
3168 " I I
3169 + 1 4
3170 + 2 5
3171 + 3 6",
3172 );
3173 let chunk_l2 = StreamChunk::from_pretty(
3174 " I I
3175 + 3 8
3176 - 3 8",
3177 );
3178 let chunk_r1 = StreamChunk::from_pretty(
3179 " I I
3180 + 2 7
3181 + 4 8
3182 + 6 9",
3183 );
3184 let chunk_r2 = StreamChunk::from_pretty(
3185 " I I
3186 + 5 10
3187 - 5 10",
3188 );
3189 let (mut tx_l, mut tx_r, mut hash_join) =
3190 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3191
3192 tx_l.push_barrier(test_epoch(1), false);
3194 tx_r.push_barrier(test_epoch(1), false);
3195 hash_join.next_unwrap_ready_barrier()?;
3196
3197 tx_l.push_chunk(chunk_l1);
3199 let chunk = hash_join.next_unwrap_ready_chunk()?;
3200 assert_eq!(
3201 chunk,
3202 StreamChunk::from_pretty(
3203 " I I I I
3204 + 1 4 . .
3205 + 2 5 . .
3206 + 3 6 . ."
3207 )
3208 );
3209
3210 tx_l.push_chunk(chunk_l2);
3212 let chunk = hash_join.next_unwrap_ready_chunk()?;
3213 assert_eq!(
3214 chunk,
3215 StreamChunk::from_pretty(
3216 " I I I I
3217 + 3 8 . .
3218 - 3 8 . ."
3219 )
3220 );
3221
3222 tx_r.push_chunk(chunk_r1);
3224 let chunk = hash_join.next_unwrap_ready_chunk()?;
3225 assert_eq!(
3226 chunk,
3227 StreamChunk::from_pretty(
3228 " I I I I
3229 U- 2 5 . .
3230 U+ 2 5 2 7
3231 + . . 4 8
3232 + . . 6 9"
3233 )
3234 );
3235
3236 tx_r.push_chunk(chunk_r2);
3238 let chunk = hash_join.next_unwrap_ready_chunk()?;
3239 assert_eq!(
3240 chunk,
3241 StreamChunk::from_pretty(
3242 " I I I I
3243 + . . 5 10
3244 - . . 5 10"
3245 )
3246 );
3247
3248 Ok(())
3249 }
3250
3251 #[tokio::test]
3252 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3253 let (mut tx_l, mut tx_r, mut hash_join) =
3254 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3255
3256 tx_l.push_barrier(test_epoch(1), false);
3258 tx_r.push_barrier(test_epoch(1), false);
3259 hash_join.next_unwrap_ready_barrier()?;
3260
3261 tx_l.push_chunk(StreamChunk::from_pretty(
3262 " I I
3263 + 1 1
3264 ",
3265 ));
3266 let chunk = hash_join.next_unwrap_ready_chunk()?;
3267 assert_eq!(
3268 chunk,
3269 StreamChunk::from_pretty(
3270 " I I I I
3271 + 1 1 . ."
3272 )
3273 );
3274
3275 tx_r.push_chunk(StreamChunk::from_pretty(
3276 " I I
3277 + 1 1
3278 ",
3279 ));
3280 let chunk = hash_join.next_unwrap_ready_chunk()?;
3281
3282 assert_eq!(
3283 chunk,
3284 StreamChunk::from_pretty(
3285 " I I I I
3286 U- 1 1 . .
3287 U+ 1 1 1 1"
3288 )
3289 );
3290
3291 tx_l.push_chunk(StreamChunk::from_pretty(
3292 " I I
3293 - 1 1
3294 + 1 2
3295 ",
3296 ));
3297 let chunk = hash_join.next_unwrap_ready_chunk()?;
3298 let chunk = chunk.compact();
3299 assert_eq!(
3300 chunk,
3301 StreamChunk::from_pretty(
3302 " I I I I
3303 - 1 1 1 1
3304 + 1 2 1 1
3305 "
3306 )
3307 );
3308
3309 Ok(())
3310 }
3311
3312 #[tokio::test]
3313 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3314 {
3315 let chunk_l1 = StreamChunk::from_pretty(
3316 " I I
3317 + 1 4
3318 + 2 5
3319 + 3 6
3320 + 3 7",
3321 );
3322 let chunk_l2 = StreamChunk::from_pretty(
3323 " I I
3324 + 3 8
3325 - 3 8
3326 - 1 4", );
3328 let chunk_r1 = StreamChunk::from_pretty(
3329 " I I
3330 + 2 6
3331 + 4 8
3332 + 3 4",
3333 );
3334 let chunk_r2 = StreamChunk::from_pretty(
3335 " I I
3336 + 5 10
3337 - 5 10
3338 + 1 2",
3339 );
3340 let (mut tx_l, mut tx_r, mut hash_join) =
3341 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3342
3343 tx_l.push_barrier(test_epoch(1), false);
3345 tx_r.push_barrier(test_epoch(1), false);
3346 hash_join.next_unwrap_ready_barrier()?;
3347
3348 tx_l.push_chunk(chunk_l1);
3350 let chunk = hash_join.next_unwrap_ready_chunk()?;
3351 assert_eq!(
3352 chunk,
3353 StreamChunk::from_pretty(
3354 " I I I I
3355 + 1 4 . .
3356 + 2 5 . .
3357 + 3 6 . .
3358 + 3 7 . ."
3359 )
3360 );
3361
3362 tx_l.push_chunk(chunk_l2);
3364 let chunk = hash_join.next_unwrap_ready_chunk()?;
3365 assert_eq!(
3366 chunk,
3367 StreamChunk::from_pretty(
3368 " I I I I
3369 + 3 8 . .
3370 - 3 8 . .
3371 - 1 4 . ."
3372 )
3373 );
3374
3375 tx_r.push_chunk(chunk_r1);
3377 let chunk = hash_join.next_unwrap_ready_chunk()?;
3378 assert_eq!(
3379 chunk,
3380 StreamChunk::from_pretty(
3381 " I I I I
3382 U- 2 5 . .
3383 U+ 2 5 2 6
3384 + . . 4 8
3385 + . . 3 4" )
3389 );
3390
3391 tx_r.push_chunk(chunk_r2);
3393 let chunk = hash_join.next_unwrap_ready_chunk()?;
3394 assert_eq!(
3395 chunk,
3396 StreamChunk::from_pretty(
3397 " I I I I
3398 + . . 5 10
3399 - . . 5 10
3400 + . . 1 2" )
3403 );
3404
3405 Ok(())
3406 }
3407
3408 #[tokio::test]
3409 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3410 let chunk_l1 = StreamChunk::from_pretty(
3411 " I I
3412 + 1 4
3413 + 2 10
3414 + 3 6",
3415 );
3416 let chunk_l2 = StreamChunk::from_pretty(
3417 " I I
3418 + 3 8
3419 - 3 8",
3420 );
3421 let chunk_r1 = StreamChunk::from_pretty(
3422 " I I
3423 + 2 7
3424 + 4 8
3425 + 6 9",
3426 );
3427 let chunk_r2 = StreamChunk::from_pretty(
3428 " I I
3429 + 3 10
3430 + 6 11",
3431 );
3432 let (mut tx_l, mut tx_r, mut hash_join) =
3433 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3434
3435 tx_l.push_barrier(test_epoch(1), false);
3437 tx_r.push_barrier(test_epoch(1), false);
3438 hash_join.next_unwrap_ready_barrier()?;
3439
3440 tx_l.push_chunk(chunk_l1);
3442 hash_join.next_unwrap_pending();
3443
3444 tx_l.push_chunk(chunk_l2);
3446 hash_join.next_unwrap_pending();
3447
3448 tx_r.push_chunk(chunk_r1);
3450 hash_join.next_unwrap_pending();
3451
3452 tx_r.push_chunk(chunk_r2);
3454 let chunk = hash_join.next_unwrap_ready_chunk()?;
3455 assert_eq!(
3456 chunk,
3457 StreamChunk::from_pretty(
3458 " I I I I
3459 + 3 6 3 10"
3460 )
3461 );
3462
3463 Ok(())
3464 }
3465
3466 #[tokio::test]
3467 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3468 let (mut tx_l, mut tx_r, mut hash_join) =
3469 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3470
3471 tx_l.push_barrier(test_epoch(1), false);
3473 tx_r.push_barrier(test_epoch(1), false);
3474 hash_join.next_unwrap_ready_barrier()?;
3475
3476 tx_l.push_int64_watermark(0, 100);
3477
3478 tx_l.push_int64_watermark(0, 200);
3479
3480 tx_l.push_barrier(test_epoch(2), false);
3481 tx_r.push_barrier(test_epoch(2), false);
3482 hash_join.next_unwrap_ready_barrier()?;
3483
3484 tx_r.push_int64_watermark(0, 50);
3485
3486 let w1 = hash_join.next().await.unwrap().unwrap();
3487 let w1 = w1.as_watermark().unwrap();
3488
3489 let w2 = hash_join.next().await.unwrap().unwrap();
3490 let w2 = w2.as_watermark().unwrap();
3491
3492 tx_r.push_int64_watermark(0, 100);
3493
3494 let w3 = hash_join.next().await.unwrap().unwrap();
3495 let w3 = w3.as_watermark().unwrap();
3496
3497 let w4 = hash_join.next().await.unwrap().unwrap();
3498 let w4 = w4.as_watermark().unwrap();
3499
3500 assert_eq!(
3501 w1,
3502 &Watermark {
3503 col_idx: 2,
3504 data_type: DataType::Int64,
3505 val: ScalarImpl::Int64(50)
3506 }
3507 );
3508
3509 assert_eq!(
3510 w2,
3511 &Watermark {
3512 col_idx: 0,
3513 data_type: DataType::Int64,
3514 val: ScalarImpl::Int64(50)
3515 }
3516 );
3517
3518 assert_eq!(
3519 w3,
3520 &Watermark {
3521 col_idx: 2,
3522 data_type: DataType::Int64,
3523 val: ScalarImpl::Int64(100)
3524 }
3525 );
3526
3527 assert_eq!(
3528 w4,
3529 &Watermark {
3530 col_idx: 0,
3531 data_type: DataType::Int64,
3532 val: ScalarImpl::Int64(100)
3533 }
3534 );
3535
3536 Ok(())
3537 }
3538}