1use std::assert_matches::assert_matches;
16use std::collections::{BTreeMap, HashSet};
17use std::marker::PhantomData;
18use std::time::Duration;
19
20use anyhow::Context;
21use either::Either;
22use itertools::Itertools;
23use multimap::MultiMap;
24use risingwave_common::array::Op;
25use risingwave_common::hash::{HashKey, NullBitmap};
26use risingwave_common::row::RowExt;
27use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
28use risingwave_common::util::epoch::EpochPair;
29use risingwave_common::util::iter_util::ZipEqDebug;
30use risingwave_expr::ExprError;
31use risingwave_expr::expr::NonStrictExpression;
32use tokio::time::Instant;
33
34use self::builder::JoinChunkBuilder;
35use super::barrier_align::*;
36use super::join::hash_join::*;
37use super::join::row::{JoinEncoding, JoinRow};
38use super::join::*;
39use super::watermark::*;
40use crate::executor::CachedJoinRow;
41use crate::executor::join::builder::JoinStreamChunkBuilder;
42use crate::executor::join::hash_join::CacheResult;
43use crate::executor::prelude::*;
44
/// Number of received rows between two cache evictions: `evict_cache` increments a counter for
/// every processed row and calls `evict()` on both join hash tables once the counter reaches
/// this value, then resets the counter to zero.
const EVICT_EVERY_N_ROWS: u32 = 16;
47
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Both inputs are treated as sets: duplicates and ordering are irrelevant, and an empty
/// `vec1` is a subset of anything (including an empty `vec2`).
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let superset: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| superset.contains(&idx))
}
51
/// Per-side column parameters handed to the hash join constructors.
pub struct JoinParams {
    /// Indices of the join key columns within this side's input.
    pub join_key_indices: Vec<usize>,
    /// Primary key column indices of this side's input after deduplication.
    pub deduped_pk_indices: Vec<usize>,
}

impl JoinParams {
    /// Bundles the join key indices and the deduplicated pk indices of one join side.
    pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
        JoinParams {
            join_key_indices,
            deduped_pk_indices,
        }
    }
}
67
/// State and column metadata of one input side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Join hash map of this side, built over this side's state table and, when needed, its
    /// degree state table.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns within this side's input.
    join_key_indices: Vec<usize>,
    /// Data types of all columns of this side's input schema.
    all_data_types: Vec<DataType>,
    /// Offset of this side's first column in the concatenated (left ++ right) row:
    /// `0` for the left side, the left input's column count for the right side.
    start_pos: usize,
    /// Pairs returned by `JoinStreamChunkBuilder::get_i2o_mapping`; the first element is used
    /// as an input column index and the second as an output column index.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same pairs as `i2o_mapping`, indexed by input column for lookup of all output
    /// columns an input column maps to.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column, the inequalities it takes part in as
    /// `(inequality index, need_offset)`. When `need_offset` is set, watermark handling applies
    /// the inequality's delta expression (if any) to the watermark value first.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Input columns that take part in at least one inequality; an incoming row is only probed
    /// against the other side when all of these columns are non-null. Emptied when the
    /// append-only optimization is enabled.
    non_null_fields: Vec<usize>,
    /// `(input column index, inequality index)` pairs for inequalities flagged with
    /// `clean_state`. Emptied when the append-only optimization is enabled.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree state table was created for this side.
    need_degree_table: bool,
    /// Ties the encoding type parameter `E` to the struct without storing a value of it.
    _marker: std::marker::PhantomData<E>,
}
97
98impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("JoinSide")
101 .field("join_key_indices", &self.join_key_indices)
102 .field("col_types", &self.all_data_types)
103 .field("start_pos", &self.start_pos)
104 .field("i2o_mapping", &self.i2o_mapping)
105 .field("need_degree_table", &self.need_degree_table)
106 .finish()
107 }
108}
109
110impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
111 fn is_dirty(&self) -> bool {
113 unimplemented!()
114 }
115
116 #[expect(dead_code)]
117 fn clear_cache(&mut self) {
118 assert!(
119 !self.is_dirty(),
120 "cannot clear cache while states of hash join are dirty"
121 );
122
123 }
126
127 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
128 self.ht.init(epoch).await
129 }
130}
131
/// Streaming hash join executor, parameterized by the hash key type `K`, the state store `S`,
/// the join type `T` and the row encoding `E` used by the join hash maps.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    /// Actor context; provides ids, config and error reporting.
    ctx: ActorContextRef,
    /// Executor info (schema, stream key, identity).
    info: ExecutorInfo,

    /// Left input executor; taken (set to `None`) when the stream is started.
    input_l: Option<Executor>,
    /// Right input executor; taken (set to `None`) when the stream is started.
    input_r: Option<Executor>,
    /// Data types of the output columns, i.e. the joined schema projected by `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State of the left side.
    side_l: JoinSide<K, S, E>,
    /// State of the right side.
    side_r: JoinSide<K, S, E>,
    /// Optional extra join condition evaluated on each pair of update row and matched row.
    cond: Option<NonStrictExpression>,
    /// One entry per inequality: the output column indices that receive its watermark and an
    /// optional delta expression applied to incoming watermark values.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Most recently selected watermark of each inequality, `None` until one is produced (and
    /// reset to `None` after a barrier when the left post-commit step reports `true`).
    inequality_watermarks: Vec<Option<Watermark>>,

    /// `true` when the join is append-only and the stream key of both inputs is contained in
    /// the respective join key.
    append_only_optimize: bool,

    /// Streaming metrics registry.
    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Rows received since the last cache eviction; see `EVICT_EVERY_N_ROWS`.
    cnt_rows_received: u32,

    /// Buffered watermarks per key: join key positions use their position as the key,
    /// inequalities use `join key count + inequality index`.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when a single input row matches more rows than this.
    high_join_amplification_threshold: usize,

    /// Upper bound used when deciding whether rows fetched on a cache miss are kept in the
    /// in-memory entry state.
    entry_state_max_rows: usize,
}
176
177impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
178 for HashJoinExecutor<K, S, T, E>
179{
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct("HashJoinExecutor")
182 .field("join_type", &T)
183 .field("input_left", &self.input_l.as_ref().unwrap().identity())
184 .field("input_right", &self.input_r.as_ref().unwrap().identity())
185 .field("side_l", &self.side_l)
186 .field("side_r", &self.side_r)
187 .field("stream_key", &self.info.stream_key)
188 .field("schema", &self.info.schema)
189 .field("actual_output_data_types", &self.actual_output_data_types)
190 .finish()
191 }
192}
193
194impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
195 for HashJoinExecutor<K, S, T, E>
196{
197 fn execute(self: Box<Self>) -> BoxedMessageStream {
198 self.into_stream().boxed()
199 }
200}
201
/// Borrowed executor state plus the input chunk needed to join one chunk coming from one side.
/// Built from the executor's fields for every data chunk and consumed by `eq_join_oneside`.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Actor context (ids and metrics).
    ctx: &'a ActorContextRef,
    /// Left side state.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right side state.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional extra join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest watermark per inequality, indexed by inequality index.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size for the output chunk builder.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Match count above which a high-amplification warning is logged.
    high_join_amplification_threshold: usize,
    /// Bound on the number of fetched rows kept in the entry state on a cache miss.
    entry_state_max_rows: usize,
}
216
217impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
218 HashJoinExecutor<K, S, T, E>
219{
/// Creates a hash join executor whose `entry_state_max_rows` comes from the actor config.
///
/// Thin wrapper around `new_with_cache_size`: every argument is forwarded unchanged, in the
/// same order, and `None` is passed for `entry_state_max_rows`, which makes that constructor
/// fall back to `ctx.config.developer.hash_join_entry_state_max_rows`.
#[allow(clippy::too_many_arguments)]
pub fn new(
    ctx: ActorContextRef,
    info: ExecutorInfo,
    input_l: Executor,
    input_r: Executor,
    params_l: JoinParams,
    params_r: JoinParams,
    null_safe: Vec<bool>,
    output_indices: Vec<usize>,
    cond: Option<NonStrictExpression>,
    inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    state_table_l: StateTable<S>,
    degree_state_table_l: StateTable<S>,
    state_table_r: StateTable<S>,
    degree_state_table_r: StateTable<S>,
    watermark_epoch: AtomicU64Ref,
    is_append_only: bool,
    metrics: Arc<StreamingMetrics>,
    chunk_size: usize,
    high_join_amplification_threshold: usize,
) -> Self {
    Self::new_with_cache_size(
        ctx,
        info,
        input_l,
        input_r,
        params_l,
        params_r,
        null_safe,
        output_indices,
        cond,
        inequality_pairs,
        state_table_l,
        degree_state_table_l,
        state_table_r,
        degree_state_table_r,
        watermark_epoch,
        is_append_only,
        metrics,
        chunk_size,
        high_join_amplification_threshold,
        // No explicit bound: use the configured default.
        None,
    )
}
265
/// Creates a hash join executor, optionally overriding the entry-state row bound.
///
/// Derives all per-side metadata from the inputs and parameters:
/// - the output data types (joined schema projected by `output_indices`),
/// - whether the append-only optimization applies,
/// - whether each side needs a degree table,
/// - the input-to-output column mappings,
/// - the inequality bookkeeping (per-column inequality indices, state-clean columns,
///   non-null columns).
///
/// `inequality_pairs` entries are
/// `(key_required_larger, key_required_smaller, clean_state, delta_expression)`. The two
/// column indices are compared against each other and right-side indices are reduced by the
/// left input's column count, i.e. they are treated as indices into the concatenated
/// (left ++ right) schema.
///
/// When `entry_state_max_rows` is `None`, the value of
/// `ctx.config.developer.hash_join_entry_state_max_rows` is used.
///
/// # Panics
///
/// Panics if the join key data types of the two sides differ, or if an index in
/// `output_indices` / `inequality_pairs` is out of range for the schemas it is applied to.
#[allow(clippy::too_many_arguments)]
pub fn new_with_cache_size(
    ctx: ActorContextRef,
    info: ExecutorInfo,
    input_l: Executor,
    input_r: Executor,
    params_l: JoinParams,
    params_r: JoinParams,
    null_safe: Vec<bool>,
    output_indices: Vec<usize>,
    cond: Option<NonStrictExpression>,
    inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
    state_table_l: StateTable<S>,
    degree_state_table_l: StateTable<S>,
    state_table_r: StateTable<S>,
    degree_state_table_r: StateTable<S>,
    watermark_epoch: AtomicU64Ref,
    is_append_only: bool,
    metrics: Arc<StreamingMetrics>,
    chunk_size: usize,
    high_join_amplification_threshold: usize,
    entry_state_max_rows: Option<usize>,
) -> Self {
    // Fall back to the configured default when no explicit bound is given.
    let entry_state_max_rows = match entry_state_max_rows {
        None => ctx.config.developer.hash_join_entry_state_max_rows,
        Some(entry_state_max_rows) => entry_state_max_rows,
    };
    let side_l_column_n = input_l.schema().len();

    // Semi/anti joins only output the columns of one side; every other join type outputs
    // the left columns followed by the right columns.
    let schema_fields = match T {
        JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
        JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
        _ => [
            input_l.schema().fields.clone(),
            input_r.schema().fields.clone(),
        ]
        .concat(),
    };

    let original_output_data_types = schema_fields
        .iter()
        .map(|field| field.data_type())
        .collect_vec();
    // Project the full output schema through `output_indices`.
    let actual_output_data_types = output_indices
        .iter()
        .map(|&idx| original_output_data_types[idx].clone())
        .collect_vec();

    // Data types of all columns stored in each side's join state.
    let state_all_data_types_l = input_l.schema().data_types();
    let state_all_data_types_r = input_r.schema().data_types();

    let state_pk_indices_l = input_l.stream_key().to_vec();
    let state_pk_indices_r = input_r.stream_key().to_vec();

    let state_join_key_indices_l = params_l.join_key_indices;
    let state_join_key_indices_r = params_r.join_key_indices;

    // In the degree tables the join key columns come first ...
    let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
    let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

    // ... immediately followed by the deduplicated pk columns.
    let degree_pk_indices_l = (state_join_key_indices_l.len()
        ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
        .collect_vec();
    let degree_pk_indices_r = (state_join_key_indices_r.len()
        ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
        .collect_vec();

    // Check, per side, whether the join key contains the whole stream key.
    let pk_contained_in_jk_l = is_subset(state_pk_indices_l, state_join_key_indices_l.clone());
    let pk_contained_in_jk_r = is_subset(state_pk_indices_r, state_join_key_indices_r.clone());

    // The append-only optimization requires the property on both sides.
    let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

    let join_key_data_types_l = state_join_key_indices_l
        .iter()
        .map(|idx| state_all_data_types_l[*idx].clone())
        .collect_vec();

    let join_key_data_types_r = state_join_key_indices_r
        .iter()
        .map(|idx| state_all_data_types_r[*idx].clone())
        .collect_vec();

    // Both sides must agree on the join key types.
    assert_eq!(join_key_data_types_l, join_key_data_types_r);

    let null_matched = K::Bitmap::from_bool_vec(null_safe);

    // A side's degree table is skipped when the *other* side's join key contains its
    // stream key.
    let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
    let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

    let degree_state_l = need_degree_table_l.then(|| {
        TableInner::new(
            degree_pk_indices_l,
            degree_join_key_indices_l,
            degree_state_table_l,
        )
    });
    let degree_state_r = need_degree_table_r.then(|| {
        TableInner::new(
            degree_pk_indices_r,
            degree_join_key_indices_r,
            degree_state_table_r,
        )
    });

    let (left_to_output, right_to_output) = {
        // For semi/anti joins the non-output side contributes zero columns.
        let (left_len, right_len) = if is_left_semi_or_anti(T) {
            (state_all_data_types_l.len(), 0usize)
        } else if is_right_semi_or_anti(T) {
            (0usize, state_all_data_types_r.len())
        } else {
            (state_all_data_types_l.len(), state_all_data_types_r.len())
        };
        JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
    };

    let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
    let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

    let left_input_len = input_l.schema().len();
    let right_input_len = input_r.schema().len();
    let mut l2inequality_index = vec![vec![]; left_input_len];
    let mut r2inequality_index = vec![vec![]; right_input_len];
    let mut l_state_clean_columns = vec![];
    let mut r_state_clean_columns = vec![];
    // Convert each inequality into `(output indices, delta expression)` while recording, per
    // input column, which inequalities it takes part in.
    let inequality_pairs = inequality_pairs
        .into_iter()
        .enumerate()
        .map(
            |(
                index,
                (key_required_larger, key_required_smaller, clean_state, delta_expression),
            )| {
                let output_indices = if key_required_larger < key_required_smaller {
                    // The larger key is a left column, the smaller key a right column.
                    if clean_state {
                        l_state_clean_columns.push((key_required_larger, index));
                    }
                    l2inequality_index[key_required_larger].push((index, false));
                    r2inequality_index[key_required_smaller - left_input_len]
                        .push((index, true));
                    l2o_indexed
                        .get_vec(&key_required_larger)
                        .cloned()
                        .unwrap_or_default()
                } else {
                    // The larger key is a right column, the smaller key a left column.
                    if clean_state {
                        r_state_clean_columns
                            .push((key_required_larger - left_input_len, index));
                    }
                    l2inequality_index[key_required_smaller].push((index, true));
                    r2inequality_index[key_required_larger - left_input_len]
                        .push((index, false));
                    r2o_indexed
                        .get_vec(&(key_required_larger - left_input_len))
                        .cloned()
                        .unwrap_or_default()
                };
                (output_indices, delta_expression)
            },
        )
        .collect_vec();

    // Columns that take part in at least one inequality.
    let mut l_non_null_fields = l2inequality_index
        .iter()
        .positions(|inequalities| !inequalities.is_empty())
        .collect_vec();
    let mut r_non_null_fields = r2inequality_index
        .iter()
        .positions(|inequalities| !inequalities.is_empty())
        .collect_vec();

    // With the append-only optimization neither state cleaning nor the non-null requirement
    // is applied.
    if append_only_optimize {
        l_state_clean_columns.clear();
        r_state_clean_columns.clear();
        l_non_null_fields.clear();
        r_non_null_fields.clear();
    }

    let inequality_watermarks = vec![None; inequality_pairs.len()];
    let watermark_buffers = BTreeMap::new();

    Self {
        ctx: ctx.clone(),
        info,
        input_l: Some(input_l),
        input_r: Some(input_r),
        actual_output_data_types,
        side_l: JoinSide {
            ht: JoinHashMap::new(
                watermark_epoch.clone(),
                join_key_data_types_l,
                state_join_key_indices_l.clone(),
                state_all_data_types_l.clone(),
                state_table_l,
                params_l.deduped_pk_indices,
                degree_state_l,
                null_matched.clone(),
                pk_contained_in_jk_l,
                None,
                metrics.clone(),
                ctx.id,
                ctx.fragment_id,
                "left",
            ),
            join_key_indices: state_join_key_indices_l,
            all_data_types: state_all_data_types_l,
            i2o_mapping: left_to_output,
            i2o_mapping_indexed: l2o_indexed,
            input2inequality_index: l2inequality_index,
            non_null_fields: l_non_null_fields,
            state_clean_columns: l_state_clean_columns,
            // Left columns come first in the concatenated row.
            start_pos: 0,
            need_degree_table: need_degree_table_l,
            _marker: PhantomData,
        },
        side_r: JoinSide {
            ht: JoinHashMap::new(
                watermark_epoch,
                join_key_data_types_r,
                state_join_key_indices_r.clone(),
                state_all_data_types_r.clone(),
                state_table_r,
                params_r.deduped_pk_indices,
                degree_state_r,
                null_matched,
                pk_contained_in_jk_r,
                None,
                metrics.clone(),
                ctx.id,
                ctx.fragment_id,
                "right",
            ),
            join_key_indices: state_join_key_indices_r,
            all_data_types: state_all_data_types_r,
            // Right columns follow the left columns in the concatenated row.
            start_pos: side_l_column_n,
            i2o_mapping: right_to_output,
            i2o_mapping_indexed: r2o_indexed,
            input2inequality_index: r2inequality_index,
            non_null_fields: r_non_null_fields,
            state_clean_columns: r_state_clean_columns,
            need_degree_table: need_degree_table_r,
            _marker: PhantomData,
        },
        cond,
        inequality_pairs,
        inequality_watermarks,
        append_only_optimize,
        metrics,
        chunk_size,
        cnt_rows_received: 0,
        watermark_buffers,
        high_join_amplification_threshold,
        entry_state_max_rows,
    }
}
523
/// Main loop of the executor.
///
/// Aligns the barriers of both inputs, forwards the first barrier and initializes both
/// sides with its epoch, then processes aligned messages until the inputs end:
/// - watermarks are passed to `handle_watermark` and the resulting watermarks are emitted;
/// - data chunks are joined against the opposite side (`eq_join_left` / `eq_join_right`),
///   the produced chunks are emitted and `try_flush_data` is called afterwards;
/// - barriers flush both sides, are forwarded downstream, and then the post-commit steps
///   run for both sides.
///
/// # Panics
///
/// Panics if either input has already been taken.
#[try_stream(ok = Message, error = StreamExecutorError)]
async fn into_stream(mut self) {
    let input_l = self.input_l.take().unwrap();
    let input_r = self.input_r.take().unwrap();
    let aligned_stream = barrier_align(
        input_l.execute(),
        input_r.execute(),
        self.ctx.id,
        self.ctx.fragment_id,
        self.metrics.clone(),
        "Join",
    );
    pin_mut!(aligned_stream);

    let actor_id = self.ctx.id;

    let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
    let first_epoch = barrier.epoch;
    // Forward the first barrier before initializing the sides with its epoch.
    yield Message::Barrier(barrier);
    self.side_l.init(first_epoch).await?;
    self.side_r.init(first_epoch).await?;

    let actor_id_str = self.ctx.id.to_string();
    let fragment_id_str = self.ctx.fragment_id.to_string();

    // Resolve all labelled metrics once, outside of the message loop.
    let join_actor_input_waiting_duration_ns = self
        .metrics
        .join_actor_input_waiting_duration_ns
        .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
    let left_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
    let right_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

    let barrier_join_match_duration_ns = self
        .metrics
        .join_match_duration_ns
        .with_guarded_label_values(&[
            actor_id_str.as_str(),
            fragment_id_str.as_str(),
            "barrier",
        ]);

    let left_join_cached_entry_count = self
        .metrics
        .join_cached_entry_count
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

    let right_join_cached_entry_count = self
        .metrics
        .join_cached_entry_count
        .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

    // Start of the current wait for input; reset after every processed message.
    let mut start_time = Instant::now();

    while let Some(msg) = aligned_stream
        .next()
        .instrument_await("hash_join_barrier_align")
        .await
    {
        join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
        match msg? {
            AlignedMessage::WatermarkLeft(watermark) => {
                for watermark_to_emit in
                    self.handle_watermark(SideType::Left, watermark).await?
                {
                    yield Message::Watermark(watermark_to_emit);
                }
            }
            AlignedMessage::WatermarkRight(watermark) => {
                for watermark_to_emit in
                    self.handle_watermark(SideType::Right, watermark).await?
                {
                    yield Message::Watermark(watermark_to_emit);
                }
            }
            AlignedMessage::Left(chunk) => {
                // Accumulate only the time spent producing chunks; the time spent while
                // the generator is suspended at `yield` is excluded by restarting the
                // timer after each yield.
                let mut left_time = Duration::from_nanos(0);
                let mut left_start_time = Instant::now();
                #[for_await]
                for chunk in Self::eq_join_left(EqJoinArgs {
                    ctx: &self.ctx,
                    side_l: &mut self.side_l,
                    side_r: &mut self.side_r,
                    actual_output_data_types: &self.actual_output_data_types,
                    cond: &mut self.cond,
                    inequality_watermarks: &self.inequality_watermarks,
                    chunk,
                    append_only_optimize: self.append_only_optimize,
                    chunk_size: self.chunk_size,
                    cnt_rows_received: &mut self.cnt_rows_received,
                    high_join_amplification_threshold: self.high_join_amplification_threshold,
                    entry_state_max_rows: self.entry_state_max_rows,
                }) {
                    left_time += left_start_time.elapsed();
                    yield Message::Chunk(chunk?);
                    left_start_time = Instant::now();
                }
                left_time += left_start_time.elapsed();
                left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                self.try_flush_data().await?;
            }
            AlignedMessage::Right(chunk) => {
                // Same timing scheme as for left chunks.
                let mut right_time = Duration::from_nanos(0);
                let mut right_start_time = Instant::now();
                #[for_await]
                for chunk in Self::eq_join_right(EqJoinArgs {
                    ctx: &self.ctx,
                    side_l: &mut self.side_l,
                    side_r: &mut self.side_r,
                    actual_output_data_types: &self.actual_output_data_types,
                    cond: &mut self.cond,
                    inequality_watermarks: &self.inequality_watermarks,
                    chunk,
                    append_only_optimize: self.append_only_optimize,
                    chunk_size: self.chunk_size,
                    cnt_rows_received: &mut self.cnt_rows_received,
                    high_join_amplification_threshold: self.high_join_amplification_threshold,
                    entry_state_max_rows: self.entry_state_max_rows,
                }) {
                    right_time += right_start_time.elapsed();
                    yield Message::Chunk(chunk?);
                    right_start_time = Instant::now();
                }
                right_time += right_start_time.elapsed();
                right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                self.try_flush_data().await?;
            }
            AlignedMessage::Barrier(barrier) => {
                let barrier_start_time = Instant::now();
                let (left_post_commit, right_post_commit) =
                    self.flush_data(barrier.epoch).await?;

                let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);

                // Only the flush is accounted to the "barrier" duration metric.
                barrier_join_match_duration_ns
                    .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                yield Message::Barrier(barrier);

                // Post-commit steps run after the barrier has been forwarded.
                right_post_commit
                    .post_yield_barrier(update_vnode_bitmap.clone())
                    .await?;
                // NOTE(review): the boolean returned for the left side presumably signals
                // that cached state may be stale after a vnode bitmap update; in that case
                // all buffered and selected watermarks are dropped. Confirm against
                // `JoinHashMapPostCommit::post_yield_barrier`.
                if left_post_commit
                    .post_yield_barrier(update_vnode_bitmap)
                    .await?
                    .unwrap_or(false)
                {
                    self.watermark_buffers
                        .values_mut()
                        .for_each(|buffers| buffers.clear());
                    self.inequality_watermarks.fill(None);
                }

                // Report the number of cached entries of both sides.
                for (join_cached_entry_count, ht) in [
                    (&left_join_cached_entry_count, &self.side_l.ht),
                    (&right_join_cached_entry_count, &self.side_r.ht),
                ] {
                    join_cached_entry_count.set(ht.entry_count() as i64);
                }
            }
        }
        start_time = Instant::now();
    }
}
698
699 async fn flush_data(
700 &mut self,
701 epoch: EpochPair,
702 ) -> StreamExecutorResult<(
703 JoinHashMapPostCommit<'_, K, S, E>,
704 JoinHashMapPostCommit<'_, K, S, E>,
705 )> {
706 let left = self.side_l.ht.flush(epoch).await?;
709 let right = self.side_r.ht.flush(epoch).await?;
710 Ok((left, right))
711 }
712
713 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
714 self.side_l.ht.try_flush().await?;
717 self.side_r.ht.try_flush().await?;
718 Ok(())
719 }
720
721 fn evict_cache(
723 side_update: &mut JoinSide<K, S, E>,
724 side_match: &mut JoinSide<K, S, E>,
725 cnt_rows_received: &mut u32,
726 ) {
727 *cnt_rows_received += 1;
728 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
729 side_update.ht.evict();
730 side_match.ht.evict();
731 *cnt_rows_received = 0;
732 }
733 }
734
/// Processes a watermark received from `side` and returns the watermarks to emit downstream.
///
/// Steps:
/// 1. If the watermark is on the first join key column of the receiving side, it is passed
///    to the opposite side's hash map via `update_watermark`.
/// 2. For every join key position that refers to the watermark column, the watermark is
///    buffered per side; when the buffer selects a watermark, it is emitted on every output
///    column that the join key column of either side maps to.
/// 3. For every inequality the watermark column takes part in, the (optionally offset)
///    watermark is buffered per side; when the buffer selects a watermark, it is emitted on
///    the inequality's output columns and remembered in `inequality_watermarks`.
///
/// # Panics
///
/// Panics if the receiving side has no join key columns, if `watermark.col_idx` is out of
/// range for the side's input, or if the delta expression evaluates to a null value.
async fn handle_watermark(
    &mut self,
    side: SideTypePrimitive,
    watermark: Watermark,
) -> StreamExecutorResult<Vec<Watermark>> {
    let (side_update, side_match) = if side == SideType::Left {
        (&mut self.side_l, &mut self.side_r)
    } else {
        (&mut self.side_r, &mut self.side_l)
    };

    // Only a watermark on the first join key column is forwarded to the opposite side's
    // hash map.
    if side_update.join_key_indices[0] == watermark.col_idx {
        side_match.ht.update_watermark(watermark.val.clone());
    }

    // Positions within the join key that refer to the watermark column.
    let wm_in_jk = side_update
        .join_key_indices
        .iter()
        .positions(|idx| *idx == watermark.col_idx);
    let mut watermarks_to_emit = vec![];
    for idx in wm_in_jk {
        // Join key positions use their own position as the buffer key.
        let buffers = self
            .watermark_buffers
            .entry(idx)
            .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
        if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
            let empty_indices = vec![];
            // Output columns of this join key position on both sides.
            let output_indices = side_update
                .i2o_mapping_indexed
                .get_vec(&side_update.join_key_indices[idx])
                .unwrap_or(&empty_indices)
                .iter()
                .chain(
                    side_match
                        .i2o_mapping_indexed
                        .get_vec(&side_match.join_key_indices[idx])
                        .unwrap_or(&empty_indices),
                );
            for output_idx in output_indices {
                watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
            }
        };
    }
    for (inequality_index, need_offset) in
        &side_update.input2inequality_index[watermark.col_idx]
    {
        // Inequalities are keyed after all join key positions.
        let buffers = self
            .watermark_buffers
            .entry(side_update.join_key_indices.len() + inequality_index)
            .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
        let mut input_watermark = watermark.clone();
        if *need_offset
            && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
        {
            // Apply the delta expression to the watermark value before buffering it.
            #[allow(clippy::disallowed_methods)]
            let eval_result = delta_expression
                .inner()
                .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                .await;
            match eval_result {
                Ok(value) => input_watermark.val = value.unwrap(),
                Err(err) => {
                    // Out-of-range results are skipped silently; any other error is
                    // reported. In both cases this inequality produces no watermark.
                    if !matches!(err, ExprError::NumericOutOfRange) {
                        self.ctx.on_compute_error(err, &self.info.identity);
                    }
                    continue;
                }
            }
        };
        if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
            for output_idx in &self.inequality_pairs[*inequality_index].0 {
                watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
            }
            // Remember the selected watermark for state cleaning in the join path.
            self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
        }
    }
    Ok(watermarks_to_emit)
}
816
817 fn row_concat(
818 row_update: impl Row,
819 update_start_pos: usize,
820 row_matched: impl Row,
821 matched_start_pos: usize,
822 ) -> OwnedRow {
823 let mut new_row = vec![None; row_update.len() + row_matched.len()];
824
825 for (i, datum_ref) in row_update.iter().enumerate() {
826 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
827 }
828 for (i, datum_ref) in row_matched.iter().enumerate() {
829 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
830 }
831 OwnedRow::new(new_row)
832 }
833
/// Joins a chunk received from the left input: forwards `args` to `eq_join_oneside` with
/// `SIDE` fixed to `SideType::Left`, so the left side is the update side.
fn eq_join_left(
    args: EqJoinArgs<'_, K, S, E>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
    Self::eq_join_oneside::<{ SideType::Left }>(args)
}
840
/// Joins a chunk received from the right input: forwards `args` to `eq_join_oneside` with
/// `SIDE` fixed to `SideType::Right`, so the right side is the update side.
fn eq_join_right(
    args: EqJoinArgs<'_, K, S, E>,
) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
    Self::eq_join_oneside::<{ SideType::Right }>(args)
}
847
/// Joins every visible row of `args.chunk`, which arrived on side `SIDE`, against the
/// opposite side and yields the resulting output chunks.
///
/// For each row: the eviction counter is advanced, the opposite side's cache is probed with
/// the row's join key (unless the row can never match), and the row is handed to
/// `handle_match_rows` as an insert or a delete depending on its op. The number of matched
/// output rows is recorded in a metric and a warning is logged when it exceeds
/// `high_join_amplification_threshold`. A final partially filled chunk is yielded at the end.
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
    let EqJoinArgs {
        ctx,
        side_l,
        side_r,
        actual_output_data_types,
        cond,
        inequality_watermarks,
        chunk,
        append_only_optimize,
        chunk_size,
        cnt_rows_received,
        high_join_amplification_threshold,
        entry_state_max_rows,
        ..
    } = args;

    // The side the chunk came from is updated; the other side is matched against.
    let (side_update, side_match) = if SIDE == SideType::Left {
        (side_l, side_r)
    } else {
        (side_r, side_l)
    };

    // State-clean columns of the match side whose inequality already has a watermark.
    let useful_state_clean_columns = side_match
        .state_clean_columns
        .iter()
        .filter_map(|(column_idx, inequality_index)| {
            inequality_watermarks[*inequality_index]
                .as_ref()
                .map(|watermark| (*column_idx, watermark))
        })
        .collect_vec();

    let mut hashjoin_chunk_builder =
        JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
            chunk_size,
            actual_output_data_types.to_vec(),
            side_update.i2o_mapping.clone(),
            side_match.i2o_mapping.clone(),
        ));

    let join_matched_join_keys = ctx
        .streaming_metrics
        .join_matched_join_keys
        .with_guarded_label_values(&[
            &ctx.id.to_string(),
            &ctx.fragment_id.to_string(),
            &side_update.ht.table_id().to_string(),
        ]);

    // Hash keys for all rows of the chunk; iterated in lockstep with the rows below.
    let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
    for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
        // Holes (rows without content) are skipped.
        let Some((op, row)) = r else {
            continue;
        };
        Self::evict_cache(side_update, side_match, cnt_rows_received);

        let cache_lookup_result = {
            // NOTE(review): the unchecked access relies on every index in
            // `non_null_fields` being a valid column of `row`; the indices are derived
            // from the input schema length in the constructor — confirm the chunk always
            // matches that schema.
            let probe_non_null_requirement_satisfied = side_update
                .non_null_fields
                .iter()
                .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
            // Nulls in the key are only acceptable in positions marked as null-matched.
            let build_non_null_requirement_satisfied =
                key.null_bitmap().is_subset(side_match.ht.null_matched());
            if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                side_match.ht.take_state_opt(key)
            } else {
                CacheResult::NeverMatch
            }
        };
        let mut total_matches = 0;

        // Shared call into `handle_match_rows`, parameterized by the join op.
        macro_rules! match_rows {
            ($op:ident) => {
                Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                    cache_lookup_result,
                    row,
                    key,
                    &mut hashjoin_chunk_builder,
                    side_match,
                    side_update,
                    &useful_state_clean_columns,
                    cond,
                    append_only_optimize,
                    entry_state_max_rows,
                )
            };
        }

        match op {
            Op::Insert | Op::UpdateInsert =>
            {
                #[for_await]
                for chunk in match_rows!(Insert) {
                    let chunk = chunk?;
                    total_matches += chunk.cardinality();
                    yield chunk;
                }
            }
            Op::Delete | Op::UpdateDelete =>
            {
                #[for_await]
                for chunk in match_rows!(Delete) {
                    let chunk = chunk?;
                    total_matches += chunk.cardinality();
                    yield chunk;
                }
            }
        };

        // `total_matches` only counts rows of chunks yielded while handling this row.
        join_matched_join_keys.observe(total_matches as _);
        if total_matches > high_join_amplification_threshold {
            let join_key_data_types = side_update.ht.join_key_data_types();
            let key = key.deserialize(join_key_data_types)?;
            tracing::warn!(target: "high_join_amplification",
                matched_rows_len = total_matches,
                update_table_id = %side_update.ht.table_id(),
                match_table_id = %side_match.ht.table_id(),
                join_key = ?key,
                actor_id = %ctx.id,
                fragment_id = %ctx.fragment_id,
                "large rows matched for join key"
            );
        }
    }
    // Flush whatever is left in the builder.
    if let Some(chunk) = hashjoin_chunk_builder.take() {
        yield chunk;
    }
}
979
/// Joins a single update `row` against the rows of the match side that share its join `key`
/// and yields any output chunks completed along the way.
///
/// Depending on `cached_lookup_result`:
/// - `NeverMatch`: the row is forwarded as unmatched (if the join type outputs such rows)
///   and nothing else happens;
/// - `Hit`: the cached rows are matched one by one;
/// - `Miss`: the matched rows are fetched from the match side's hash map and matched one by
///   one; each fetched row is also inserted into a fresh entry state while
///   `entry_state_count <= entry_state_max_rows`.
///
/// Afterwards the row itself is forwarded according to its degree, the entry state is put
/// back into the cache (always on a hit; on a miss only while the count stayed within the
/// bound), rows collected for cleaning are deleted from the match side, and finally the
/// update row is inserted into or deleted from the update side. With the append-only
/// optimization, a recorded matched row is deleted from the match side instead and the
/// update row is not stored.
#[allow(clippy::too_many_arguments)]
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
async fn handle_match_rows<
    'a,
    const SIDE: SideTypePrimitive,
    const JOIN_OP: JoinOpPrimitive,
>(
    cached_lookup_result: CacheResult<E>,
    row: RowRef<'a>,
    key: &'a K,
    hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
    side_match: &'a mut JoinSide<K, S, E>,
    side_update: &'a mut JoinSide<K, S, E>,
    useful_state_clean_columns: &'a [(usize, &'a Watermark)],
    cond: &'a mut Option<NonStrictExpression>,
    append_only_optimize: bool,
    entry_state_max_rows: usize,
) {
    let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
    // Entry state filled on a cache miss; shadowed below by the state that is finally put
    // back into the cache.
    let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
    let mut entry_state_count = 0;

    // Number of matched rows that satisfy the join condition for this update row.
    let mut degree = 0;
    let mut append_only_matched_row = None;
    let mut matched_rows_to_clean = vec![];

    // Shared call into `handle_match_row`; the arguments that differ between the cache-hit
    // and the cache-miss path are macro parameters.
    macro_rules! match_row {
        (
            $match_order_key_indices:expr,
            $degree_table:expr,
            $matched_row:expr,
            $matched_row_ref:expr,
            $from_cache:literal,
            $map_output:expr,
        ) => {
            Self::handle_match_row::<_, _, SIDE, { JOIN_OP }, { $from_cache }>(
                row,
                $matched_row,
                $matched_row_ref,
                hashjoin_chunk_builder,
                $match_order_key_indices,
                $degree_table,
                side_update.start_pos,
                side_match.start_pos,
                cond,
                &mut degree,
                useful_state_clean_columns,
                append_only_optimize,
                &mut append_only_matched_row,
                &mut matched_rows_to_clean,
                $map_output,
            )
        };
    }

    let entry_state = match cached_lookup_result {
        CacheResult::NeverMatch => {
            // The row cannot match anything: forward it as unmatched and return without
            // touching any state.
            let op = match JOIN_OP {
                JoinOp::Insert => Op::Insert,
                JoinOp::Delete => Op::Delete,
            };
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
            return Ok(());
        }
        CacheResult::Hit(mut cached_rows) => {
            let (match_order_key_indices, match_degree_state) =
                side_match.ht.get_degree_state_mut_ref();
            for (matched_row_ref, matched_row) in
                cached_rows.values_mut(&side_match.all_data_types)
            {
                let matched_row = matched_row?;
                if let Some(chunk) = match_row!(
                    match_order_key_indices,
                    match_degree_state,
                    matched_row,
                    Some(matched_row_ref),
                    true,
                    Either::Left,
                )
                .await
                {
                    yield chunk;
                }
            }

            cached_rows
        }
        CacheResult::Miss => {
            let (matched_rows, match_order_key_indices, degree_table) = side_match
                .ht
                .fetch_matched_rows_and_get_degree_table_ref(key)
                .await?;

            #[for_await]
            for matched_row in matched_rows {
                let (encoded_pk, matched_row) = matched_row?;

                let mut matched_row_ref = None;

                // Keep caching fetched rows until the count exceeds the bound; once it
                // does, the entry is not put back into the cache below.
                if entry_state_count <= entry_state_max_rows {
                    let row_ref = entry_state
                        .insert(encoded_pk, E::encode(&matched_row), None)
                        .with_context(|| format!("row: {}", row.display(),))?;
                    matched_row_ref = Some(row_ref);
                    entry_state_count += 1;
                }
                if let Some(chunk) = match_row!(
                    match_order_key_indices,
                    degree_table,
                    matched_row,
                    matched_row_ref,
                    false,
                    Either::Right,
                )
                .await
                {
                    yield chunk;
                }
            }
            Box::new(entry_state)
        }
    };

    // Forward the update row itself, depending on whether it matched anything.
    let op = match JOIN_OP {
        JoinOp::Insert => Op::Insert,
        JoinOp::Delete => Op::Delete,
    };
    if degree == 0 {
        if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
            yield chunk;
        }
    } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
    {
        yield chunk;
    }

    // Put the entry back into the cache: always on a hit, on a miss only if the number of
    // cached rows stayed within the bound.
    if cache_hit || entry_state_count <= entry_state_max_rows {
        side_match.ht.update_state(key, entry_state);
    }

    // Delete the matched rows that were collected for cleaning.
    for matched_row in matched_rows_to_clean {
        side_match.ht.delete_handle_degree(key, matched_row)?;
    }

    // Append-only optimization: remove the recorded matched row from the match side and
    // do not store the update row at all.
    if append_only_optimize && let Some(row) = append_only_matched_row {
        assert_matches!(JOIN_OP, JoinOp::Insert);
        side_match.ht.delete_handle_degree(key, row)?;
        return Ok(());
    }

    // Apply the update row, together with its degree, to the update side.
    match JOIN_OP {
        JoinOp::Insert => {
            side_update
                .ht
                .insert_handle_degree(key, JoinRow::new(row, degree))?;
        }
        JoinOp::Delete => {
            side_update
                .ht
                .delete_handle_degree(key, JoinRow::new(row, degree))?;
        }
    }
}
1161
    /// Processes one candidate `matched_row` from the match side against `update_row`.
    ///
    /// The join condition is evaluated via [`Self::check_join_condition`]. When it holds,
    /// `update_row_degree` is incremented, a joined chunk may be produced through
    /// `hashjoin_chunk_builder`, and, if `match_degree_table` is `Some`, the matched row's
    /// degree is updated in that table and in the cached encoded row (when one is supplied).
    /// When it does not hold, the row is compared against `useful_state_clean_columns` to
    /// decide whether it should be queued in `matched_rows_to_clean`.
    ///
    /// Independently of the condition result, when `append_only_optimize` is set the matched
    /// row is stored in `append_only_matched_row` (which must still be `None`).
    ///
    /// Returns the chunk emitted by the builder for this pair, if any.
    ///
    /// # Panics
    ///
    /// - If `MATCHED_ROWS_FROM_CACHE` is `true`, a degree table is present, the condition holds
    ///   and `matched_row_cache_ref` is `None`.
    /// - If `append_only_optimize` is set while `JOIN_OP` is not `JoinOp::Insert`, or while
    ///   `append_only_matched_row` is already `Some`.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        R: Row,  // row type of `matched_row` as handed in
        RO: Row, // row type after applying `map_output`
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<R>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<RO>>,
        matched_rows_to_clean: &mut Vec<JoinRow<RO>>,
        map_output: impl Fn(R) -> RO,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        // Evaluate the (optional) non-equi condition on the concatenated pair.
        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // One more match for the update row.
            *update_row_degree += 1;
            // For inserts, the joined row is built before `update_degree` touches
            // `matched_row`.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // Degrees are only maintained when the match side has a degree table.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // Apply the same degree change to the cached encoded row.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // For deletes, the joined row is built after `update_degree` has run on
            // `matched_row`.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // A non-null value strictly below its watermark marks the row for state
            // cleaning; a single hit is enough.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            // At most one matched row may be recorded per update row.
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row.map(map_output));
        } else if need_state_clean {
            // `else if` rather than a second `if`: `matched_row` is consumed by `map`, so only
            // one of the two branches may take it.
            matched_rows_to_clean.push(matched_row.map(map_output));
        }

        chunk_opt
    }
1273
1274 #[inline]
1279 async fn check_join_condition(
1280 row: impl Row,
1281 side_update_start_pos: usize,
1282 matched_row: impl Row,
1283 side_match_start_pos: usize,
1284 join_condition: &Option<NonStrictExpression>,
1285 ) -> bool {
1286 if let Some(join_condition) = join_condition {
1287 let new_row = Self::row_concat(
1288 row,
1289 side_update_start_pos,
1290 matched_row,
1291 side_match_start_pos,
1292 );
1293 join_condition
1294 .eval_row_infallible(&new_row)
1295 .await
1296 .map(|s| *s.as_bool())
1297 .unwrap_or(false)
1298 } else {
1299 true
1300 }
1301 }
1302}
1303
1304#[cfg(test)]
1305mod tests {
1306 use std::sync::atomic::AtomicU64;
1307
1308 use pretty_assertions::assert_eq;
1309 use risingwave_common::array::*;
1310 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1311 use risingwave_common::hash::{Key64, Key128};
1312 use risingwave_common::util::epoch::test_epoch;
1313 use risingwave_common::util::sort_util::OrderType;
1314 use risingwave_storage::memory::MemoryStateStore;
1315
1316 use super::*;
1317 use crate::common::table::test_utils::gen_pbtable;
1318 use crate::executor::MemoryEncoding;
1319 use crate::executor::test_utils::expr::build_from_pretty;
1320 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1321
1322 async fn create_in_memory_state_table(
1323 mem_state: MemoryStateStore,
1324 data_types: &[DataType],
1325 order_types: &[OrderType],
1326 pk_indices: &[usize],
1327 table_id: u32,
1328 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1329 let column_descs = data_types
1330 .iter()
1331 .enumerate()
1332 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1333 .collect_vec();
1334 let state_table = StateTable::from_table_catalog(
1335 &gen_pbtable(
1336 TableId::new(table_id),
1337 column_descs,
1338 order_types.to_vec(),
1339 pk_indices.to_vec(),
1340 0,
1341 ),
1342 mem_state.clone(),
1343 None,
1344 )
1345 .await;
1346
1347 let mut degree_table_column_descs = vec![];
1349 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1350 degree_table_column_descs.push(ColumnDesc::unnamed(
1351 ColumnId::new(pk_id as i32),
1352 data_types[*idx].clone(),
1353 ))
1354 });
1355 degree_table_column_descs.push(ColumnDesc::unnamed(
1356 ColumnId::new(pk_indices.len() as i32),
1357 DataType::Int64,
1358 ));
1359 let degree_state_table = StateTable::from_table_catalog(
1360 &gen_pbtable(
1361 TableId::new(table_id + 1),
1362 degree_table_column_descs,
1363 order_types.to_vec(),
1364 pk_indices.to_vec(),
1365 0,
1366 ),
1367 mem_state,
1368 None,
1369 )
1370 .await;
1371 (state_table, degree_state_table)
1372 }
1373
1374 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1375 build_from_pretty(
1376 condition_text
1377 .as_deref()
1378 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1379 )
1380 }
1381
1382 async fn create_executor<const T: JoinTypePrimitive>(
1383 with_condition: bool,
1384 null_safe: bool,
1385 condition_text: Option<String>,
1386 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1387 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1388 let schema = Schema {
1389 fields: vec![
1390 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1392 ],
1393 };
1394 let (tx_l, source_l) = MockSource::channel();
1395 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1396 let (tx_r, source_r) = MockSource::channel();
1397 let source_r = source_r.into_executor(schema, vec![1]);
1398 let params_l = JoinParams::new(vec![0], vec![1]);
1399 let params_r = JoinParams::new(vec![0], vec![1]);
1400 let cond = with_condition.then(|| create_cond(condition_text));
1401
1402 let mem_state = MemoryStateStore::new();
1403
1404 let (state_l, degree_state_l) = create_in_memory_state_table(
1405 mem_state.clone(),
1406 &[DataType::Int64, DataType::Int64],
1407 &[OrderType::ascending(), OrderType::ascending()],
1408 &[0, 1],
1409 0,
1410 )
1411 .await;
1412
1413 let (state_r, degree_state_r) = create_in_memory_state_table(
1414 mem_state,
1415 &[DataType::Int64, DataType::Int64],
1416 &[OrderType::ascending(), OrderType::ascending()],
1417 &[0, 1],
1418 2,
1419 )
1420 .await;
1421
1422 let schema = match T {
1423 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1424 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1425 _ => [source_l.schema().fields(), source_r.schema().fields()]
1426 .concat()
1427 .into_iter()
1428 .collect(),
1429 };
1430 let schema_len = schema.len();
1431 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1432
1433 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1434 ActorContext::for_test(123),
1435 info,
1436 source_l,
1437 source_r,
1438 params_l,
1439 params_r,
1440 vec![null_safe],
1441 (0..schema_len).collect_vec(),
1442 cond,
1443 inequality_pairs,
1444 state_l,
1445 degree_state_l,
1446 state_r,
1447 degree_state_r,
1448 Arc::new(AtomicU64::new(0)),
1449 false,
1450 Arc::new(StreamingMetrics::unused()),
1451 1024,
1452 2048,
1453 );
1454 (tx_l, tx_r, executor.boxed().execute())
1455 }
1456
1457 async fn create_classical_executor<const T: JoinTypePrimitive>(
1458 with_condition: bool,
1459 null_safe: bool,
1460 condition_text: Option<String>,
1461 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1462 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1463 }
1464
1465 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1466 with_condition: bool,
1467 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1468 let schema = Schema {
1469 fields: vec![
1470 Field::unnamed(DataType::Int64),
1471 Field::unnamed(DataType::Int64),
1472 Field::unnamed(DataType::Int64),
1473 ],
1474 };
1475 let (tx_l, source_l) = MockSource::channel();
1476 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1477 let (tx_r, source_r) = MockSource::channel();
1478 let source_r = source_r.into_executor(schema, vec![0]);
1479 let params_l = JoinParams::new(vec![0, 1], vec![]);
1480 let params_r = JoinParams::new(vec![0, 1], vec![]);
1481 let cond = with_condition.then(|| create_cond(None));
1482
1483 let mem_state = MemoryStateStore::new();
1484
1485 let (state_l, degree_state_l) = create_in_memory_state_table(
1486 mem_state.clone(),
1487 &[DataType::Int64, DataType::Int64, DataType::Int64],
1488 &[
1489 OrderType::ascending(),
1490 OrderType::ascending(),
1491 OrderType::ascending(),
1492 ],
1493 &[0, 1, 0],
1494 0,
1495 )
1496 .await;
1497
1498 let (state_r, degree_state_r) = create_in_memory_state_table(
1499 mem_state,
1500 &[DataType::Int64, DataType::Int64, DataType::Int64],
1501 &[
1502 OrderType::ascending(),
1503 OrderType::ascending(),
1504 OrderType::ascending(),
1505 ],
1506 &[0, 1, 1],
1507 1,
1508 )
1509 .await;
1510
1511 let schema = match T {
1512 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1513 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1514 _ => [source_l.schema().fields(), source_r.schema().fields()]
1515 .concat()
1516 .into_iter()
1517 .collect(),
1518 };
1519 let schema_len = schema.len();
1520 let info = ExecutorInfo::for_test(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1521
1522 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1523 ActorContext::for_test(123),
1524 info,
1525 source_l,
1526 source_r,
1527 params_l,
1528 params_r,
1529 vec![false],
1530 (0..schema_len).collect_vec(),
1531 cond,
1532 vec![],
1533 state_l,
1534 degree_state_l,
1535 state_r,
1536 degree_state_r,
1537 Arc::new(AtomicU64::new(0)),
1538 true,
1539 Arc::new(StreamingMetrics::unused()),
1540 1024,
1541 2048,
1542 );
1543 (tx_l, tx_r, executor.boxed().execute())
1544 }
1545
1546 #[tokio::test]
1547 async fn test_interval_join() -> StreamExecutorResult<()> {
1548 let chunk_l1 = StreamChunk::from_pretty(
1549 " I I
1550 + 1 4
1551 + 2 3
1552 + 2 5
1553 + 3 6",
1554 );
1555 let chunk_l2 = StreamChunk::from_pretty(
1556 " I I
1557 + 3 8
1558 - 3 8",
1559 );
1560 let chunk_r1 = StreamChunk::from_pretty(
1561 " I I
1562 + 2 6
1563 + 4 8
1564 + 6 9",
1565 );
1566 let chunk_r2 = StreamChunk::from_pretty(
1567 " I I
1568 + 2 3
1569 + 6 11",
1570 );
1571 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1572 true,
1573 false,
1574 Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1575 vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1576 )
1577 .await;
1578
1579 tx_l.push_barrier(test_epoch(1), false);
1581 tx_r.push_barrier(test_epoch(1), false);
1582 hash_join.next_unwrap_ready_barrier()?;
1583
1584 tx_l.push_chunk(chunk_l1);
1586 hash_join.next_unwrap_pending();
1587
1588 tx_l.push_barrier(test_epoch(2), false);
1590 tx_r.push_barrier(test_epoch(2), false);
1591 hash_join.next_unwrap_ready_barrier()?;
1592
1593 tx_l.push_chunk(chunk_l2);
1595 hash_join.next_unwrap_pending();
1596
1597 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1598 hash_join.next_unwrap_pending();
1599
1600 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1601 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1602 assert_eq!(
1603 output_watermark,
1604 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1605 );
1606 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1607 assert_eq!(
1608 output_watermark,
1609 Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1610 );
1611
1612 tx_r.push_chunk(chunk_r1);
1614 let chunk = hash_join.next_unwrap_ready_chunk()?;
1615 assert_eq!(
1617 chunk,
1618 StreamChunk::from_pretty(
1619 " I I I I
1620 + 2 5 2 6"
1621 )
1622 );
1623
1624 tx_r.push_chunk(chunk_r2);
1626 hash_join.next_unwrap_pending();
1629
1630 Ok(())
1631 }
1632
1633 #[tokio::test]
1634 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1635 let chunk_l1 = StreamChunk::from_pretty(
1636 " I I
1637 + 1 4
1638 + 2 5
1639 + 3 6",
1640 );
1641 let chunk_l2 = StreamChunk::from_pretty(
1642 " I I
1643 + 3 8
1644 - 3 8",
1645 );
1646 let chunk_r1 = StreamChunk::from_pretty(
1647 " I I
1648 + 2 7
1649 + 4 8
1650 + 6 9",
1651 );
1652 let chunk_r2 = StreamChunk::from_pretty(
1653 " I I
1654 + 3 10
1655 + 6 11",
1656 );
1657 let (mut tx_l, mut tx_r, mut hash_join) =
1658 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1659
1660 tx_l.push_barrier(test_epoch(1), false);
1662 tx_r.push_barrier(test_epoch(1), false);
1663 hash_join.next_unwrap_ready_barrier()?;
1664
1665 tx_l.push_chunk(chunk_l1);
1667 hash_join.next_unwrap_pending();
1668
1669 tx_l.push_barrier(test_epoch(2), false);
1671 tx_r.push_barrier(test_epoch(2), false);
1672 hash_join.next_unwrap_ready_barrier()?;
1673
1674 tx_l.push_chunk(chunk_l2);
1676 hash_join.next_unwrap_pending();
1677
1678 tx_r.push_chunk(chunk_r1);
1680 let chunk = hash_join.next_unwrap_ready_chunk()?;
1681 assert_eq!(
1682 chunk,
1683 StreamChunk::from_pretty(
1684 " I I I I
1685 + 2 5 2 7"
1686 )
1687 );
1688
1689 tx_r.push_chunk(chunk_r2);
1691 let chunk = hash_join.next_unwrap_ready_chunk()?;
1692 assert_eq!(
1693 chunk,
1694 StreamChunk::from_pretty(
1695 " I I I I
1696 + 3 6 3 10"
1697 )
1698 );
1699
1700 Ok(())
1701 }
1702
1703 #[tokio::test]
1704 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1705 let chunk_l1 = StreamChunk::from_pretty(
1706 " I I
1707 + 1 4
1708 + 2 5
1709 + . 6",
1710 );
1711 let chunk_l2 = StreamChunk::from_pretty(
1712 " I I
1713 + . 8
1714 - . 8",
1715 );
1716 let chunk_r1 = StreamChunk::from_pretty(
1717 " I I
1718 + 2 7
1719 + 4 8
1720 + 6 9",
1721 );
1722 let chunk_r2 = StreamChunk::from_pretty(
1723 " I I
1724 + . 10
1725 + 6 11",
1726 );
1727 let (mut tx_l, mut tx_r, mut hash_join) =
1728 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1729
1730 tx_l.push_barrier(test_epoch(1), false);
1732 tx_r.push_barrier(test_epoch(1), false);
1733 hash_join.next_unwrap_ready_barrier()?;
1734
1735 tx_l.push_chunk(chunk_l1);
1737 hash_join.next_unwrap_pending();
1738
1739 tx_l.push_barrier(test_epoch(2), false);
1741 tx_r.push_barrier(test_epoch(2), false);
1742 hash_join.next_unwrap_ready_barrier()?;
1743
1744 tx_l.push_chunk(chunk_l2);
1746 hash_join.next_unwrap_pending();
1747
1748 tx_r.push_chunk(chunk_r1);
1750 let chunk = hash_join.next_unwrap_ready_chunk()?;
1751 assert_eq!(
1752 chunk,
1753 StreamChunk::from_pretty(
1754 " I I I I
1755 + 2 5 2 7"
1756 )
1757 );
1758
1759 tx_r.push_chunk(chunk_r2);
1761 let chunk = hash_join.next_unwrap_ready_chunk()?;
1762 assert_eq!(
1763 chunk,
1764 StreamChunk::from_pretty(
1765 " I I I I
1766 + . 6 . 10"
1767 )
1768 );
1769
1770 Ok(())
1771 }
1772
1773 #[tokio::test]
1774 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1775 let chunk_l1 = StreamChunk::from_pretty(
1776 " I I
1777 + 1 4
1778 + 2 5
1779 + 3 6",
1780 );
1781 let chunk_l2 = StreamChunk::from_pretty(
1782 " I I
1783 + 3 8
1784 - 3 8",
1785 );
1786 let chunk_r1 = StreamChunk::from_pretty(
1787 " I I
1788 + 2 7
1789 + 4 8
1790 + 6 9",
1791 );
1792 let chunk_r2 = StreamChunk::from_pretty(
1793 " I I
1794 + 3 10
1795 + 6 11",
1796 );
1797 let chunk_l3 = StreamChunk::from_pretty(
1798 " I I
1799 + 6 10",
1800 );
1801 let chunk_r3 = StreamChunk::from_pretty(
1802 " I I
1803 - 6 11",
1804 );
1805 let chunk_r4 = StreamChunk::from_pretty(
1806 " I I
1807 - 6 9",
1808 );
1809 let (mut tx_l, mut tx_r, mut hash_join) =
1810 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1811
1812 tx_l.push_barrier(test_epoch(1), false);
1814 tx_r.push_barrier(test_epoch(1), false);
1815 hash_join.next_unwrap_ready_barrier()?;
1816
1817 tx_l.push_chunk(chunk_l1);
1819 hash_join.next_unwrap_pending();
1820
1821 tx_l.push_barrier(test_epoch(2), false);
1823 tx_r.push_barrier(test_epoch(2), false);
1824 hash_join.next_unwrap_ready_barrier()?;
1825
1826 tx_l.push_chunk(chunk_l2);
1828 hash_join.next_unwrap_pending();
1829
1830 tx_r.push_chunk(chunk_r1);
1832 let chunk = hash_join.next_unwrap_ready_chunk()?;
1833 assert_eq!(
1834 chunk,
1835 StreamChunk::from_pretty(
1836 " I I
1837 + 2 5"
1838 )
1839 );
1840
1841 tx_r.push_chunk(chunk_r2);
1843 let chunk = hash_join.next_unwrap_ready_chunk()?;
1844 assert_eq!(
1845 chunk,
1846 StreamChunk::from_pretty(
1847 " I I
1848 + 3 6"
1849 )
1850 );
1851
1852 tx_l.push_chunk(chunk_l3);
1854 let chunk = hash_join.next_unwrap_ready_chunk()?;
1855 assert_eq!(
1856 chunk,
1857 StreamChunk::from_pretty(
1858 " I I
1859 + 6 10"
1860 )
1861 );
1862
1863 tx_r.push_chunk(chunk_r3);
1866 hash_join.next_unwrap_pending();
1867
1868 tx_r.push_chunk(chunk_r4);
1871 let chunk = hash_join.next_unwrap_ready_chunk()?;
1872 assert_eq!(
1873 chunk,
1874 StreamChunk::from_pretty(
1875 " I I
1876 - 6 10"
1877 )
1878 );
1879
1880 Ok(())
1881 }
1882
1883 #[tokio::test]
1884 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1885 let chunk_l1 = StreamChunk::from_pretty(
1886 " I I
1887 + 1 4
1888 + 2 5
1889 + . 6",
1890 );
1891 let chunk_l2 = StreamChunk::from_pretty(
1892 " I I
1893 + . 8
1894 - . 8",
1895 );
1896 let chunk_r1 = StreamChunk::from_pretty(
1897 " I I
1898 + 2 7
1899 + 4 8
1900 + 6 9",
1901 );
1902 let chunk_r2 = StreamChunk::from_pretty(
1903 " I I
1904 + . 10
1905 + 6 11",
1906 );
1907 let chunk_l3 = StreamChunk::from_pretty(
1908 " I I
1909 + 6 10",
1910 );
1911 let chunk_r3 = StreamChunk::from_pretty(
1912 " I I
1913 - 6 11",
1914 );
1915 let chunk_r4 = StreamChunk::from_pretty(
1916 " I I
1917 - 6 9",
1918 );
1919 let (mut tx_l, mut tx_r, mut hash_join) =
1920 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1921
1922 tx_l.push_barrier(test_epoch(1), false);
1924 tx_r.push_barrier(test_epoch(1), false);
1925 hash_join.next_unwrap_ready_barrier()?;
1926
1927 tx_l.push_chunk(chunk_l1);
1929 hash_join.next_unwrap_pending();
1930
1931 tx_l.push_barrier(test_epoch(2), false);
1933 tx_r.push_barrier(test_epoch(2), false);
1934 hash_join.next_unwrap_ready_barrier()?;
1935
1936 tx_l.push_chunk(chunk_l2);
1938 hash_join.next_unwrap_pending();
1939
1940 tx_r.push_chunk(chunk_r1);
1942 let chunk = hash_join.next_unwrap_ready_chunk()?;
1943 assert_eq!(
1944 chunk,
1945 StreamChunk::from_pretty(
1946 " I I
1947 + 2 5"
1948 )
1949 );
1950
1951 tx_r.push_chunk(chunk_r2);
1953 let chunk = hash_join.next_unwrap_ready_chunk()?;
1954 assert_eq!(
1955 chunk,
1956 StreamChunk::from_pretty(
1957 " I I
1958 + . 6"
1959 )
1960 );
1961
1962 tx_l.push_chunk(chunk_l3);
1964 let chunk = hash_join.next_unwrap_ready_chunk()?;
1965 assert_eq!(
1966 chunk,
1967 StreamChunk::from_pretty(
1968 " I I
1969 + 6 10"
1970 )
1971 );
1972
1973 tx_r.push_chunk(chunk_r3);
1976 hash_join.next_unwrap_pending();
1977
1978 tx_r.push_chunk(chunk_r4);
1981 let chunk = hash_join.next_unwrap_ready_chunk()?;
1982 assert_eq!(
1983 chunk,
1984 StreamChunk::from_pretty(
1985 " I I
1986 - 6 10"
1987 )
1988 );
1989
1990 Ok(())
1991 }
1992
1993 #[tokio::test]
1994 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1995 let chunk_l1 = StreamChunk::from_pretty(
1996 " I I I
1997 + 1 4 1
1998 + 2 5 2
1999 + 3 6 3",
2000 );
2001 let chunk_l2 = StreamChunk::from_pretty(
2002 " I I I
2003 + 4 9 4
2004 + 5 10 5",
2005 );
2006 let chunk_r1 = StreamChunk::from_pretty(
2007 " I I I
2008 + 2 5 1
2009 + 4 9 2
2010 + 6 9 3",
2011 );
2012 let chunk_r2 = StreamChunk::from_pretty(
2013 " I I I
2014 + 1 4 4
2015 + 3 6 5",
2016 );
2017
2018 let (mut tx_l, mut tx_r, mut hash_join) =
2019 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2020
2021 tx_l.push_barrier(test_epoch(1), false);
2023 tx_r.push_barrier(test_epoch(1), false);
2024 hash_join.next_unwrap_ready_barrier()?;
2025
2026 tx_l.push_chunk(chunk_l1);
2028 hash_join.next_unwrap_pending();
2029
2030 tx_l.push_barrier(test_epoch(2), false);
2032 tx_r.push_barrier(test_epoch(2), false);
2033 hash_join.next_unwrap_ready_barrier()?;
2034
2035 tx_l.push_chunk(chunk_l2);
2037 hash_join.next_unwrap_pending();
2038
2039 tx_r.push_chunk(chunk_r1);
2041 let chunk = hash_join.next_unwrap_ready_chunk()?;
2042 assert_eq!(
2043 chunk,
2044 StreamChunk::from_pretty(
2045 " I I I I I I
2046 + 2 5 2 2 5 1
2047 + 4 9 4 4 9 2"
2048 )
2049 );
2050
2051 tx_r.push_chunk(chunk_r2);
2053 let chunk = hash_join.next_unwrap_ready_chunk()?;
2054 assert_eq!(
2055 chunk,
2056 StreamChunk::from_pretty(
2057 " I I I I I I
2058 + 1 4 1 1 4 4
2059 + 3 6 3 3 6 5"
2060 )
2061 );
2062
2063 Ok(())
2064 }
2065
2066 #[tokio::test]
2067 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2068 let chunk_l1 = StreamChunk::from_pretty(
2069 " I I I
2070 + 1 4 1
2071 + 2 5 2
2072 + 3 6 3",
2073 );
2074 let chunk_l2 = StreamChunk::from_pretty(
2075 " I I I
2076 + 4 9 4
2077 + 5 10 5",
2078 );
2079 let chunk_r1 = StreamChunk::from_pretty(
2080 " I I I
2081 + 2 5 1
2082 + 4 9 2
2083 + 6 9 3",
2084 );
2085 let chunk_r2 = StreamChunk::from_pretty(
2086 " I I I
2087 + 1 4 4
2088 + 3 6 5",
2089 );
2090
2091 let (mut tx_l, mut tx_r, mut hash_join) =
2092 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2093
2094 tx_l.push_barrier(test_epoch(1), false);
2096 tx_r.push_barrier(test_epoch(1), false);
2097 hash_join.next_unwrap_ready_barrier()?;
2098
2099 tx_l.push_chunk(chunk_l1);
2101 hash_join.next_unwrap_pending();
2102
2103 tx_l.push_barrier(test_epoch(2), false);
2105 tx_r.push_barrier(test_epoch(2), false);
2106 hash_join.next_unwrap_ready_barrier()?;
2107
2108 tx_l.push_chunk(chunk_l2);
2110 hash_join.next_unwrap_pending();
2111
2112 tx_r.push_chunk(chunk_r1);
2114 let chunk = hash_join.next_unwrap_ready_chunk()?;
2115 assert_eq!(
2116 chunk,
2117 StreamChunk::from_pretty(
2118 " I I I
2119 + 2 5 2
2120 + 4 9 4"
2121 )
2122 );
2123
2124 tx_r.push_chunk(chunk_r2);
2126 let chunk = hash_join.next_unwrap_ready_chunk()?;
2127 assert_eq!(
2128 chunk,
2129 StreamChunk::from_pretty(
2130 " I I I
2131 + 1 4 1
2132 + 3 6 3"
2133 )
2134 );
2135
2136 Ok(())
2137 }
2138
2139 #[tokio::test]
2140 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2141 let chunk_l1 = StreamChunk::from_pretty(
2142 " I I I
2143 + 1 4 1
2144 + 2 5 2
2145 + 3 6 3",
2146 );
2147 let chunk_l2 = StreamChunk::from_pretty(
2148 " I I I
2149 + 4 9 4
2150 + 5 10 5",
2151 );
2152 let chunk_r1 = StreamChunk::from_pretty(
2153 " I I I
2154 + 2 5 1
2155 + 4 9 2
2156 + 6 9 3",
2157 );
2158 let chunk_r2 = StreamChunk::from_pretty(
2159 " I I I
2160 + 1 4 4
2161 + 3 6 5",
2162 );
2163
2164 let (mut tx_l, mut tx_r, mut hash_join) =
2165 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2166
2167 tx_l.push_barrier(test_epoch(1), false);
2169 tx_r.push_barrier(test_epoch(1), false);
2170 hash_join.next_unwrap_ready_barrier()?;
2171
2172 tx_l.push_chunk(chunk_l1);
2174 hash_join.next_unwrap_pending();
2175
2176 tx_l.push_barrier(test_epoch(2), false);
2178 tx_r.push_barrier(test_epoch(2), false);
2179 hash_join.next_unwrap_ready_barrier()?;
2180
2181 tx_l.push_chunk(chunk_l2);
2183 hash_join.next_unwrap_pending();
2184
2185 tx_r.push_chunk(chunk_r1);
2187 let chunk = hash_join.next_unwrap_ready_chunk()?;
2188 assert_eq!(
2189 chunk,
2190 StreamChunk::from_pretty(
2191 " I I I
2192 + 2 5 1
2193 + 4 9 2"
2194 )
2195 );
2196
2197 tx_r.push_chunk(chunk_r2);
2199 let chunk = hash_join.next_unwrap_ready_chunk()?;
2200 assert_eq!(
2201 chunk,
2202 StreamChunk::from_pretty(
2203 " I I I
2204 + 1 4 4
2205 + 3 6 5"
2206 )
2207 );
2208
2209 Ok(())
2210 }
2211
2212 #[tokio::test]
2213 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2214 let chunk_r1 = StreamChunk::from_pretty(
2215 " I I
2216 + 1 4
2217 + 2 5
2218 + 3 6",
2219 );
2220 let chunk_r2 = StreamChunk::from_pretty(
2221 " I I
2222 + 3 8
2223 - 3 8",
2224 );
2225 let chunk_l1 = StreamChunk::from_pretty(
2226 " I I
2227 + 2 7
2228 + 4 8
2229 + 6 9",
2230 );
2231 let chunk_l2 = StreamChunk::from_pretty(
2232 " I I
2233 + 3 10
2234 + 6 11",
2235 );
2236 let chunk_r3 = StreamChunk::from_pretty(
2237 " I I
2238 + 6 10",
2239 );
2240 let chunk_l3 = StreamChunk::from_pretty(
2241 " I I
2242 - 6 11",
2243 );
2244 let chunk_l4 = StreamChunk::from_pretty(
2245 " I I
2246 - 6 9",
2247 );
2248 let (mut tx_l, mut tx_r, mut hash_join) =
2249 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2250
2251 tx_l.push_barrier(test_epoch(1), false);
2253 tx_r.push_barrier(test_epoch(1), false);
2254 hash_join.next_unwrap_ready_barrier()?;
2255
2256 tx_r.push_chunk(chunk_r1);
2258 hash_join.next_unwrap_pending();
2259
2260 tx_l.push_barrier(test_epoch(2), false);
2262 tx_r.push_barrier(test_epoch(2), false);
2263 hash_join.next_unwrap_ready_barrier()?;
2264
2265 tx_r.push_chunk(chunk_r2);
2267 hash_join.next_unwrap_pending();
2268
2269 tx_l.push_chunk(chunk_l1);
2271 let chunk = hash_join.next_unwrap_ready_chunk()?;
2272 assert_eq!(
2273 chunk,
2274 StreamChunk::from_pretty(
2275 " I I
2276 + 2 5"
2277 )
2278 );
2279
2280 tx_l.push_chunk(chunk_l2);
2282 let chunk = hash_join.next_unwrap_ready_chunk()?;
2283 assert_eq!(
2284 chunk,
2285 StreamChunk::from_pretty(
2286 " I I
2287 + 3 6"
2288 )
2289 );
2290
2291 tx_r.push_chunk(chunk_r3);
2293 let chunk = hash_join.next_unwrap_ready_chunk()?;
2294 assert_eq!(
2295 chunk,
2296 StreamChunk::from_pretty(
2297 " I I
2298 + 6 10"
2299 )
2300 );
2301
2302 tx_l.push_chunk(chunk_l3);
2305 hash_join.next_unwrap_pending();
2306
2307 tx_l.push_chunk(chunk_l4);
2310 let chunk = hash_join.next_unwrap_ready_chunk()?;
2311 assert_eq!(
2312 chunk,
2313 StreamChunk::from_pretty(
2314 " I I
2315 - 6 10"
2316 )
2317 );
2318
2319 Ok(())
2320 }
2321
2322 #[tokio::test]
2323 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2324 let chunk_l1 = StreamChunk::from_pretty(
2325 " I I
2326 + 1 4
2327 + 2 5
2328 + 3 6",
2329 );
2330 let chunk_l2 = StreamChunk::from_pretty(
2331 " I I
2332 + 3 8
2333 - 3 8",
2334 );
2335 let chunk_r1 = StreamChunk::from_pretty(
2336 " I I
2337 + 2 7
2338 + 4 8
2339 + 6 9",
2340 );
2341 let chunk_r2 = StreamChunk::from_pretty(
2342 " I I
2343 + 3 10
2344 + 6 11
2345 + 1 2
2346 + 1 3",
2347 );
2348 let chunk_l3 = StreamChunk::from_pretty(
2349 " I I
2350 + 9 10",
2351 );
2352 let chunk_r3 = StreamChunk::from_pretty(
2353 " I I
2354 - 1 2",
2355 );
2356 let chunk_r4 = StreamChunk::from_pretty(
2357 " I I
2358 - 1 3",
2359 );
2360 let (mut tx_l, mut tx_r, mut hash_join) =
2361 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2362
2363 tx_l.push_barrier(test_epoch(1), false);
2365 tx_r.push_barrier(test_epoch(1), false);
2366 hash_join.next_unwrap_ready_barrier()?;
2367
2368 tx_l.push_chunk(chunk_l1);
2370 let chunk = hash_join.next_unwrap_ready_chunk()?;
2371 assert_eq!(
2372 chunk,
2373 StreamChunk::from_pretty(
2374 " I I
2375 + 1 4
2376 + 2 5
2377 + 3 6",
2378 )
2379 );
2380
2381 tx_l.push_barrier(test_epoch(2), false);
2383 tx_r.push_barrier(test_epoch(2), false);
2384 hash_join.next_unwrap_ready_barrier()?;
2385
2386 tx_l.push_chunk(chunk_l2);
2388 let chunk = hash_join.next_unwrap_ready_chunk()?;
2389 assert_eq!(
2390 chunk,
2391 StreamChunk::from_pretty(
2392 " I I
2393 + 3 8 D
2394 - 3 8 D",
2395 )
2396 );
2397
2398 tx_r.push_chunk(chunk_r1);
2400 let chunk = hash_join.next_unwrap_ready_chunk()?;
2401 assert_eq!(
2402 chunk,
2403 StreamChunk::from_pretty(
2404 " I I
2405 - 2 5"
2406 )
2407 );
2408
2409 tx_r.push_chunk(chunk_r2);
2411 let chunk = hash_join.next_unwrap_ready_chunk()?;
2412 assert_eq!(
2413 chunk,
2414 StreamChunk::from_pretty(
2415 " I I
2416 - 3 6
2417 - 1 4"
2418 )
2419 );
2420
2421 tx_l.push_chunk(chunk_l3);
2423 let chunk = hash_join.next_unwrap_ready_chunk()?;
2424 assert_eq!(
2425 chunk,
2426 StreamChunk::from_pretty(
2427 " I I
2428 + 9 10"
2429 )
2430 );
2431
2432 tx_r.push_chunk(chunk_r3);
2435 hash_join.next_unwrap_pending();
2436
2437 tx_r.push_chunk(chunk_r4);
2440 let chunk = hash_join.next_unwrap_ready_chunk()?;
2441 assert_eq!(
2442 chunk,
2443 StreamChunk::from_pretty(
2444 " I I
2445 + 1 4"
2446 )
2447 );
2448
2449 Ok(())
2450 }
2451
2452 #[tokio::test]
2453 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2454 let chunk_r1 = StreamChunk::from_pretty(
2455 " I I
2456 + 1 4
2457 + 2 5
2458 + 3 6",
2459 );
2460 let chunk_r2 = StreamChunk::from_pretty(
2461 " I I
2462 + 3 8
2463 - 3 8",
2464 );
2465 let chunk_l1 = StreamChunk::from_pretty(
2466 " I I
2467 + 2 7
2468 + 4 8
2469 + 6 9",
2470 );
2471 let chunk_l2 = StreamChunk::from_pretty(
2472 " I I
2473 + 3 10
2474 + 6 11
2475 + 1 2
2476 + 1 3",
2477 );
2478 let chunk_r3 = StreamChunk::from_pretty(
2479 " I I
2480 + 9 10",
2481 );
2482 let chunk_l3 = StreamChunk::from_pretty(
2483 " I I
2484 - 1 2",
2485 );
2486 let chunk_l4 = StreamChunk::from_pretty(
2487 " I I
2488 - 1 3",
2489 );
2490 let (mut tx_r, mut tx_l, mut hash_join) =
2491 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2492
2493 tx_r.push_barrier(test_epoch(1), false);
2495 tx_l.push_barrier(test_epoch(1), false);
2496 hash_join.next_unwrap_ready_barrier()?;
2497
2498 tx_r.push_chunk(chunk_r1);
2500 let chunk = hash_join.next_unwrap_ready_chunk()?;
2501 assert_eq!(
2502 chunk,
2503 StreamChunk::from_pretty(
2504 " I I
2505 + 1 4
2506 + 2 5
2507 + 3 6",
2508 )
2509 );
2510
2511 tx_r.push_barrier(test_epoch(2), false);
2513 tx_l.push_barrier(test_epoch(2), false);
2514 hash_join.next_unwrap_ready_barrier()?;
2515
2516 tx_r.push_chunk(chunk_r2);
2518 let chunk = hash_join.next_unwrap_ready_chunk()?;
2519 assert_eq!(
2520 chunk,
2521 StreamChunk::from_pretty(
2522 " I I
2523 + 3 8 D
2524 - 3 8 D",
2525 )
2526 );
2527
2528 tx_l.push_chunk(chunk_l1);
2530 let chunk = hash_join.next_unwrap_ready_chunk()?;
2531 assert_eq!(
2532 chunk,
2533 StreamChunk::from_pretty(
2534 " I I
2535 - 2 5"
2536 )
2537 );
2538
2539 tx_l.push_chunk(chunk_l2);
2541 let chunk = hash_join.next_unwrap_ready_chunk()?;
2542 assert_eq!(
2543 chunk,
2544 StreamChunk::from_pretty(
2545 " I I
2546 - 3 6
2547 - 1 4"
2548 )
2549 );
2550
2551 tx_r.push_chunk(chunk_r3);
2553 let chunk = hash_join.next_unwrap_ready_chunk()?;
2554 assert_eq!(
2555 chunk,
2556 StreamChunk::from_pretty(
2557 " I I
2558 + 9 10"
2559 )
2560 );
2561
2562 tx_l.push_chunk(chunk_l3);
2565 hash_join.next_unwrap_pending();
2566
2567 tx_l.push_chunk(chunk_l4);
2570 let chunk = hash_join.next_unwrap_ready_chunk()?;
2571 assert_eq!(
2572 chunk,
2573 StreamChunk::from_pretty(
2574 " I I
2575 + 1 4"
2576 )
2577 );
2578
2579 Ok(())
2580 }
2581
2582 #[tokio::test]
2583 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2584 let chunk_l1 = StreamChunk::from_pretty(
2585 " I I
2586 + 1 4
2587 + 2 5
2588 + 3 6",
2589 );
2590 let chunk_l2 = StreamChunk::from_pretty(
2591 " I I
2592 + 6 8
2593 + 3 8",
2594 );
2595 let chunk_r1 = StreamChunk::from_pretty(
2596 " I I
2597 + 2 7
2598 + 4 8
2599 + 6 9",
2600 );
2601 let chunk_r2 = StreamChunk::from_pretty(
2602 " I I
2603 + 3 10
2604 + 6 11",
2605 );
2606 let (mut tx_l, mut tx_r, mut hash_join) =
2607 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2608
2609 tx_l.push_barrier(test_epoch(1), false);
2611 tx_r.push_barrier(test_epoch(1), false);
2612 hash_join.next_unwrap_ready_barrier()?;
2613
2614 tx_l.push_chunk(chunk_l1);
2616 hash_join.next_unwrap_pending();
2617
2618 tx_l.push_barrier(test_epoch(2), false);
2620
2621 tx_l.push_chunk(chunk_l2);
2623
2624 tx_r.push_chunk(chunk_r1);
2626
2627 let chunk = hash_join.next_unwrap_ready_chunk()?;
2629 assert_eq!(
2630 chunk,
2631 StreamChunk::from_pretty(
2632 " I I I I
2633 + 2 5 2 7"
2634 )
2635 );
2636
2637 tx_r.push_barrier(test_epoch(2), false);
2639
2640 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2642 assert!(matches!(
2643 hash_join.next_unwrap_ready_barrier()?,
2644 Barrier {
2645 epoch,
2646 mutation: None,
2647 ..
2648 } if epoch == expected_epoch
2649 ));
2650
2651 let chunk = hash_join.next_unwrap_ready_chunk()?;
2653 assert_eq!(
2654 chunk,
2655 StreamChunk::from_pretty(
2656 " I I I I
2657 + 6 8 6 9"
2658 )
2659 );
2660
2661 tx_r.push_chunk(chunk_r2);
2663 let chunk = hash_join.next_unwrap_ready_chunk()?;
2664 assert_eq!(
2665 chunk,
2666 StreamChunk::from_pretty(
2667 " I I I I
2668 + 3 6 3 10
2669 + 3 8 3 10
2670 + 6 8 6 11"
2671 )
2672 );
2673
2674 Ok(())
2675 }
2676
2677 #[tokio::test]
2678 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2679 let chunk_l1 = StreamChunk::from_pretty(
2680 " I I
2681 + 1 4
2682 + 2 .
2683 + 3 .",
2684 );
2685 let chunk_l2 = StreamChunk::from_pretty(
2686 " I I
2687 + 6 .
2688 + 3 8",
2689 );
2690 let chunk_r1 = StreamChunk::from_pretty(
2691 " I I
2692 + 2 7
2693 + 4 8
2694 + 6 9",
2695 );
2696 let chunk_r2 = StreamChunk::from_pretty(
2697 " I I
2698 + 3 10
2699 + 6 11",
2700 );
2701 let (mut tx_l, mut tx_r, mut hash_join) =
2702 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2703
2704 tx_l.push_barrier(test_epoch(1), false);
2706 tx_r.push_barrier(test_epoch(1), false);
2707 hash_join.next_unwrap_ready_barrier()?;
2708
2709 tx_l.push_chunk(chunk_l1);
2711 hash_join.next_unwrap_pending();
2712
2713 tx_l.push_barrier(test_epoch(2), false);
2715
2716 tx_l.push_chunk(chunk_l2);
2718
2719 tx_r.push_chunk(chunk_r1);
2721
2722 let chunk = hash_join.next_unwrap_ready_chunk()?;
2724 assert_eq!(
2725 chunk,
2726 StreamChunk::from_pretty(
2727 " I I I I
2728 + 2 . 2 7"
2729 )
2730 );
2731
2732 tx_r.push_barrier(test_epoch(2), false);
2734
2735 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2737 assert!(matches!(
2738 hash_join.next_unwrap_ready_barrier()?,
2739 Barrier {
2740 epoch,
2741 mutation: None,
2742 ..
2743 } if epoch == expected_epoch
2744 ));
2745
2746 let chunk = hash_join.next_unwrap_ready_chunk()?;
2748 assert_eq!(
2749 chunk,
2750 StreamChunk::from_pretty(
2751 " I I I I
2752 + 6 . 6 9"
2753 )
2754 );
2755
2756 tx_r.push_chunk(chunk_r2);
2758 let chunk = hash_join.next_unwrap_ready_chunk()?;
2759 assert_eq!(
2760 chunk,
2761 StreamChunk::from_pretty(
2762 " I I I I
2763 + 3 8 3 10
2764 + 3 . 3 10
2765 + 6 . 6 11"
2766 )
2767 );
2768
2769 Ok(())
2770 }
2771
2772 #[tokio::test]
2773 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2774 let chunk_l1 = StreamChunk::from_pretty(
2775 " I I
2776 + 1 4
2777 + 2 5
2778 + 3 6",
2779 );
2780 let chunk_l2 = StreamChunk::from_pretty(
2781 " I I
2782 + 3 8
2783 - 3 8",
2784 );
2785 let chunk_r1 = StreamChunk::from_pretty(
2786 " I I
2787 + 2 7
2788 + 4 8
2789 + 6 9",
2790 );
2791 let chunk_r2 = StreamChunk::from_pretty(
2792 " I I
2793 + 3 10
2794 + 6 11",
2795 );
2796 let (mut tx_l, mut tx_r, mut hash_join) =
2797 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2798
2799 tx_l.push_barrier(test_epoch(1), false);
2801 tx_r.push_barrier(test_epoch(1), false);
2802 hash_join.next_unwrap_ready_barrier()?;
2803
2804 tx_l.push_chunk(chunk_l1);
2806 let chunk = hash_join.next_unwrap_ready_chunk()?;
2807 assert_eq!(
2808 chunk,
2809 StreamChunk::from_pretty(
2810 " I I I I
2811 + 1 4 . .
2812 + 2 5 . .
2813 + 3 6 . ."
2814 )
2815 );
2816
2817 tx_l.push_chunk(chunk_l2);
2819 let chunk = hash_join.next_unwrap_ready_chunk()?;
2820 assert_eq!(
2821 chunk,
2822 StreamChunk::from_pretty(
2823 " I I I I
2824 + 3 8 . . D
2825 - 3 8 . . D"
2826 )
2827 );
2828
2829 tx_r.push_chunk(chunk_r1);
2831 let chunk = hash_join.next_unwrap_ready_chunk()?;
2832 assert_eq!(
2833 chunk,
2834 StreamChunk::from_pretty(
2835 " I I I I
2836 - 2 5 . .
2837 + 2 5 2 7"
2838 )
2839 );
2840
2841 tx_r.push_chunk(chunk_r2);
2843 let chunk = hash_join.next_unwrap_ready_chunk()?;
2844 assert_eq!(
2845 chunk,
2846 StreamChunk::from_pretty(
2847 " I I I I
2848 - 3 6 . .
2849 + 3 6 3 10"
2850 )
2851 );
2852
2853 Ok(())
2854 }
2855
2856 #[tokio::test]
2857 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2858 let chunk_l1 = StreamChunk::from_pretty(
2859 " I I
2860 + 1 4
2861 + 2 5
2862 + . 6",
2863 );
2864 let chunk_l2 = StreamChunk::from_pretty(
2865 " I I
2866 + . 8
2867 - . 8",
2868 );
2869 let chunk_r1 = StreamChunk::from_pretty(
2870 " I I
2871 + 2 7
2872 + 4 8
2873 + 6 9",
2874 );
2875 let chunk_r2 = StreamChunk::from_pretty(
2876 " I I
2877 + . 10
2878 + 6 11",
2879 );
2880 let (mut tx_l, mut tx_r, mut hash_join) =
2881 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2882
2883 tx_l.push_barrier(test_epoch(1), false);
2885 tx_r.push_barrier(test_epoch(1), false);
2886 hash_join.next_unwrap_ready_barrier()?;
2887
2888 tx_l.push_chunk(chunk_l1);
2890 let chunk = hash_join.next_unwrap_ready_chunk()?;
2891 assert_eq!(
2892 chunk,
2893 StreamChunk::from_pretty(
2894 " I I I I
2895 + 1 4 . .
2896 + 2 5 . .
2897 + . 6 . ."
2898 )
2899 );
2900
2901 tx_l.push_chunk(chunk_l2);
2903 let chunk = hash_join.next_unwrap_ready_chunk()?;
2904 assert_eq!(
2905 chunk,
2906 StreamChunk::from_pretty(
2907 " I I I I
2908 + . 8 . . D
2909 - . 8 . . D"
2910 )
2911 );
2912
2913 tx_r.push_chunk(chunk_r1);
2915 let chunk = hash_join.next_unwrap_ready_chunk()?;
2916 assert_eq!(
2917 chunk,
2918 StreamChunk::from_pretty(
2919 " I I I I
2920 - 2 5 . .
2921 + 2 5 2 7"
2922 )
2923 );
2924
2925 tx_r.push_chunk(chunk_r2);
2927 let chunk = hash_join.next_unwrap_ready_chunk()?;
2928 assert_eq!(
2929 chunk,
2930 StreamChunk::from_pretty(
2931 " I I I I
2932 - . 6 . .
2933 + . 6 . 10"
2934 )
2935 );
2936
2937 Ok(())
2938 }
2939
2940 #[tokio::test]
2941 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2942 let chunk_l1 = StreamChunk::from_pretty(
2943 " I I
2944 + 1 4
2945 + 2 5
2946 + 3 6",
2947 );
2948 let chunk_l2 = StreamChunk::from_pretty(
2949 " I I
2950 + 3 8
2951 - 3 8",
2952 );
2953 let chunk_r1 = StreamChunk::from_pretty(
2954 " I I
2955 + 2 7
2956 + 4 8
2957 + 6 9",
2958 );
2959 let chunk_r2 = StreamChunk::from_pretty(
2960 " I I
2961 + 5 10
2962 - 5 10",
2963 );
2964 let (mut tx_l, mut tx_r, mut hash_join) =
2965 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2966
2967 tx_l.push_barrier(test_epoch(1), false);
2969 tx_r.push_barrier(test_epoch(1), false);
2970 hash_join.next_unwrap_ready_barrier()?;
2971
2972 tx_l.push_chunk(chunk_l1);
2974 hash_join.next_unwrap_pending();
2975
2976 tx_l.push_chunk(chunk_l2);
2978 hash_join.next_unwrap_pending();
2979
2980 tx_r.push_chunk(chunk_r1);
2982 let chunk = hash_join.next_unwrap_ready_chunk()?;
2983 assert_eq!(
2984 chunk,
2985 StreamChunk::from_pretty(
2986 " I I I I
2987 + 2 5 2 7
2988 + . . 4 8
2989 + . . 6 9"
2990 )
2991 );
2992
2993 tx_r.push_chunk(chunk_r2);
2995 let chunk = hash_join.next_unwrap_ready_chunk()?;
2996 assert_eq!(
2997 chunk,
2998 StreamChunk::from_pretty(
2999 " I I I I
3000 + . . 5 10 D
3001 - . . 5 10 D"
3002 )
3003 );
3004
3005 Ok(())
3006 }
3007
3008 #[tokio::test]
3009 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3010 let chunk_l1 = StreamChunk::from_pretty(
3011 " I I I
3012 + 1 4 1
3013 + 2 5 2
3014 + 3 6 3",
3015 );
3016 let chunk_l2 = StreamChunk::from_pretty(
3017 " I I I
3018 + 4 9 4
3019 + 5 10 5",
3020 );
3021 let chunk_r1 = StreamChunk::from_pretty(
3022 " I I I
3023 + 2 5 1
3024 + 4 9 2
3025 + 6 9 3",
3026 );
3027 let chunk_r2 = StreamChunk::from_pretty(
3028 " I I I
3029 + 1 4 4
3030 + 3 6 5",
3031 );
3032
3033 let (mut tx_l, mut tx_r, mut hash_join) =
3034 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3035
3036 tx_l.push_barrier(test_epoch(1), false);
3038 tx_r.push_barrier(test_epoch(1), false);
3039 hash_join.next_unwrap_ready_barrier()?;
3040
3041 tx_l.push_chunk(chunk_l1);
3043 let chunk = hash_join.next_unwrap_ready_chunk()?;
3044 assert_eq!(
3045 chunk,
3046 StreamChunk::from_pretty(
3047 " I I I I I I
3048 + 1 4 1 . . .
3049 + 2 5 2 . . .
3050 + 3 6 3 . . ."
3051 )
3052 );
3053
3054 tx_l.push_chunk(chunk_l2);
3056 let chunk = hash_join.next_unwrap_ready_chunk()?;
3057 assert_eq!(
3058 chunk,
3059 StreamChunk::from_pretty(
3060 " I I I I I I
3061 + 4 9 4 . . .
3062 + 5 10 5 . . ."
3063 )
3064 );
3065
3066 tx_r.push_chunk(chunk_r1);
3068 let chunk = hash_join.next_unwrap_ready_chunk()?;
3069 assert_eq!(
3070 chunk,
3071 StreamChunk::from_pretty(
3072 " I I I I I I
3073 - 2 5 2 . . .
3074 + 2 5 2 2 5 1
3075 - 4 9 4 . . .
3076 + 4 9 4 4 9 2"
3077 )
3078 );
3079
3080 tx_r.push_chunk(chunk_r2);
3082 let chunk = hash_join.next_unwrap_ready_chunk()?;
3083 assert_eq!(
3084 chunk,
3085 StreamChunk::from_pretty(
3086 " I I I I I I
3087 - 1 4 1 . . .
3088 + 1 4 1 1 4 4
3089 - 3 6 3 . . .
3090 + 3 6 3 3 6 5"
3091 )
3092 );
3093
3094 Ok(())
3095 }
3096
3097 #[tokio::test]
3098 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3099 let chunk_l1 = StreamChunk::from_pretty(
3100 " I I I
3101 + 1 4 1
3102 + 2 5 2
3103 + 3 6 3",
3104 );
3105 let chunk_l2 = StreamChunk::from_pretty(
3106 " I I I
3107 + 4 9 4
3108 + 5 10 5",
3109 );
3110 let chunk_r1 = StreamChunk::from_pretty(
3111 " I I I
3112 + 2 5 1
3113 + 4 9 2
3114 + 6 9 3",
3115 );
3116 let chunk_r2 = StreamChunk::from_pretty(
3117 " I I I
3118 + 1 4 4
3119 + 3 6 5
3120 + 7 7 6",
3121 );
3122
3123 let (mut tx_l, mut tx_r, mut hash_join) =
3124 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3125
3126 tx_l.push_barrier(test_epoch(1), false);
3128 tx_r.push_barrier(test_epoch(1), false);
3129 hash_join.next_unwrap_ready_barrier()?;
3130
3131 tx_l.push_chunk(chunk_l1);
3133 hash_join.next_unwrap_pending();
3134
3135 tx_l.push_chunk(chunk_l2);
3137 hash_join.next_unwrap_pending();
3138
3139 tx_r.push_chunk(chunk_r1);
3141 let chunk = hash_join.next_unwrap_ready_chunk()?;
3142 assert_eq!(
3143 chunk,
3144 StreamChunk::from_pretty(
3145 " I I I I I I
3146 + 2 5 2 2 5 1
3147 + 4 9 4 4 9 2
3148 + . . . 6 9 3"
3149 )
3150 );
3151
3152 tx_r.push_chunk(chunk_r2);
3154 let chunk = hash_join.next_unwrap_ready_chunk()?;
3155 assert_eq!(
3156 chunk,
3157 StreamChunk::from_pretty(
3158 " I I I I I I
3159 + 1 4 1 1 4 4
3160 + 3 6 3 3 6 5
3161 + . . . 7 7 6"
3162 )
3163 );
3164
3165 Ok(())
3166 }
3167
3168 #[tokio::test]
3169 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3170 let chunk_l1 = StreamChunk::from_pretty(
3171 " I I
3172 + 1 4
3173 + 2 5
3174 + 3 6",
3175 );
3176 let chunk_l2 = StreamChunk::from_pretty(
3177 " I I
3178 + 3 8
3179 - 3 8",
3180 );
3181 let chunk_r1 = StreamChunk::from_pretty(
3182 " I I
3183 + 2 7
3184 + 4 8
3185 + 6 9",
3186 );
3187 let chunk_r2 = StreamChunk::from_pretty(
3188 " I I
3189 + 5 10
3190 - 5 10",
3191 );
3192 let (mut tx_l, mut tx_r, mut hash_join) =
3193 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3194
3195 tx_l.push_barrier(test_epoch(1), false);
3197 tx_r.push_barrier(test_epoch(1), false);
3198 hash_join.next_unwrap_ready_barrier()?;
3199
3200 tx_l.push_chunk(chunk_l1);
3202 let chunk = hash_join.next_unwrap_ready_chunk()?;
3203 assert_eq!(
3204 chunk,
3205 StreamChunk::from_pretty(
3206 " I I I I
3207 + 1 4 . .
3208 + 2 5 . .
3209 + 3 6 . ."
3210 )
3211 );
3212
3213 tx_l.push_chunk(chunk_l2);
3215 let chunk = hash_join.next_unwrap_ready_chunk()?;
3216 assert_eq!(
3217 chunk,
3218 StreamChunk::from_pretty(
3219 " I I I I
3220 + 3 8 . . D
3221 - 3 8 . . D"
3222 )
3223 );
3224
3225 tx_r.push_chunk(chunk_r1);
3227 let chunk = hash_join.next_unwrap_ready_chunk()?;
3228 assert_eq!(
3229 chunk,
3230 StreamChunk::from_pretty(
3231 " I I I I
3232 - 2 5 . .
3233 + 2 5 2 7
3234 + . . 4 8
3235 + . . 6 9"
3236 )
3237 );
3238
3239 tx_r.push_chunk(chunk_r2);
3241 let chunk = hash_join.next_unwrap_ready_chunk()?;
3242 assert_eq!(
3243 chunk,
3244 StreamChunk::from_pretty(
3245 " I I I I
3246 + . . 5 10 D
3247 - . . 5 10 D"
3248 )
3249 );
3250
3251 Ok(())
3252 }
3253
3254 #[tokio::test]
3255 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3256 let (mut tx_l, mut tx_r, mut hash_join) =
3257 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3258
3259 tx_l.push_barrier(test_epoch(1), false);
3261 tx_r.push_barrier(test_epoch(1), false);
3262 hash_join.next_unwrap_ready_barrier()?;
3263
3264 tx_l.push_chunk(StreamChunk::from_pretty(
3265 " I I
3266 + 1 1
3267 ",
3268 ));
3269 let chunk = hash_join.next_unwrap_ready_chunk()?;
3270 assert_eq!(
3271 chunk,
3272 StreamChunk::from_pretty(
3273 " I I I I
3274 + 1 1 . ."
3275 )
3276 );
3277
3278 tx_r.push_chunk(StreamChunk::from_pretty(
3279 " I I
3280 + 1 1
3281 ",
3282 ));
3283 let chunk = hash_join.next_unwrap_ready_chunk()?;
3284
3285 assert_eq!(
3286 chunk,
3287 StreamChunk::from_pretty(
3288 " I I I I
3289 - 1 1 . .
3290 + 1 1 1 1"
3291 )
3292 );
3293
3294 tx_l.push_chunk(StreamChunk::from_pretty(
3295 " I I
3296 - 1 1
3297 + 1 2
3298 ",
3299 ));
3300 let chunk = hash_join.next_unwrap_ready_chunk()?;
3301 let chunk = chunk.compact_vis();
3302 assert_eq!(
3303 chunk,
3304 StreamChunk::from_pretty(
3305 " I I I I
3306 - 1 1 1 1
3307 + 1 2 1 1
3308 "
3309 )
3310 );
3311
3312 Ok(())
3313 }
3314
3315 #[tokio::test]
3316 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3317 {
3318 let chunk_l1 = StreamChunk::from_pretty(
3319 " I I
3320 + 1 4
3321 + 2 5
3322 + 3 6
3323 + 3 7",
3324 );
3325 let chunk_l2 = StreamChunk::from_pretty(
3326 " I I
3327 + 3 8
3328 - 3 8
3329 - 1 4", );
3331 let chunk_r1 = StreamChunk::from_pretty(
3332 " I I
3333 + 2 6
3334 + 4 8
3335 + 3 4",
3336 );
3337 let chunk_r2 = StreamChunk::from_pretty(
3338 " I I
3339 + 5 10
3340 - 5 10
3341 + 1 2",
3342 );
3343 let (mut tx_l, mut tx_r, mut hash_join) =
3344 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3345
3346 tx_l.push_barrier(test_epoch(1), false);
3348 tx_r.push_barrier(test_epoch(1), false);
3349 hash_join.next_unwrap_ready_barrier()?;
3350
3351 tx_l.push_chunk(chunk_l1);
3353 let chunk = hash_join.next_unwrap_ready_chunk()?;
3354 assert_eq!(
3355 chunk,
3356 StreamChunk::from_pretty(
3357 " I I I I
3358 + 1 4 . .
3359 + 2 5 . .
3360 + 3 6 . .
3361 + 3 7 . ."
3362 )
3363 );
3364
3365 tx_l.push_chunk(chunk_l2);
3367 let chunk = hash_join.next_unwrap_ready_chunk()?;
3368 assert_eq!(
3369 chunk,
3370 StreamChunk::from_pretty(
3371 " I I I I
3372 + 3 8 . . D
3373 - 3 8 . . D
3374 - 1 4 . ."
3375 )
3376 );
3377
3378 tx_r.push_chunk(chunk_r1);
3380 let chunk = hash_join.next_unwrap_ready_chunk()?;
3381 assert_eq!(
3382 chunk,
3383 StreamChunk::from_pretty(
3384 " I I I I
3385 - 2 5 . .
3386 + 2 5 2 6
3387 + . . 4 8
3388 + . . 3 4" )
3392 );
3393
3394 tx_r.push_chunk(chunk_r2);
3396 let chunk = hash_join.next_unwrap_ready_chunk()?;
3397 assert_eq!(
3398 chunk,
3399 StreamChunk::from_pretty(
3400 " I I I I
3401 + . . 5 10 D
3402 - . . 5 10 D
3403 + . . 1 2" )
3406 );
3407
3408 Ok(())
3409 }
3410
3411 #[tokio::test]
3412 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3413 let chunk_l1 = StreamChunk::from_pretty(
3414 " I I
3415 + 1 4
3416 + 2 10
3417 + 3 6",
3418 );
3419 let chunk_l2 = StreamChunk::from_pretty(
3420 " I I
3421 + 3 8
3422 - 3 8",
3423 );
3424 let chunk_r1 = StreamChunk::from_pretty(
3425 " I I
3426 + 2 7
3427 + 4 8
3428 + 6 9",
3429 );
3430 let chunk_r2 = StreamChunk::from_pretty(
3431 " I I
3432 + 3 10
3433 + 6 11",
3434 );
3435 let (mut tx_l, mut tx_r, mut hash_join) =
3436 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3437
3438 tx_l.push_barrier(test_epoch(1), false);
3440 tx_r.push_barrier(test_epoch(1), false);
3441 hash_join.next_unwrap_ready_barrier()?;
3442
3443 tx_l.push_chunk(chunk_l1);
3445 hash_join.next_unwrap_pending();
3446
3447 tx_l.push_chunk(chunk_l2);
3449 hash_join.next_unwrap_pending();
3450
3451 tx_r.push_chunk(chunk_r1);
3453 hash_join.next_unwrap_pending();
3454
3455 tx_r.push_chunk(chunk_r2);
3457 let chunk = hash_join.next_unwrap_ready_chunk()?;
3458 assert_eq!(
3459 chunk,
3460 StreamChunk::from_pretty(
3461 " I I I I
3462 + 3 6 3 10"
3463 )
3464 );
3465
3466 Ok(())
3467 }
3468
3469 #[tokio::test]
3470 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3471 let (mut tx_l, mut tx_r, mut hash_join) =
3472 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3473
3474 tx_l.push_barrier(test_epoch(1), false);
3476 tx_r.push_barrier(test_epoch(1), false);
3477 hash_join.next_unwrap_ready_barrier()?;
3478
3479 tx_l.push_int64_watermark(0, 100);
3480
3481 tx_l.push_int64_watermark(0, 200);
3482
3483 tx_l.push_barrier(test_epoch(2), false);
3484 tx_r.push_barrier(test_epoch(2), false);
3485 hash_join.next_unwrap_ready_barrier()?;
3486
3487 tx_r.push_int64_watermark(0, 50);
3488
3489 let w1 = hash_join.next().await.unwrap().unwrap();
3490 let w1 = w1.as_watermark().unwrap();
3491
3492 let w2 = hash_join.next().await.unwrap().unwrap();
3493 let w2 = w2.as_watermark().unwrap();
3494
3495 tx_r.push_int64_watermark(0, 100);
3496
3497 let w3 = hash_join.next().await.unwrap().unwrap();
3498 let w3 = w3.as_watermark().unwrap();
3499
3500 let w4 = hash_join.next().await.unwrap().unwrap();
3501 let w4 = w4.as_watermark().unwrap();
3502
3503 assert_eq!(
3504 w1,
3505 &Watermark {
3506 col_idx: 2,
3507 data_type: DataType::Int64,
3508 val: ScalarImpl::Int64(50)
3509 }
3510 );
3511
3512 assert_eq!(
3513 w2,
3514 &Watermark {
3515 col_idx: 0,
3516 data_type: DataType::Int64,
3517 val: ScalarImpl::Int64(50)
3518 }
3519 );
3520
3521 assert_eq!(
3522 w3,
3523 &Watermark {
3524 col_idx: 2,
3525 data_type: DataType::Int64,
3526 val: ScalarImpl::Int64(100)
3527 }
3528 );
3529
3530 assert_eq!(
3531 w4,
3532 &Watermark {
3533 col_idx: 0,
3534 data_type: DataType::Int64,
3535 val: ScalarImpl::Int64(100)
3536 }
3537 );
3538
3539 Ok(())
3540 }
3541}