1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use either::Either;
21use itertools::Itertools;
22use multimap::MultiMap;
23use risingwave_common::array::Op;
24use risingwave_common::hash::{HashKey, NullBitmap};
25use risingwave_common::row::RowExt;
26use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
27use risingwave_common::util::epoch::EpochPair;
28use risingwave_common::util::iter_util::ZipEqDebug;
29use risingwave_expr::ExprError;
30use risingwave_expr::expr::NonStrictExpression;
31use tokio::time::Instant;
32
33use self::builder::JoinChunkBuilder;
34use super::barrier_align::*;
35use super::join::hash_join::*;
36use super::join::row::{JoinEncoding, JoinRow};
37use super::join::*;
38use super::watermark::*;
39use crate::executor::CachedJoinRow;
40use crate::executor::join::builder::JoinStreamChunkBuilder;
41use crate::executor::join::hash_join::CacheResult;
42use crate::executor::prelude::*;
43
/// Number of processed rows after which `evict_cache` evicts both sides' join caches
/// and resets its counter.
const EVICT_EVERY_N_ROWS: u32 = 16;

/// Returns `true` if every element of `vec1` also occurs in `vec2`.
///
/// Duplicates and ordering are ignored; an empty `vec1` is a subset of any `vec2`.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
50
/// Per-side index parameters handed to the hash join executor's constructors.
pub struct JoinParams {
    /// Indices (within this side's input) of the equi-join key columns.
    pub join_key_indices: Vec<usize>,
    /// Primary-key indices for this side's join state; the constructor passes them to
    /// `JoinHashMap::new`, and their count sizes the degree table's pk range.
    /// (Presumably the stream key with join-key columns removed — confirm with the planner.)
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join-key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        Self { join_key_indices, deduped_pk_indices }
    }
}
66
/// State and static metadata for one input side of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash table for this side, built over its state table and (optional) degree table.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the equi-join key columns in this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input.
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns in a concatenated left ++ right row:
    /// `0` for the left side, the left input's column count for the right side.
    start_pos: usize,
    /// `(input column, output column)` pairs for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// `i2o_mapping` keyed by input column; one input column may feed several outputs.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For each input column, the inequality conditions it takes part in, as
    /// `(inequality index, need_offset)`. `need_offset` is `true` when this column is the
    /// "smaller" operand; its watermark gets the inequality's delta expression applied.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns (those used by some inequality) that must be non-NULL for a row to
    /// be able to match; cleared when the append-only optimization is enabled.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs whose inequality watermark may be used to
    /// clean this side's state; cleared when the append-only optimization is enabled.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether this side maintains a degree table.
    need_degree_table: bool,
    _marker: std::marker::PhantomData<E>,
}
96
97impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("JoinSide")
100 .field("join_key_indices", &self.join_key_indices)
101 .field("col_types", &self.all_data_types)
102 .field("start_pos", &self.start_pos)
103 .field("i2o_mapping", &self.i2o_mapping)
104 .field("need_degree_table", &self.need_degree_table)
105 .finish()
106 }
107}
108
109impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
110 fn is_dirty(&self) -> bool {
112 unimplemented!()
113 }
114
115 #[expect(dead_code)]
116 fn clear_cache(&mut self) {
117 assert!(
118 !self.is_dirty(),
119 "cannot clear cache while states of hash join are dirty"
120 );
121
122 }
125
126 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
127 self.ht.init(epoch).await
128 }
129}
130
/// Streaming hash join executor of join type `T`, joining a left and a right input on
/// equi-join keys with optional extra condition and inequality (watermark) pairs.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input; taken (set to `None`) when `into_stream` starts.
    input_l: Option<Executor>,
    /// Right input; taken (set to `None`) when `into_stream` starts.
    input_r: Option<Executor>,
    /// Output column types after applying `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// Left side state and metadata.
    side_l: JoinSide<K, S, E>,
    /// Right side state and metadata.
    side_r: JoinSide<K, S, E>,
    /// Optional non-equi join condition.
    cond: Option<NonStrictExpression>,
    /// Per inequality: the output columns that receive its watermark, and the optional
    /// delta expression applied to the "smaller" side's watermark.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest emitted watermark per inequality, indexed like `inequality_pairs`.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// Set when both inputs are append-only and each side's stream key is contained in
    /// its join key.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Maximum rows per output chunk.
    chunk_size: usize,
    /// Rows processed since the last cache eviction (see `evict_cache`).
    cnt_rows_received: u32,

    /// Watermark buffers aligning both sides. Keys `0..join_key_len` are join-key
    /// positions; `join_key_len + i` is inequality `i`.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// Output-row count per input row above which a warning is logged.
    high_join_amplification_threshold: usize,

    /// Per-key row limit above which rows fetched on a cache miss are not cached.
    entry_state_max_rows: usize,
}
175
176impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
177 for HashJoinExecutor<K, S, T, E>
178{
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("HashJoinExecutor")
181 .field("join_type", &T)
182 .field("input_left", &self.input_l.as_ref().unwrap().identity())
183 .field("input_right", &self.input_r.as_ref().unwrap().identity())
184 .field("side_l", &self.side_l)
185 .field("side_r", &self.side_r)
186 .field("stream_key", &self.info.stream_key)
187 .field("schema", &self.info.schema)
188 .field("actual_output_data_types", &self.actual_output_data_types)
189 .finish()
190 }
191}
192
193impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
194 for HashJoinExecutor<K, S, T, E>
195{
196 fn execute(self: Box<Self>) -> BoxedMessageStream {
197 self.into_stream().boxed()
198 }
199}
200
/// Inputs for `eq_join_oneside`. `into_stream` fills these by borrowing individual
/// fields of the executor for the duration of one chunk.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    /// Left side; becomes the update or match side depending on the chunk's origin.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right side; becomes the update or match side depending on the chunk's origin.
    side_r: &'a mut JoinSide<K, S, E>,
    actual_output_data_types: &'a [DataType],
    /// Optional non-equi join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest watermark per inequality, used to select state-cleaning columns.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The incoming chunk to join.
    chunk: StreamChunk,
    append_only_optimize: bool,
    chunk_size: usize,
    /// Shared row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: usize,
}
215
216impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
217 HashJoinExecutor<K, S, T, E>
218{
219 #[allow(clippy::too_many_arguments)]
220 pub fn new(
221 ctx: ActorContextRef,
222 info: ExecutorInfo,
223 input_l: Executor,
224 input_r: Executor,
225 params_l: JoinParams,
226 params_r: JoinParams,
227 null_safe: Vec<bool>,
228 output_indices: Vec<usize>,
229 cond: Option<NonStrictExpression>,
230 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
231 state_table_l: StateTable<S>,
232 degree_state_table_l: StateTable<S>,
233 state_table_r: StateTable<S>,
234 degree_state_table_r: StateTable<S>,
235 watermark_epoch: AtomicU64Ref,
236 is_append_only: bool,
237 metrics: Arc<StreamingMetrics>,
238 chunk_size: usize,
239 high_join_amplification_threshold: usize,
240 ) -> Self {
241 Self::new_with_cache_size(
242 ctx,
243 info,
244 input_l,
245 input_r,
246 params_l,
247 params_r,
248 null_safe,
249 output_indices,
250 cond,
251 inequality_pairs,
252 state_table_l,
253 degree_state_table_l,
254 state_table_r,
255 degree_state_table_r,
256 watermark_epoch,
257 is_append_only,
258 metrics,
259 chunk_size,
260 high_join_amplification_threshold,
261 None,
262 )
263 }
264
    /// Same as `new`, but `entry_state_max_rows`, when `Some`, overrides
    /// `ctx.config.developer.hash_join_entry_state_max_rows`. That limit is the per-join-key
    /// row count above which rows fetched on a cache miss are not written back to the cache.
    ///
    /// # Panics
    /// If the join key data types of the two sides differ.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // An explicit limit wins over the configured default.
        let entry_state_max_rows = match entry_state_max_rows {
            None => ctx.config.developer.hash_join_entry_state_max_rows,
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        // Column offset of the right side within a concatenated left ++ right row.
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins output only one side's columns; other join types output
        // left ++ right.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Panics if an output index is out of range of the pre-projection schema.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // The state tables store full input rows.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.stream_key().to_vec();
        let state_pk_indices_r = input_r.stream_key().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // Degree table layout: join key columns first, then the deduped pk columns.
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether each side's stream key is fully covered by its join key.
        let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
        let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must hash their join keys identically.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        // Bitmap of join key positions for which NULL is allowed to match NULL.
        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A degree table is needed only if the join type requires it and the other side's
        // stream key is not contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Input-to-output column mappings; the side not present in a semi/anti join's
        // output is treated as having zero columns.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // Each inequality pair names two columns in the concatenated left ++ right index
        // space. The smaller index is the left column. The "larger" key's side determines
        // the output columns that receive this inequality's watermark. The "smaller"
        // operand is flagged `true` (needs the delta offset) in its side's index.
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    let output_indices = if key_required_larger < key_required_smaller {
                        // The "larger" key is a left column.
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        // The "larger" key is a right column.
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Columns involved in any inequality must be non-NULL for a row to match.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization, no inequality-based state cleaning or
        // non-NULL filtering is applied.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
522
    /// Drives the join. It aligns barriers of the two inputs, joins every incoming chunk
    /// against the opposite side's state, derives output watermarks, and flushes both
    /// sides' state on each barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        // The inputs are moved out here; `input_l`/`input_r` are `None` afterwards.
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        // The first message must be a barrier. It is forwarded before both sides'
        // state is initialized with its epoch.
        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Metric handles are labelled once, up front.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            // Time spent waiting for the next aligned input message.
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // Match time excludes time suspended at `yield`: the timer is stopped
                    // before each yield and restarted after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same timing scheme as the left branch.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Both sides are flushed before the barrier is forwarded.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                    // Work done after yielding the barrier (post-commit handling) is
                    // not included in this metric.
                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                    yield Message::Barrier(barrier);

                    // Apply the (optional) vnode bitmap update to both sides' state.
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // When the left side reports `Some(true)` (presumably: its cache was
                    // invalidated by a vnode change — confirm in `post_yield_barrier`),
                    // all buffered and inequality watermarks are reset.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report the number of cached entries of each side.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }
                }
            }
            start_time = Instant::now();
        }
    }
697
698 async fn flush_data(
699 &mut self,
700 epoch: EpochPair,
701 ) -> StreamExecutorResult<(
702 JoinHashMapPostCommit<'_, K, S, E>,
703 JoinHashMapPostCommit<'_, K, S, E>,
704 )> {
705 let left = self.side_l.ht.flush(epoch).await?;
708 let right = self.side_r.ht.flush(epoch).await?;
709 Ok((left, right))
710 }
711
712 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
713 self.side_l.ht.try_flush().await?;
716 self.side_r.ht.try_flush().await?;
717 Ok(())
718 }
719
720 fn evict_cache(
722 side_update: &mut JoinSide<K, S, E>,
723 side_match: &mut JoinSide<K, S, E>,
724 cnt_rows_received: &mut u32,
725 ) {
726 *cnt_rows_received += 1;
727 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
728 side_update.ht.evict();
729 side_match.ht.evict();
730 *cnt_rows_received = 0;
731 }
732 }
733
    /// Processes a watermark that arrived on `side` and returns the watermarks to emit
    /// downstream.
    ///
    /// - If the watermark is on this side's first join key column, it is handed to the
    ///   other side's hash table via `update_watermark`.
    /// - For every join key position it occupies, it is buffered together with the other
    ///   side. Whatever the buffer releases is emitted on every output column mapped from
    ///   that join key position of either side.
    /// - For every inequality the column takes part in, the watermark (with the delta
    ///   expression applied when `need_offset` is set) is buffered under the key
    ///   `join_key_len + inequality_index`. A released watermark is emitted on the
    ///   inequality's output columns and recorded in `inequality_watermarks`.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // State cleaning: a watermark on the first join key column is passed to the
        // opposite side's table. Assumes at least one join key (indexing `[0]` would
        // panic otherwise).
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Watermarks on join key columns.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            // `Some` once the buffer can release a watermark for this key position.
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        // Watermarks on inequality columns.
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // The strict inner expression is evaluated so errors are returned and
                // handled explicitly below.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    // NOTE(review): panics if the delta expression yields NULL; assumed not
                    // to happen for a non-NULL watermark — confirm.
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // The watermark is dropped on error. Numeric overflow is skipped
                        // silently; other errors are reported first.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
815
816 fn row_concat(
817 row_update: impl Row,
818 update_start_pos: usize,
819 row_matched: impl Row,
820 matched_start_pos: usize,
821 ) -> OwnedRow {
822 let mut new_row = vec![None; row_update.len() + row_matched.len()];
823
824 for (i, datum_ref) in row_update.iter().enumerate() {
825 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
826 }
827 for (i, datum_ref) in row_matched.iter().enumerate() {
828 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
829 }
830 OwnedRow::new(new_row)
831 }
832
833 fn eq_join_left(
835 args: EqJoinArgs<'_, K, S, E>,
836 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
837 Self::eq_join_oneside::<{ SideType::Left }>(args)
838 }
839
840 fn eq_join_right(
842 args: EqJoinArgs<'_, K, S, E>,
843 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
844 Self::eq_join_oneside::<{ SideType::Right }>(args)
845 }
846
    /// Joins one chunk from side `SIDE` (the update side) against the opposite side (the
    /// match side) and yields the resulting output chunks.
    ///
    /// For each visible row it ticks the cache-eviction counter and looks up the match
    /// side's cached state for the row's key. Rows that can never match skip the lookup.
    /// It then delegates matching and state updates to `handle_match_rows`. Per row, it
    /// records the number of output rows produced and logs a warning when that exceeds
    /// `high_join_amplification_threshold`.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // Match-side state-cleaning columns whose inequality already has a watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Invisible rows come through as `None` and are skipped.
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            // A row can never match if an inequality column is NULL, or if its key has
            // NULL at a position that is not null-safe.
            let cache_lookup_result = {
                // SAFETY (review): `non_null_fields` are positions in
                // `input2inequality_index`, which is sized to this side's input width, so
                // each index is assumed to be in bounds for `row`.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    CacheResult::NeverMatch
                }
            };
            // Number of output rows produced for this input row.
            let mut total_matches = 0;

            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            match op {
                Op::Insert | Op::UpdateInsert =>
                {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete =>
                {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = %side_update.ht.table_id(),
                    match_table_id = %side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = %ctx.id,
                    fragment_id = %ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Emit whatever is left in the builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
978
    /// Matches one update-side `row` (with join key `key`) against the match side and
    /// yields output chunks. Afterwards it updates both sides' state.
    ///
    /// - `NeverMatch`: only the "not matched" forwarding is attempted, and the row is not
    ///   stored in either side's state.
    /// - `Hit`: the cached rows are matched and the entry is written back.
    /// - `Miss`: matching rows are fetched from storage. At most `entry_state_max_rows + 1`
    ///   of them are collected into a new cache entry, which is written back only if the
    ///   count stays within `entry_state_max_rows`.
    ///
    /// Then it performs exactly-once or not-matched forwarding based on the computed
    /// degree, and deletes match-side rows flagged for state cleaning. Finally it either
    /// deletes the matched row (append-only optimization, inserts only) or inserts/deletes
    /// the update row with its degree on the update side.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Cache entry rebuilt on a miss, and the number of rows put into it.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of match-side rows satisfying the join condition with `row`.
        let mut degree = 0;
        let mut append_only_matched_row = None;
        let mut matched_rows_to_clean = vec![];

        // Calls `handle_match_row` for one matched row, threading this function's
        // per-row accumulators through it.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal,
                $map_output:expr,
            ) => {
                Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                    $map_output,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true,
                        Either::Left,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry while within the limit. One row past the limit
                    // is admitted so that exceeding it is detectable after the loop.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false,
                        Either::Right,
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Write the entry back: always on a hit, and on a miss only if it stayed within
        // the limit (i.e. it holds every matched row).
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Remove match-side rows flagged for state cleaning.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // Append-only optimization: delete the matched row from the match side and do
        // not store the update row.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Record the update row, together with its degree, on the update side.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1160
    /// Handles a single `matched_row` fetched from the match side for `update_row`.
    ///
    /// Steps, in order:
    /// 1. Evaluates `cond` on the pair via `check_join_condition`.
    /// 2. If the condition holds:
    ///    - increments `*update_row_degree`;
    ///    - for inserts, asks the chunk builder for the joined row *before* the matched row's
    ///      degree is touched;
    ///    - if `match_degree_table` is present, calls `update_degree` on the matched row and,
    ///      when a cached encoded row is available, bumps its degree as well;
    ///    - for deletes, asks the chunk builder for the joined row *after* the degree update.
    ///
    ///    Both builder calls are skipped when `forward_exactly_once(T, SIDE)` is true.
    /// 3. If the condition does not hold, flags the matched row for state cleaning when any
    ///    column in `useful_state_clean_columns` has a non-NULL value strictly below its
    ///    watermark.
    /// 4. In append-only mode, hands the matched row out through `append_only_matched_row`
    ///    (regardless of the condition). Otherwise, if it was flagged in step 3, it is pushed to
    ///    `matched_rows_to_clean`. `map_output` converts the row payload for either sink.
    ///
    /// Returns the chunk (if any) that the builder produced during this call.
    ///
    /// # Panics
    /// - In append-only mode, if `JOIN_OP` is not `Insert`, or if a matched row was already
    ///   stored in `append_only_matched_row`.
    /// - If `MATCHED_ROWS_FROM_CACHE` is set, a degree table is present, and
    ///   `matched_row_cache_ref` is `None`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,
        RO: Row,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        // Set only when the condition fails and the row is behind a state-clean watermark.
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            *update_row_degree += 1;
            // Inserts are handed to the builder BEFORE the matched row's degree is updated,
            // so the builder sees the pre-update degree.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                // Keep the cached copy's degree in sync. Rows read from the cache always carry
                // a cache reference, so the `unwrap` below doubles as an invariant check.
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // Deletes are handed to the builder AFTER the degree update, so the builder sees
            // the post-update degree.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // The pair did not join; check whether the matched row has fallen behind a
            // watermark on one of the state-clean columns.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            // Append-only mode only ever sees inserts and expects at most one matched row.
            assert_matches!(JOIN_OP, JoinOp::Insert);
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `else` because `matched_row` is moved in both branches.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1272
1273 #[inline]
1278 async fn check_join_condition(
1279 row: impl Row,
1280 side_update_start_pos: usize,
1281 matched_row: impl Row,
1282 side_match_start_pos: usize,
1283 join_condition: &Option<NonStrictExpression>,
1284 ) -> bool {
1285 if let Some(join_condition) = join_condition {
1286 let new_row = Self::row_concat(
1287 row,
1288 side_update_start_pos,
1289 matched_row,
1290 side_match_start_pos,
1291 );
1292 join_condition
1293 .eval_row_infallible(&new_row)
1294 .await
1295 .map(|s| *s.as_bool())
1296 .unwrap_or(false)
1297 } else {
1298 true
1299 }
1300 }
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305 use std::sync::atomic::AtomicU64;
1306
1307 use pretty_assertions::assert_eq;
1308 use risingwave_common::array::*;
1309 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1310 use risingwave_common::hash::{Key64, Key128};
1311 use risingwave_common::util::epoch::test_epoch;
1312 use risingwave_common::util::sort_util::OrderType;
1313 use risingwave_storage::memory::MemoryStateStore;
1314
1315 use super::*;
1316 use crate::common::table::test_utils::gen_pbtable;
1317 use crate::executor::MemoryEncoding;
1318 use crate::executor::test_utils::expr::build_from_pretty;
1319 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1320
1321 async fn create_in_memory_state_table(
1322 mem_state: MemoryStateStore,
1323 data_types: &[DataType],
1324 order_types: &[OrderType],
1325 pk_indices: &[usize],
1326 table_id: u32,
1327 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1328 let column_descs = data_types
1329 .iter()
1330 .enumerate()
1331 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1332 .collect_vec();
1333 let state_table = StateTable::from_table_catalog(
1334 &gen_pbtable(
1335 TableId::new(table_id),
1336 column_descs,
1337 order_types.to_vec(),
1338 pk_indices.to_vec(),
1339 0,
1340 ),
1341 mem_state.clone(),
1342 None,
1343 )
1344 .await;
1345
1346 let mut degree_table_column_descs = vec![];
1348 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1349 degree_table_column_descs.push(ColumnDesc::unnamed(
1350 ColumnId::new(pk_id as i32),
1351 data_types[*idx].clone(),
1352 ))
1353 });
1354 degree_table_column_descs.push(ColumnDesc::unnamed(
1355 ColumnId::new(pk_indices.len() as i32),
1356 DataType::Int64,
1357 ));
1358 let degree_state_table = StateTable::from_table_catalog(
1359 &gen_pbtable(
1360 TableId::new(table_id + 1),
1361 degree_table_column_descs,
1362 order_types.to_vec(),
1363 pk_indices.to_vec(),
1364 0,
1365 ),
1366 mem_state,
1367 None,
1368 )
1369 .await;
1370 (state_table, degree_state_table)
1371 }
1372
1373 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1374 build_from_pretty(
1375 condition_text
1376 .as_deref()
1377 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1378 )
1379 }
1380
    /// Builds a hash join executor of join type `T` over two 2-column `Int64` inputs.
    ///
    /// Returns the left sender, the right sender and the executor's output stream.
    ///
    /// - Both inputs have stream key `[1]`, join on column 0 and use `[1]` as deduped pk.
    /// - When `with_condition` is set, `create_cond(condition_text)` becomes the condition.
    /// - `null_safe` is passed as `vec![null_safe]`, matching the single join key column.
    /// - `inequality_pairs` is forwarded unchanged (empty for classical joins).
    /// - State tables use pk `[0, 1]`. Table ids are 0/1 for the left state/degree tables and
    ///   2/3 for the right ones, all in one `MemoryStateStore`.
    /// - The output schema is the left (`LeftSemi`/`LeftAnti`) or right
    ///   (`RightSemi`/`RightAnti`) input schema, otherwise the concatenation of both. All
    ///   output columns are kept.
    async fn create_executor<const T: JoinTypePrimitive>(
        with_condition: bool,
        null_safe: bool,
        condition_text: Option<String>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    ) -> (MessageSender, MessageSender, BoxedMessageStream) {
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int64),
                Field::unnamed(DataType::Int64),
            ],
        };
        let (tx_l, source_l) = MockSource::channel();
        let source_l = source_l.into_executor(schema.clone(), vec![1]);
        let (tx_r, source_r) = MockSource::channel();
        let source_r = source_r.into_executor(schema, vec![1]);
        // Join on column 0 of each side; column 1 is the deduped pk.
        let params_l = JoinParams::new(vec![0], vec![1]);
        let params_r = JoinParams::new(vec![0], vec![1]);
        let cond = with_condition.then(|| create_cond(condition_text));

        let mem_state = MemoryStateStore::new();

        // Left tables: state id 0, degree id 1.
        let (state_l, degree_state_l) = create_in_memory_state_table(
            mem_state.clone(),
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            0,
        )
        .await;

        // Right tables: state id 2, degree id 3.
        let (state_r, degree_state_r) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int64, DataType::Int64],
            &[OrderType::ascending(), OrderType::ascending()],
            &[0, 1],
            2,
        )
        .await;

        // Semi/anti joins output only one side; other join types output both.
        let schema = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
            JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
            _ => [source_l.schema().fields(), source_r.schema().fields()]
                .concat()
                .into_iter()
                .collect(),
        };
        let schema_len = schema.len();
        let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);

        let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
            ActorContext::for_test(123),
            info,
            source_l,
            source_r,
            params_l,
            params_r,
            vec![null_safe],
            (0..schema_len).collect_vec(),
            cond,
            inequality_pairs,
            state_l,
            degree_state_l,
            state_r,
            degree_state_r,
            Arc::new(AtomicU64::new(0)),
            // NOTE(review): `create_append_only_executor` passes `true` here; presumably this
            // is the append-only switch — confirm against `HashJoinExecutor::new`.
            false,
            Arc::new(StreamingMetrics::unused()),
            1024,
            2048,
        );
        (tx_l, tx_r, executor.boxed().execute())
    }
1455
1456 async fn create_classical_executor<const T: JoinTypePrimitive>(
1457 with_condition: bool,
1458 null_safe: bool,
1459 condition_text: Option<String>,
1460 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1461 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1462 }
1463
1464 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1465 with_condition: bool,
1466 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1467 let schema = Schema {
1468 fields: vec![
1469 Field::unnamed(DataType::Int64),
1470 Field::unnamed(DataType::Int64),
1471 Field::unnamed(DataType::Int64),
1472 ],
1473 };
1474 let (tx_l, source_l) = MockSource::channel();
1475 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1476 let (tx_r, source_r) = MockSource::channel();
1477 let source_r = source_r.into_executor(schema, vec![0]);
1478 let params_l = JoinParams::new(vec![0, 1], vec![]);
1479 let params_r = JoinParams::new(vec![0, 1], vec![]);
1480 let cond = with_condition.then(|| create_cond(None));
1481
1482 let mem_state = MemoryStateStore::new();
1483
1484 let (state_l, degree_state_l) = create_in_memory_state_table(
1485 mem_state.clone(),
1486 &[DataType::Int64, DataType::Int64, DataType::Int64],
1487 &[
1488 OrderType::ascending(),
1489 OrderType::ascending(),
1490 OrderType::ascending(),
1491 ],
1492 &[0, 1, 0],
1493 0,
1494 )
1495 .await;
1496
1497 let (state_r, degree_state_r) = create_in_memory_state_table(
1498 mem_state,
1499 &[DataType::Int64, DataType::Int64, DataType::Int64],
1500 &[
1501 OrderType::ascending(),
1502 OrderType::ascending(),
1503 OrderType::ascending(),
1504 ],
1505 &[0, 1, 1],
1506 1,
1507 )
1508 .await;
1509
1510 let schema = match T {
1511 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1512 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1513 _ => [source_l.schema().fields(), source_r.schema().fields()]
1514 .concat()
1515 .into_iter()
1516 .collect(),
1517 };
1518 let schema_len = schema.len();
1519 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1520
1521 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1522 ActorContext::for_test(123),
1523 info,
1524 source_l,
1525 source_r,
1526 params_l,
1527 params_r,
1528 vec![false],
1529 (0..schema_len).collect_vec(),
1530 cond,
1531 vec![],
1532 state_l,
1533 degree_state_l,
1534 state_r,
1535 degree_state_r,
1536 Arc::new(AtomicU64::new(0)),
1537 true,
1538 Arc::new(StreamingMetrics::unused()),
1539 1024,
1540 2048,
1541 );
1542 (tx_l, tx_r, executor.boxed().execute())
1543 }
1544
    /// Interval join: an inner join on column 0 whose condition keeps pairs with
    /// `$1 > $3 - 2 AND $3 > $1 - 2` over the concatenated row. Both directions are registered
    /// as inequality pairs. The test checks the derived output watermarks, the joined output,
    /// and that a left row no longer joins after the watermarks advanced.
    #[tokio::test]
    async fn test_interval_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 3
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 6
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 2 3
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
            true,
            false,
            Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
            vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
        )
        .await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Left rows with an empty right side produce nothing.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // A watermark from one side alone yields no output watermark.
        tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
        hash_join.next_unwrap_pending();

        // Once the right side also reports, watermarks for output columns 1 and 3 are emitted.
        tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
        );
        let output_watermark = hash_join.next_unwrap_ready_watermark()?;
        assert_eq!(
            output_watermark,
            Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
        );

        // Only (2, 5) x (2, 6) is within the interval (|5 - 6| < 2).
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 6"
            )
        );

        // (2, 3) x (2, 3) would satisfy the condition, yet nothing is emitted: the left row
        // (2, 3) is presumably cleaned from state by the watermark during the previous probe.
        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        Ok(())
    }
1631
    /// Plain inner join on column 0 without a condition. Rows are emitted only when the
    /// opposite side already holds a row with the same key, and an insert/delete pair that
    /// cancels out leaves no trace.
    #[tokio::test]
    async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Right side is empty: no output.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // (3, 8) is inserted and immediately deleted on the left.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Only key 2 exists on both sides.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // Key 3 now matches the surviving left row (3, 6); the deleted (3, 8) does not appear.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 3 6 3 10"
            )
        );

        Ok(())
    }
1701
    /// Inner join with a null-safe join key: NULL keys on both sides match each other
    /// (`+ . 6 . 10` below), which a regular equality join would not produce.
    #[tokio::test]
    async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + . 10
             + 6 11",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // A NULL-keyed row is inserted and deleted again on the left.
        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + 2 5 2 7"
            )
        );

        // The right NULL key joins with the surviving left NULL-keyed row (., 6).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I
                + . 6 . 10"
            )
        );

        Ok(())
    }
1771
    /// Left semi join: a left row is emitted once when it gains its first right match and
    /// retracted only when its last right match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No right rows yet: nothing qualifies.
        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Right key 2 makes left (2, 5) qualify.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // Right key 3 makes left (3, 6) qualify; right key 6 has no left row yet.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // New left row (6, 10) already has right matches (6, 9) and (6, 11).
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // Removing one of two matches keeps (6, 10) qualified: no output.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // Removing the last match retracts (6, 10).
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1881
    /// Left semi join with a null-safe key: the same scenario as the plain left semi test,
    /// except that key 3 is replaced by NULL. NULL keys match each other (`+ . 6` below).
    #[tokio::test]
    async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + . 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + . 8
             - . 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + . 10
             + 6 11",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        // The right NULL key matches the left NULL-keyed row (., 6).
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + . 6"
            )
        );

        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // One of two matches removed: still qualified, no output.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // Last match removed: retraction.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
1991
    /// Append-only inner join on columns `[0, 1]`. Each right row that finds a left row with
    /// the same `(c0, c1)` produces one joined output row.
    #[tokio::test]
    async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::Inner }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // (2, 5) and (4, 9) have left counterparts; (6, 9) does not.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 2 5 2 2 5 1
                + 4 9 4 4 9 2"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I I I I
                + 1 4 1 1 4 4
                + 3 6 3 3 6 5"
            )
        );

        Ok(())
    }
2064
    /// Append-only left semi join on columns `[0, 1]`. Left rows are emitted once a right row
    /// with the same `(c0, c1)` arrives.
    #[tokio::test]
    async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Output contains the matched LEFT rows only.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 2
                + 4 9 4"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 1
                + 3 6 3"
            )
        );

        Ok(())
    }
2137
    /// Append-only right semi join on columns `[0, 1]`. Right rows are emitted when they find a
    /// left row with the same `(c0, c1)`.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I I
             + 1 4 1
             + 2 5 2
             + 3 6 3",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I I
             + 4 9 4
             + 5 10 5",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I I
             + 2 5 1
             + 4 9 2
             + 6 9 3",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I I
             + 1 4 4
             + 3 6 5",
        );

        let (mut tx_l, mut tx_r, mut hash_join) =
            create_append_only_executor::<{ JoinType::RightSemi }>(false).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_l.push_chunk(chunk_l2);
        hash_join.next_unwrap_pending();

        // Output contains the matched RIGHT rows only; (6, 9, 3) has no left counterpart.
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 2 5 1
                + 4 9 2"
            )
        );

        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I I
                + 1 4 4
                + 3 6 5"
            )
        );

        Ok(())
    }
2210
    /// Right semi join: the mirror of the left semi test. Right rows are emitted on their
    /// first left match and retracted when their last left match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             + 6 10",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             - 6 11",
        );
        let chunk_l4 = StreamChunk::from_pretty(
            " I I
             - 6 9",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // No left rows yet: nothing qualifies.
        tx_r.push_chunk(chunk_r1);
        hash_join.next_unwrap_pending();

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        tx_r.push_chunk(chunk_r2);
        hash_join.next_unwrap_pending();

        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 2 5"
            )
        );

        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 6"
            )
        );

        // New right row (6, 10) already has left matches (6, 9) and (6, 11).
        tx_r.push_chunk(chunk_r3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 6 10"
            )
        );

        // One of two matches removed: no output.
        tx_l.push_chunk(chunk_l3);
        hash_join.next_unwrap_pending();

        // Last match removed: retraction.
        tx_l.push_chunk(chunk_l4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 6 10"
            )
        );

        Ok(())
    }
2320
    /// Left anti join: left rows are emitted while they have no right match. They are
    /// retracted when the first match appears and re-emitted when the last match is deleted.
    #[tokio::test]
    async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
        let chunk_l1 = StreamChunk::from_pretty(
            " I I
             + 1 4
             + 2 5
             + 3 6",
        );
        let chunk_l2 = StreamChunk::from_pretty(
            " I I
             + 3 8
             - 3 8",
        );
        let chunk_r1 = StreamChunk::from_pretty(
            " I I
             + 2 7
             + 4 8
             + 6 9",
        );
        let chunk_r2 = StreamChunk::from_pretty(
            " I I
             + 3 10
             + 6 11
             + 1 2
             + 1 3",
        );
        let chunk_l3 = StreamChunk::from_pretty(
            " I I
             + 9 10",
        );
        let chunk_r3 = StreamChunk::from_pretty(
            " I I
             - 1 2",
        );
        let chunk_r4 = StreamChunk::from_pretty(
            " I I
             - 1 3",
        );
        let (mut tx_l, mut tx_r, mut hash_join) =
            create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;

        // Initial barrier on both sides.
        tx_l.push_barrier(test_epoch(1), false);
        tx_r.push_barrier(test_epoch(1), false);
        hash_join.next_unwrap_ready_barrier()?;

        // Empty right side: every left row is emitted.
        tx_l.push_chunk(chunk_l1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4
                + 2 5
                + 3 6",
            )
        );

        tx_l.push_barrier(test_epoch(2), false);
        tx_r.push_barrier(test_epoch(2), false);
        hash_join.next_unwrap_ready_barrier()?;

        // An insert/delete pair of the same unmatched row: both rows appear in the output
        // chunk but are marked invisible (`D`).
        tx_l.push_chunk(chunk_l2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 3 8 D
                - 3 8 D",
            )
        );

        // Right key 2 retracts left (2, 5).
        tx_r.push_chunk(chunk_r1);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 2 5"
            )
        );

        // Keys 3 and 1 retract their left rows; the second key-1 row adds no extra output.
        tx_r.push_chunk(chunk_r2);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                - 3 6
                - 1 4"
            )
        );

        // Left key 9 has no right match.
        tx_l.push_chunk(chunk_l3);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 9 10"
            )
        );

        // (1, 4) still has one right match left: no output.
        tx_r.push_chunk(chunk_r3);
        hash_join.next_unwrap_pending();

        // The last right match for key 1 is gone: (1, 4) is emitted again.
        tx_r.push_chunk(chunk_r4);
        let chunk = hash_join.next_unwrap_ready_chunk()?;
        assert_eq!(
            chunk,
            StreamChunk::from_pretty(
                " I I
                + 1 4"
            )
        );

        Ok(())
    }
2450
2451 #[tokio::test]
2452 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2453 let chunk_r1 = StreamChunk::from_pretty(
2454 " I I
2455 + 1 4
2456 + 2 5
2457 + 3 6",
2458 );
2459 let chunk_r2 = StreamChunk::from_pretty(
2460 " I I
2461 + 3 8
2462 - 3 8",
2463 );
2464 let chunk_l1 = StreamChunk::from_pretty(
2465 " I I
2466 + 2 7
2467 + 4 8
2468 + 6 9",
2469 );
2470 let chunk_l2 = StreamChunk::from_pretty(
2471 " I I
2472 + 3 10
2473 + 6 11
2474 + 1 2
2475 + 1 3",
2476 );
2477 let chunk_r3 = StreamChunk::from_pretty(
2478 " I I
2479 + 9 10",
2480 );
2481 let chunk_l3 = StreamChunk::from_pretty(
2482 " I I
2483 - 1 2",
2484 );
2485 let chunk_l4 = StreamChunk::from_pretty(
2486 " I I
2487 - 1 3",
2488 );
2489 let (mut tx_r, mut tx_l, mut hash_join) =
2490 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2491
2492 tx_r.push_barrier(test_epoch(1), false);
2494 tx_l.push_barrier(test_epoch(1), false);
2495 hash_join.next_unwrap_ready_barrier()?;
2496
2497 tx_r.push_chunk(chunk_r1);
2499 let chunk = hash_join.next_unwrap_ready_chunk()?;
2500 assert_eq!(
2501 chunk,
2502 StreamChunk::from_pretty(
2503 " I I
2504 + 1 4
2505 + 2 5
2506 + 3 6",
2507 )
2508 );
2509
2510 tx_r.push_barrier(test_epoch(2), false);
2512 tx_l.push_barrier(test_epoch(2), false);
2513 hash_join.next_unwrap_ready_barrier()?;
2514
2515 tx_r.push_chunk(chunk_r2);
2517 let chunk = hash_join.next_unwrap_ready_chunk()?;
2518 assert_eq!(
2519 chunk,
2520 StreamChunk::from_pretty(
2521 " I I
2522 + 3 8 D
2523 - 3 8 D",
2524 )
2525 );
2526
2527 tx_l.push_chunk(chunk_l1);
2529 let chunk = hash_join.next_unwrap_ready_chunk()?;
2530 assert_eq!(
2531 chunk,
2532 StreamChunk::from_pretty(
2533 " I I
2534 - 2 5"
2535 )
2536 );
2537
2538 tx_l.push_chunk(chunk_l2);
2540 let chunk = hash_join.next_unwrap_ready_chunk()?;
2541 assert_eq!(
2542 chunk,
2543 StreamChunk::from_pretty(
2544 " I I
2545 - 3 6
2546 - 1 4"
2547 )
2548 );
2549
2550 tx_r.push_chunk(chunk_r3);
2552 let chunk = hash_join.next_unwrap_ready_chunk()?;
2553 assert_eq!(
2554 chunk,
2555 StreamChunk::from_pretty(
2556 " I I
2557 + 9 10"
2558 )
2559 );
2560
2561 tx_l.push_chunk(chunk_l3);
2564 hash_join.next_unwrap_pending();
2565
2566 tx_l.push_chunk(chunk_l4);
2569 let chunk = hash_join.next_unwrap_ready_chunk()?;
2570 assert_eq!(
2571 chunk,
2572 StreamChunk::from_pretty(
2573 " I I
2574 + 1 4"
2575 )
2576 );
2577
2578 Ok(())
2579 }
2580
2581 #[tokio::test]
2582 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2583 let chunk_l1 = StreamChunk::from_pretty(
2584 " I I
2585 + 1 4
2586 + 2 5
2587 + 3 6",
2588 );
2589 let chunk_l2 = StreamChunk::from_pretty(
2590 " I I
2591 + 6 8
2592 + 3 8",
2593 );
2594 let chunk_r1 = StreamChunk::from_pretty(
2595 " I I
2596 + 2 7
2597 + 4 8
2598 + 6 9",
2599 );
2600 let chunk_r2 = StreamChunk::from_pretty(
2601 " I I
2602 + 3 10
2603 + 6 11",
2604 );
2605 let (mut tx_l, mut tx_r, mut hash_join) =
2606 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2607
2608 tx_l.push_barrier(test_epoch(1), false);
2610 tx_r.push_barrier(test_epoch(1), false);
2611 hash_join.next_unwrap_ready_barrier()?;
2612
2613 tx_l.push_chunk(chunk_l1);
2615 hash_join.next_unwrap_pending();
2616
2617 tx_l.push_barrier(test_epoch(2), false);
2619
2620 tx_l.push_chunk(chunk_l2);
2622
2623 tx_r.push_chunk(chunk_r1);
2625
2626 let chunk = hash_join.next_unwrap_ready_chunk()?;
2628 assert_eq!(
2629 chunk,
2630 StreamChunk::from_pretty(
2631 " I I I I
2632 + 2 5 2 7"
2633 )
2634 );
2635
2636 tx_r.push_barrier(test_epoch(2), false);
2638
2639 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2641 assert!(matches!(
2642 hash_join.next_unwrap_ready_barrier()?,
2643 Barrier {
2644 epoch,
2645 mutation: None,
2646 ..
2647 } if epoch == expected_epoch
2648 ));
2649
2650 let chunk = hash_join.next_unwrap_ready_chunk()?;
2652 assert_eq!(
2653 chunk,
2654 StreamChunk::from_pretty(
2655 " I I I I
2656 + 6 8 6 9"
2657 )
2658 );
2659
2660 tx_r.push_chunk(chunk_r2);
2662 let chunk = hash_join.next_unwrap_ready_chunk()?;
2663 assert_eq!(
2664 chunk,
2665 StreamChunk::from_pretty(
2666 " I I I I
2667 + 3 6 3 10
2668 + 3 8 3 10
2669 + 6 8 6 11"
2670 )
2671 );
2672
2673 Ok(())
2674 }
2675
2676 #[tokio::test]
2677 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2678 let chunk_l1 = StreamChunk::from_pretty(
2679 " I I
2680 + 1 4
2681 + 2 .
2682 + 3 .",
2683 );
2684 let chunk_l2 = StreamChunk::from_pretty(
2685 " I I
2686 + 6 .
2687 + 3 8",
2688 );
2689 let chunk_r1 = StreamChunk::from_pretty(
2690 " I I
2691 + 2 7
2692 + 4 8
2693 + 6 9",
2694 );
2695 let chunk_r2 = StreamChunk::from_pretty(
2696 " I I
2697 + 3 10
2698 + 6 11",
2699 );
2700 let (mut tx_l, mut tx_r, mut hash_join) =
2701 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2702
2703 tx_l.push_barrier(test_epoch(1), false);
2705 tx_r.push_barrier(test_epoch(1), false);
2706 hash_join.next_unwrap_ready_barrier()?;
2707
2708 tx_l.push_chunk(chunk_l1);
2710 hash_join.next_unwrap_pending();
2711
2712 tx_l.push_barrier(test_epoch(2), false);
2714
2715 tx_l.push_chunk(chunk_l2);
2717
2718 tx_r.push_chunk(chunk_r1);
2720
2721 let chunk = hash_join.next_unwrap_ready_chunk()?;
2723 assert_eq!(
2724 chunk,
2725 StreamChunk::from_pretty(
2726 " I I I I
2727 + 2 . 2 7"
2728 )
2729 );
2730
2731 tx_r.push_barrier(test_epoch(2), false);
2733
2734 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2736 assert!(matches!(
2737 hash_join.next_unwrap_ready_barrier()?,
2738 Barrier {
2739 epoch,
2740 mutation: None,
2741 ..
2742 } if epoch == expected_epoch
2743 ));
2744
2745 let chunk = hash_join.next_unwrap_ready_chunk()?;
2747 assert_eq!(
2748 chunk,
2749 StreamChunk::from_pretty(
2750 " I I I I
2751 + 6 . 6 9"
2752 )
2753 );
2754
2755 tx_r.push_chunk(chunk_r2);
2757 let chunk = hash_join.next_unwrap_ready_chunk()?;
2758 assert_eq!(
2759 chunk,
2760 StreamChunk::from_pretty(
2761 " I I I I
2762 + 3 8 3 10
2763 + 3 . 3 10
2764 + 6 . 6 11"
2765 )
2766 );
2767
2768 Ok(())
2769 }
2770
2771 #[tokio::test]
2772 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2773 let chunk_l1 = StreamChunk::from_pretty(
2774 " I I
2775 + 1 4
2776 + 2 5
2777 + 3 6",
2778 );
2779 let chunk_l2 = StreamChunk::from_pretty(
2780 " I I
2781 + 3 8
2782 - 3 8",
2783 );
2784 let chunk_r1 = StreamChunk::from_pretty(
2785 " I I
2786 + 2 7
2787 + 4 8
2788 + 6 9",
2789 );
2790 let chunk_r2 = StreamChunk::from_pretty(
2791 " I I
2792 + 3 10
2793 + 6 11",
2794 );
2795 let (mut tx_l, mut tx_r, mut hash_join) =
2796 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2797
2798 tx_l.push_barrier(test_epoch(1), false);
2800 tx_r.push_barrier(test_epoch(1), false);
2801 hash_join.next_unwrap_ready_barrier()?;
2802
2803 tx_l.push_chunk(chunk_l1);
2805 let chunk = hash_join.next_unwrap_ready_chunk()?;
2806 assert_eq!(
2807 chunk,
2808 StreamChunk::from_pretty(
2809 " I I I I
2810 + 1 4 . .
2811 + 2 5 . .
2812 + 3 6 . ."
2813 )
2814 );
2815
2816 tx_l.push_chunk(chunk_l2);
2818 let chunk = hash_join.next_unwrap_ready_chunk()?;
2819 assert_eq!(
2820 chunk,
2821 StreamChunk::from_pretty(
2822 " I I I I
2823 + 3 8 . . D
2824 - 3 8 . . D"
2825 )
2826 );
2827
2828 tx_r.push_chunk(chunk_r1);
2830 let chunk = hash_join.next_unwrap_ready_chunk()?;
2831 assert_eq!(
2832 chunk,
2833 StreamChunk::from_pretty(
2834 " I I I I
2835 - 2 5 . .
2836 + 2 5 2 7"
2837 )
2838 );
2839
2840 tx_r.push_chunk(chunk_r2);
2842 let chunk = hash_join.next_unwrap_ready_chunk()?;
2843 assert_eq!(
2844 chunk,
2845 StreamChunk::from_pretty(
2846 " I I I I
2847 - 3 6 . .
2848 + 3 6 3 10"
2849 )
2850 );
2851
2852 Ok(())
2853 }
2854
2855 #[tokio::test]
2856 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2857 let chunk_l1 = StreamChunk::from_pretty(
2858 " I I
2859 + 1 4
2860 + 2 5
2861 + . 6",
2862 );
2863 let chunk_l2 = StreamChunk::from_pretty(
2864 " I I
2865 + . 8
2866 - . 8",
2867 );
2868 let chunk_r1 = StreamChunk::from_pretty(
2869 " I I
2870 + 2 7
2871 + 4 8
2872 + 6 9",
2873 );
2874 let chunk_r2 = StreamChunk::from_pretty(
2875 " I I
2876 + . 10
2877 + 6 11",
2878 );
2879 let (mut tx_l, mut tx_r, mut hash_join) =
2880 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2881
2882 tx_l.push_barrier(test_epoch(1), false);
2884 tx_r.push_barrier(test_epoch(1), false);
2885 hash_join.next_unwrap_ready_barrier()?;
2886
2887 tx_l.push_chunk(chunk_l1);
2889 let chunk = hash_join.next_unwrap_ready_chunk()?;
2890 assert_eq!(
2891 chunk,
2892 StreamChunk::from_pretty(
2893 " I I I I
2894 + 1 4 . .
2895 + 2 5 . .
2896 + . 6 . ."
2897 )
2898 );
2899
2900 tx_l.push_chunk(chunk_l2);
2902 let chunk = hash_join.next_unwrap_ready_chunk()?;
2903 assert_eq!(
2904 chunk,
2905 StreamChunk::from_pretty(
2906 " I I I I
2907 + . 8 . . D
2908 - . 8 . . D"
2909 )
2910 );
2911
2912 tx_r.push_chunk(chunk_r1);
2914 let chunk = hash_join.next_unwrap_ready_chunk()?;
2915 assert_eq!(
2916 chunk,
2917 StreamChunk::from_pretty(
2918 " I I I I
2919 - 2 5 . .
2920 + 2 5 2 7"
2921 )
2922 );
2923
2924 tx_r.push_chunk(chunk_r2);
2926 let chunk = hash_join.next_unwrap_ready_chunk()?;
2927 assert_eq!(
2928 chunk,
2929 StreamChunk::from_pretty(
2930 " I I I I
2931 - . 6 . .
2932 + . 6 . 10"
2933 )
2934 );
2935
2936 Ok(())
2937 }
2938
2939 #[tokio::test]
2940 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2941 let chunk_l1 = StreamChunk::from_pretty(
2942 " I I
2943 + 1 4
2944 + 2 5
2945 + 3 6",
2946 );
2947 let chunk_l2 = StreamChunk::from_pretty(
2948 " I I
2949 + 3 8
2950 - 3 8",
2951 );
2952 let chunk_r1 = StreamChunk::from_pretty(
2953 " I I
2954 + 2 7
2955 + 4 8
2956 + 6 9",
2957 );
2958 let chunk_r2 = StreamChunk::from_pretty(
2959 " I I
2960 + 5 10
2961 - 5 10",
2962 );
2963 let (mut tx_l, mut tx_r, mut hash_join) =
2964 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2965
2966 tx_l.push_barrier(test_epoch(1), false);
2968 tx_r.push_barrier(test_epoch(1), false);
2969 hash_join.next_unwrap_ready_barrier()?;
2970
2971 tx_l.push_chunk(chunk_l1);
2973 hash_join.next_unwrap_pending();
2974
2975 tx_l.push_chunk(chunk_l2);
2977 hash_join.next_unwrap_pending();
2978
2979 tx_r.push_chunk(chunk_r1);
2981 let chunk = hash_join.next_unwrap_ready_chunk()?;
2982 assert_eq!(
2983 chunk,
2984 StreamChunk::from_pretty(
2985 " I I I I
2986 + 2 5 2 7
2987 + . . 4 8
2988 + . . 6 9"
2989 )
2990 );
2991
2992 tx_r.push_chunk(chunk_r2);
2994 let chunk = hash_join.next_unwrap_ready_chunk()?;
2995 assert_eq!(
2996 chunk,
2997 StreamChunk::from_pretty(
2998 " I I I I
2999 + . . 5 10 D
3000 - . . 5 10 D"
3001 )
3002 );
3003
3004 Ok(())
3005 }
3006
3007 #[tokio::test]
3008 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3009 let chunk_l1 = StreamChunk::from_pretty(
3010 " I I I
3011 + 1 4 1
3012 + 2 5 2
3013 + 3 6 3",
3014 );
3015 let chunk_l2 = StreamChunk::from_pretty(
3016 " I I I
3017 + 4 9 4
3018 + 5 10 5",
3019 );
3020 let chunk_r1 = StreamChunk::from_pretty(
3021 " I I I
3022 + 2 5 1
3023 + 4 9 2
3024 + 6 9 3",
3025 );
3026 let chunk_r2 = StreamChunk::from_pretty(
3027 " I I I
3028 + 1 4 4
3029 + 3 6 5",
3030 );
3031
3032 let (mut tx_l, mut tx_r, mut hash_join) =
3033 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3034
3035 tx_l.push_barrier(test_epoch(1), false);
3037 tx_r.push_barrier(test_epoch(1), false);
3038 hash_join.next_unwrap_ready_barrier()?;
3039
3040 tx_l.push_chunk(chunk_l1);
3042 let chunk = hash_join.next_unwrap_ready_chunk()?;
3043 assert_eq!(
3044 chunk,
3045 StreamChunk::from_pretty(
3046 " I I I I I I
3047 + 1 4 1 . . .
3048 + 2 5 2 . . .
3049 + 3 6 3 . . ."
3050 )
3051 );
3052
3053 tx_l.push_chunk(chunk_l2);
3055 let chunk = hash_join.next_unwrap_ready_chunk()?;
3056 assert_eq!(
3057 chunk,
3058 StreamChunk::from_pretty(
3059 " I I I I I I
3060 + 4 9 4 . . .
3061 + 5 10 5 . . ."
3062 )
3063 );
3064
3065 tx_r.push_chunk(chunk_r1);
3067 let chunk = hash_join.next_unwrap_ready_chunk()?;
3068 assert_eq!(
3069 chunk,
3070 StreamChunk::from_pretty(
3071 " I I I I I I
3072 - 2 5 2 . . .
3073 + 2 5 2 2 5 1
3074 - 4 9 4 . . .
3075 + 4 9 4 4 9 2"
3076 )
3077 );
3078
3079 tx_r.push_chunk(chunk_r2);
3081 let chunk = hash_join.next_unwrap_ready_chunk()?;
3082 assert_eq!(
3083 chunk,
3084 StreamChunk::from_pretty(
3085 " I I I I I I
3086 - 1 4 1 . . .
3087 + 1 4 1 1 4 4
3088 - 3 6 3 . . .
3089 + 3 6 3 3 6 5"
3090 )
3091 );
3092
3093 Ok(())
3094 }
3095
3096 #[tokio::test]
3097 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3098 let chunk_l1 = StreamChunk::from_pretty(
3099 " I I I
3100 + 1 4 1
3101 + 2 5 2
3102 + 3 6 3",
3103 );
3104 let chunk_l2 = StreamChunk::from_pretty(
3105 " I I I
3106 + 4 9 4
3107 + 5 10 5",
3108 );
3109 let chunk_r1 = StreamChunk::from_pretty(
3110 " I I I
3111 + 2 5 1
3112 + 4 9 2
3113 + 6 9 3",
3114 );
3115 let chunk_r2 = StreamChunk::from_pretty(
3116 " I I I
3117 + 1 4 4
3118 + 3 6 5
3119 + 7 7 6",
3120 );
3121
3122 let (mut tx_l, mut tx_r, mut hash_join) =
3123 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3124
3125 tx_l.push_barrier(test_epoch(1), false);
3127 tx_r.push_barrier(test_epoch(1), false);
3128 hash_join.next_unwrap_ready_barrier()?;
3129
3130 tx_l.push_chunk(chunk_l1);
3132 hash_join.next_unwrap_pending();
3133
3134 tx_l.push_chunk(chunk_l2);
3136 hash_join.next_unwrap_pending();
3137
3138 tx_r.push_chunk(chunk_r1);
3140 let chunk = hash_join.next_unwrap_ready_chunk()?;
3141 assert_eq!(
3142 chunk,
3143 StreamChunk::from_pretty(
3144 " I I I I I I
3145 + 2 5 2 2 5 1
3146 + 4 9 4 4 9 2
3147 + . . . 6 9 3"
3148 )
3149 );
3150
3151 tx_r.push_chunk(chunk_r2);
3153 let chunk = hash_join.next_unwrap_ready_chunk()?;
3154 assert_eq!(
3155 chunk,
3156 StreamChunk::from_pretty(
3157 " I I I I I I
3158 + 1 4 1 1 4 4
3159 + 3 6 3 3 6 5
3160 + . . . 7 7 6"
3161 )
3162 );
3163
3164 Ok(())
3165 }
3166
3167 #[tokio::test]
3168 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3169 let chunk_l1 = StreamChunk::from_pretty(
3170 " I I
3171 + 1 4
3172 + 2 5
3173 + 3 6",
3174 );
3175 let chunk_l2 = StreamChunk::from_pretty(
3176 " I I
3177 + 3 8
3178 - 3 8",
3179 );
3180 let chunk_r1 = StreamChunk::from_pretty(
3181 " I I
3182 + 2 7
3183 + 4 8
3184 + 6 9",
3185 );
3186 let chunk_r2 = StreamChunk::from_pretty(
3187 " I I
3188 + 5 10
3189 - 5 10",
3190 );
3191 let (mut tx_l, mut tx_r, mut hash_join) =
3192 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3193
3194 tx_l.push_barrier(test_epoch(1), false);
3196 tx_r.push_barrier(test_epoch(1), false);
3197 hash_join.next_unwrap_ready_barrier()?;
3198
3199 tx_l.push_chunk(chunk_l1);
3201 let chunk = hash_join.next_unwrap_ready_chunk()?;
3202 assert_eq!(
3203 chunk,
3204 StreamChunk::from_pretty(
3205 " I I I I
3206 + 1 4 . .
3207 + 2 5 . .
3208 + 3 6 . ."
3209 )
3210 );
3211
3212 tx_l.push_chunk(chunk_l2);
3214 let chunk = hash_join.next_unwrap_ready_chunk()?;
3215 assert_eq!(
3216 chunk,
3217 StreamChunk::from_pretty(
3218 " I I I I
3219 + 3 8 . . D
3220 - 3 8 . . D"
3221 )
3222 );
3223
3224 tx_r.push_chunk(chunk_r1);
3226 let chunk = hash_join.next_unwrap_ready_chunk()?;
3227 assert_eq!(
3228 chunk,
3229 StreamChunk::from_pretty(
3230 " I I I I
3231 - 2 5 . .
3232 + 2 5 2 7
3233 + . . 4 8
3234 + . . 6 9"
3235 )
3236 );
3237
3238 tx_r.push_chunk(chunk_r2);
3240 let chunk = hash_join.next_unwrap_ready_chunk()?;
3241 assert_eq!(
3242 chunk,
3243 StreamChunk::from_pretty(
3244 " I I I I
3245 + . . 5 10 D
3246 - . . 5 10 D"
3247 )
3248 );
3249
3250 Ok(())
3251 }
3252
3253 #[tokio::test]
3254 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3255 let (mut tx_l, mut tx_r, mut hash_join) =
3256 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3257
3258 tx_l.push_barrier(test_epoch(1), false);
3260 tx_r.push_barrier(test_epoch(1), false);
3261 hash_join.next_unwrap_ready_barrier()?;
3262
3263 tx_l.push_chunk(StreamChunk::from_pretty(
3264 " I I
3265 + 1 1
3266 ",
3267 ));
3268 let chunk = hash_join.next_unwrap_ready_chunk()?;
3269 assert_eq!(
3270 chunk,
3271 StreamChunk::from_pretty(
3272 " I I I I
3273 + 1 1 . ."
3274 )
3275 );
3276
3277 tx_r.push_chunk(StreamChunk::from_pretty(
3278 " I I
3279 + 1 1
3280 ",
3281 ));
3282 let chunk = hash_join.next_unwrap_ready_chunk()?;
3283
3284 assert_eq!(
3285 chunk,
3286 StreamChunk::from_pretty(
3287 " I I I I
3288 - 1 1 . .
3289 + 1 1 1 1"
3290 )
3291 );
3292
3293 tx_l.push_chunk(StreamChunk::from_pretty(
3294 " I I
3295 - 1 1
3296 + 1 2
3297 ",
3298 ));
3299 let chunk = hash_join.next_unwrap_ready_chunk()?;
3300 let chunk = chunk.compact_vis();
3301 assert_eq!(
3302 chunk,
3303 StreamChunk::from_pretty(
3304 " I I I I
3305 - 1 1 1 1
3306 + 1 2 1 1
3307 "
3308 )
3309 );
3310
3311 Ok(())
3312 }
3313
3314 #[tokio::test]
3315 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3316 {
3317 let chunk_l1 = StreamChunk::from_pretty(
3318 " I I
3319 + 1 4
3320 + 2 5
3321 + 3 6
3322 + 3 7",
3323 );
3324 let chunk_l2 = StreamChunk::from_pretty(
3325 " I I
3326 + 3 8
3327 - 3 8
3328 - 1 4", );
3330 let chunk_r1 = StreamChunk::from_pretty(
3331 " I I
3332 + 2 6
3333 + 4 8
3334 + 3 4",
3335 );
3336 let chunk_r2 = StreamChunk::from_pretty(
3337 " I I
3338 + 5 10
3339 - 5 10
3340 + 1 2",
3341 );
3342 let (mut tx_l, mut tx_r, mut hash_join) =
3343 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3344
3345 tx_l.push_barrier(test_epoch(1), false);
3347 tx_r.push_barrier(test_epoch(1), false);
3348 hash_join.next_unwrap_ready_barrier()?;
3349
3350 tx_l.push_chunk(chunk_l1);
3352 let chunk = hash_join.next_unwrap_ready_chunk()?;
3353 assert_eq!(
3354 chunk,
3355 StreamChunk::from_pretty(
3356 " I I I I
3357 + 1 4 . .
3358 + 2 5 . .
3359 + 3 6 . .
3360 + 3 7 . ."
3361 )
3362 );
3363
3364 tx_l.push_chunk(chunk_l2);
3366 let chunk = hash_join.next_unwrap_ready_chunk()?;
3367 assert_eq!(
3368 chunk,
3369 StreamChunk::from_pretty(
3370 " I I I I
3371 + 3 8 . . D
3372 - 3 8 . . D
3373 - 1 4 . ."
3374 )
3375 );
3376
3377 tx_r.push_chunk(chunk_r1);
3379 let chunk = hash_join.next_unwrap_ready_chunk()?;
3380 assert_eq!(
3381 chunk,
3382 StreamChunk::from_pretty(
3383 " I I I I
3384 - 2 5 . .
3385 + 2 5 2 6
3386 + . . 4 8
3387 + . . 3 4" )
3391 );
3392
3393 tx_r.push_chunk(chunk_r2);
3395 let chunk = hash_join.next_unwrap_ready_chunk()?;
3396 assert_eq!(
3397 chunk,
3398 StreamChunk::from_pretty(
3399 " I I I I
3400 + . . 5 10 D
3401 - . . 5 10 D
3402 + . . 1 2" )
3405 );
3406
3407 Ok(())
3408 }
3409
3410 #[tokio::test]
3411 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3412 let chunk_l1 = StreamChunk::from_pretty(
3413 " I I
3414 + 1 4
3415 + 2 10
3416 + 3 6",
3417 );
3418 let chunk_l2 = StreamChunk::from_pretty(
3419 " I I
3420 + 3 8
3421 - 3 8",
3422 );
3423 let chunk_r1 = StreamChunk::from_pretty(
3424 " I I
3425 + 2 7
3426 + 4 8
3427 + 6 9",
3428 );
3429 let chunk_r2 = StreamChunk::from_pretty(
3430 " I I
3431 + 3 10
3432 + 6 11",
3433 );
3434 let (mut tx_l, mut tx_r, mut hash_join) =
3435 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3436
3437 tx_l.push_barrier(test_epoch(1), false);
3439 tx_r.push_barrier(test_epoch(1), false);
3440 hash_join.next_unwrap_ready_barrier()?;
3441
3442 tx_l.push_chunk(chunk_l1);
3444 hash_join.next_unwrap_pending();
3445
3446 tx_l.push_chunk(chunk_l2);
3448 hash_join.next_unwrap_pending();
3449
3450 tx_r.push_chunk(chunk_r1);
3452 hash_join.next_unwrap_pending();
3453
3454 tx_r.push_chunk(chunk_r2);
3456 let chunk = hash_join.next_unwrap_ready_chunk()?;
3457 assert_eq!(
3458 chunk,
3459 StreamChunk::from_pretty(
3460 " I I I I
3461 + 3 6 3 10"
3462 )
3463 );
3464
3465 Ok(())
3466 }
3467
3468 #[tokio::test]
3469 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3470 let (mut tx_l, mut tx_r, mut hash_join) =
3471 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3472
3473 tx_l.push_barrier(test_epoch(1), false);
3475 tx_r.push_barrier(test_epoch(1), false);
3476 hash_join.next_unwrap_ready_barrier()?;
3477
3478 tx_l.push_int64_watermark(0, 100);
3479
3480 tx_l.push_int64_watermark(0, 200);
3481
3482 tx_l.push_barrier(test_epoch(2), false);
3483 tx_r.push_barrier(test_epoch(2), false);
3484 hash_join.next_unwrap_ready_barrier()?;
3485
3486 tx_r.push_int64_watermark(0, 50);
3487
3488 let w1 = hash_join.next().await.unwrap().unwrap();
3489 let w1 = w1.as_watermark().unwrap();
3490
3491 let w2 = hash_join.next().await.unwrap().unwrap();
3492 let w2 = w2.as_watermark().unwrap();
3493
3494 tx_r.push_int64_watermark(0, 100);
3495
3496 let w3 = hash_join.next().await.unwrap().unwrap();
3497 let w3 = w3.as_watermark().unwrap();
3498
3499 let w4 = hash_join.next().await.unwrap().unwrap();
3500 let w4 = w4.as_watermark().unwrap();
3501
3502 assert_eq!(
3503 w1,
3504 &Watermark {
3505 col_idx: 2,
3506 data_type: DataType::Int64,
3507 val: ScalarImpl::Int64(50)
3508 }
3509 );
3510
3511 assert_eq!(
3512 w2,
3513 &Watermark {
3514 col_idx: 0,
3515 data_type: DataType::Int64,
3516 val: ScalarImpl::Int64(50)
3517 }
3518 );
3519
3520 assert_eq!(
3521 w3,
3522 &Watermark {
3523 col_idx: 2,
3524 data_type: DataType::Int64,
3525 val: ScalarImpl::Int64(100)
3526 }
3527 );
3528
3529 assert_eq!(
3530 w4,
3531 &Watermark {
3532 col_idx: 0,
3533 data_type: DataType::Int64,
3534 val: ScalarImpl::Int64(100)
3535 }
3536 );
3537
3538 Ok(())
3539 }
3540}