1use std::assert_matches::assert_matches;
15use std::collections::{BTreeMap, HashSet};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use anyhow::Context;
20use itertools::Itertools;
21use multimap::MultiMap;
22use risingwave_common::array::Op;
23use risingwave_common::hash::{HashKey, NullBitmap};
24use risingwave_common::row::RowExt;
25use risingwave_common::types::{DefaultOrd, ToOwnedDatum};
26use risingwave_common::util::epoch::EpochPair;
27use risingwave_common::util::iter_util::ZipEqDebug;
28use risingwave_expr::ExprError;
29use risingwave_expr::expr::NonStrictExpression;
30use tokio::time::Instant;
31
32use self::builder::JoinChunkBuilder;
33use super::barrier_align::*;
34use super::join::hash_join::*;
35use super::join::row::{JoinEncoding, JoinRow};
36use super::join::*;
37use super::watermark::*;
38use crate::executor::CachedJoinRow;
39use crate::executor::join::builder::JoinStreamChunkBuilder;
40use crate::executor::join::hash_join::CacheResult;
41use crate::executor::prelude::*;
42
/// Number of received rows between two cache evictions: `evict_cache` counts every processed
/// row and evicts both sides' hash tables each time the counter reaches this value.
const EVICT_EVERY_N_ROWS: u32 = 16;
45
/// Returns `true` when every element of `vec1` also occurs in `vec2`.
///
/// Ordering and duplicates are irrelevant; an empty `vec1` is a subset of any `vec2`.
fn is_subset(vec1: Vec<usize>, vec2: Vec<usize>) -> bool {
    let candidates: HashSet<usize> = vec2.into_iter().collect();
    vec1.into_iter().all(|idx| candidates.contains(&idx))
}
49
/// Per-side parameters passed to the hash join executor constructors.
pub struct JoinParams {
    /// Indices of the join key columns within this side's input columns.
    pub join_key_indices: Vec<usize>,
    /// Primary key indices of this side, forwarded to the side's `JoinHashMap`; the number of
    /// entries also determines the pk column range of the degree table.
    // NOTE(review): presumably the input pk after removing duplicates — confirm with the caller.
    pub deduped_pk_indices: Vec<usize>,
}
56
57impl JoinParams {
58 pub fn new(join_key_indices: Vec<usize>, deduped_pk_indices: Vec<usize>) -> Self {
59 Self {
60 join_key_indices,
61 deduped_pk_indices,
62 }
63 }
64}
65
/// State and column metadata of one input side (left or right) of the hash join.
struct JoinSide<K: HashKey, S: StateStore, E: JoinEncoding> {
    /// Hash table holding the join state of this side.
    ht: JoinHashMap<K, S, E>,
    /// Indices of the join key columns within this side's input columns.
    join_key_indices: Vec<usize>,
    /// Data types of all input columns of this side (taken from the input schema).
    all_data_types: Vec<DataType>,
    /// Offset of this side's columns in the concatenated `left ++ right` row: `0` for the left
    /// side, the left input's column count for the right side.
    start_pos: usize,
    /// `(input column index, output column index)` pairs produced by
    /// `JoinStreamChunkBuilder::get_i2o_mapping` for this side.
    i2o_mapping: Vec<(usize, usize)>,
    /// The same mapping as `i2o_mapping`, indexed by input column.
    i2o_mapping_indexed: MultiMap<usize, usize>,
    /// For every input column, the inequalities it takes part in, as
    /// `(inequality index, need_offset)`. `need_offset` is `true` when the column is the
    /// "smaller" key of the inequality, in which case watermark handling applies the
    /// inequality's delta expression (if any) to the incoming watermark.
    input2inequality_index: Vec<Vec<(usize, bool)>>,
    /// Columns that must be non-null for an incoming row of this side to probe the other side:
    /// the columns referenced by at least one inequality. Empty when the append-only
    /// optimization is enabled.
    non_null_fields: Vec<usize>,
    /// `(column index, inequality index)` pairs registered for inequalities whose `clean_state`
    /// flag is set; paired with the current inequality watermarks when matching rows. Empty when
    /// the append-only optimization is enabled.
    state_clean_columns: Vec<(usize, usize)>,
    /// Whether a degree table is maintained for this side.
    need_degree_table: bool,
    /// Ties the encoding type parameter `E` to this struct.
    _marker: std::marker::PhantomData<E>,
}
95
96impl<K: HashKey, S: StateStore, E: JoinEncoding> std::fmt::Debug for JoinSide<K, S, E> {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("JoinSide")
99 .field("join_key_indices", &self.join_key_indices)
100 .field("col_types", &self.all_data_types)
101 .field("start_pos", &self.start_pos)
102 .field("i2o_mapping", &self.i2o_mapping)
103 .field("need_degree_table", &self.need_degree_table)
104 .finish()
105 }
106}
107
108impl<K: HashKey, S: StateStore, E: JoinEncoding> JoinSide<K, S, E> {
109 fn is_dirty(&self) -> bool {
111 unimplemented!()
112 }
113
114 #[expect(dead_code)]
115 fn clear_cache(&mut self) {
116 assert!(
117 !self.is_dirty(),
118 "cannot clear cache while states of hash join are dirty"
119 );
120
121 }
124
125 pub async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
126 self.ht.init(epoch).await
127 }
128}
129
/// Streaming hash join executor over a left and a right input.
///
/// `K` is the hash key type built from the join key columns, `S` the state store behind the
/// state tables, `T` the join type and `E` the encoding of cached join rows.
pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
{
    ctx: ActorContextRef,
    info: ExecutorInfo,

    /// Left input executor; taken (left as `None`) once `into_stream` starts.
    input_l: Option<Executor>,
    /// Right input executor; taken (left as `None`) once `into_stream` starts.
    input_r: Option<Executor>,
    /// Data types of the output columns, i.e. the joined schema projected by `output_indices`.
    actual_output_data_types: Vec<DataType>,
    /// State and metadata of the left side.
    side_l: JoinSide<K, S, E>,
    /// State and metadata of the right side.
    side_r: JoinSide<K, S, E>,
    /// Optional additional join condition checked for every candidate pair of rows.
    cond: Option<NonStrictExpression>,
    /// Per inequality: the output column indices on which its watermark is emitted, and an
    /// optional delta expression applied to watermarks of the "smaller" key.
    inequality_pairs: Vec<(Vec<usize>, Option<NonStrictExpression>)>,
    /// Latest watermark selected for each inequality (`None` until one is selected); reset when
    /// the left side's post-commit reports `true` after a barrier.
    inequality_watermarks: Vec<Option<Watermark>>,

    /// `true` when the join is append-only and the pk of both sides is contained in the join key.
    append_only_optimize: bool,

    metrics: Arc<StreamingMetrics>,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Rows received since the last cache eviction (see `EVICT_EVERY_N_ROWS`).
    cnt_rows_received: u32,

    /// Watermark buffers aligning the watermarks of both sides. Keys `0..join_key_len` refer to
    /// join key positions; key `join_key_len + i` refers to the `i`-th inequality.
    watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,

    /// A warning is logged when a single input row matches more rows than this threshold.
    high_join_amplification_threshold: usize,

    /// Bound on the number of rows inserted into a cache entry while refilling it after a miss.
    entry_state_max_rows: usize,
}
174
175impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> std::fmt::Debug
176 for HashJoinExecutor<K, S, T, E>
177{
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 f.debug_struct("HashJoinExecutor")
180 .field("join_type", &T)
181 .field("input_left", &self.input_l.as_ref().unwrap().identity())
182 .field("input_right", &self.input_r.as_ref().unwrap().identity())
183 .field("side_l", &self.side_l)
184 .field("side_r", &self.side_r)
185 .field("pk_indices", &self.info.pk_indices)
186 .field("schema", &self.info.schema)
187 .field("actual_output_data_types", &self.actual_output_data_types)
188 .finish()
189 }
190}
191
192impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding> Execute
193 for HashJoinExecutor<K, S, T, E>
194{
195 fn execute(self: Box<Self>) -> BoxedMessageStream {
196 self.into_stream().boxed()
197 }
198}
199
/// Arguments of `eq_join_left` / `eq_join_right`: borrows of the executor fields needed to
/// process one input chunk, plus the chunk itself.
struct EqJoinArgs<'a, K: HashKey, S: StateStore, E: JoinEncoding> {
    ctx: &'a ActorContextRef,
    /// Left side state.
    side_l: &'a mut JoinSide<K, S, E>,
    /// Right side state.
    side_r: &'a mut JoinSide<K, S, E>,
    /// Data types of the output columns.
    actual_output_data_types: &'a [DataType],
    /// Optional additional join condition.
    cond: &'a mut Option<NonStrictExpression>,
    /// Latest selected watermark of every inequality.
    inequality_watermarks: &'a [Option<Watermark>],
    /// The input chunk to join.
    chunk: StreamChunk,
    /// Whether the append-only optimization is enabled.
    append_only_optimize: bool,
    /// Chunk size handed to the output chunk builder.
    chunk_size: usize,
    /// Row counter driving periodic cache eviction.
    cnt_rows_received: &'a mut u32,
    /// Match count above which a warning is logged for a row.
    high_join_amplification_threshold: usize,
    /// Bound on rows inserted into a cache entry on refill.
    entry_state_max_rows: usize,
}
214
215impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive, E: JoinEncoding>
216 HashJoinExecutor<K, S, T, E>
217{
    /// Creates a hash join executor whose per-entry cache row limit comes from the streaming
    /// configuration.
    ///
    /// Forwards every argument unchanged to [`Self::new_with_cache_size`] and passes `None` as
    /// `entry_state_max_rows`, which makes that constructor read
    /// `hash_join_entry_state_max_rows` from `ctx.streaming_config.developer`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
    ) -> Self {
        Self::new_with_cache_size(
            ctx,
            info,
            input_l,
            input_r,
            params_l,
            params_r,
            null_safe,
            output_indices,
            cond,
            inequality_pairs,
            state_table_l,
            degree_state_table_l,
            state_table_r,
            degree_state_table_r,
            watermark_epoch,
            is_append_only,
            metrics,
            chunk_size,
            high_join_amplification_threshold,
            // `None`: take the per-entry row limit from the streaming config.
            None,
        )
    }
263
    /// Creates a hash join executor with an explicit (or configured) per-entry cache row limit.
    ///
    /// * `params_l` / `params_r`: join key and deduplicated pk indices of each side.
    /// * `null_safe`: one flag per join key column, converted into the null-matched bitmap of
    ///   the hash key.
    /// * `output_indices`: projection applied on top of the joined schema.
    /// * `inequality_pairs`: `(key_required_larger, key_required_smaller, clean_state,
    ///   delta_expression)` per inequality; the column indices address the concatenated
    ///   `left ++ right` columns (right-side indices are shifted by the left input width).
    /// * `entry_state_max_rows`: `None` falls back to
    ///   `ctx.streaming_config.developer.hash_join_entry_state_max_rows`.
    ///
    /// # Panics
    ///
    /// Panics if the join key data types of both sides differ, or if an index in
    /// `output_indices`, the join key indices or `inequality_pairs` is out of range.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_cache_size(
        ctx: ActorContextRef,
        info: ExecutorInfo,
        input_l: Executor,
        input_r: Executor,
        params_l: JoinParams,
        params_r: JoinParams,
        null_safe: Vec<bool>,
        output_indices: Vec<usize>,
        cond: Option<NonStrictExpression>,
        inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
        state_table_l: StateTable<S>,
        degree_state_table_l: StateTable<S>,
        state_table_r: StateTable<S>,
        degree_state_table_r: StateTable<S>,
        watermark_epoch: AtomicU64Ref,
        is_append_only: bool,
        metrics: Arc<StreamingMetrics>,
        chunk_size: usize,
        high_join_amplification_threshold: usize,
        entry_state_max_rows: Option<usize>,
    ) -> Self {
        // An explicit limit wins; otherwise use the configured default.
        let entry_state_max_rows = match entry_state_max_rows {
            None => {
                ctx.streaming_config
                    .developer
                    .hash_join_entry_state_max_rows
            }
            Some(entry_state_max_rows) => entry_state_max_rows,
        };
        let side_l_column_n = input_l.schema().len();

        // Semi/anti joins only expose the columns of one side; every other join type exposes
        // the left columns followed by the right columns.
        let schema_fields = match T {
            JoinType::LeftSemi | JoinType::LeftAnti => input_l.schema().fields.clone(),
            JoinType::RightSemi | JoinType::RightAnti => input_r.schema().fields.clone(),
            _ => [
                input_l.schema().fields.clone(),
                input_r.schema().fields.clone(),
            ]
            .concat(),
        };

        let original_output_data_types = schema_fields
            .iter()
            .map(|field| field.data_type())
            .collect_vec();
        // Apply the output projection.
        let actual_output_data_types = output_indices
            .iter()
            .map(|&idx| original_output_data_types[idx].clone())
            .collect_vec();

        // Data types of all columns stored in the join state of each side.
        let state_all_data_types_l = input_l.schema().data_types();
        let state_all_data_types_r = input_r.schema().data_types();

        let state_pk_indices_l = input_l.pk_indices().to_vec();
        let state_pk_indices_r = input_r.pk_indices().to_vec();

        let state_join_key_indices_l = params_l.join_key_indices;
        let state_join_key_indices_r = params_r.join_key_indices;

        // In the degree table the join key columns come first ...
        let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
        let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

        // ... immediately followed by the deduplicated pk columns.
        let degree_pk_indices_l = (state_join_key_indices_l.len()
            ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
            .collect_vec();
        let degree_pk_indices_r = (state_join_key_indices_r.len()
            ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
            .collect_vec();

        // Whether the pk of each side is fully contained in its join key.
        let pk_contained_in_jk_l =
            is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
        let pk_contained_in_jk_r =
            is_subset(state_pk_indices_r.clone(), state_join_key_indices_r.clone());

        // The append-only optimization requires an append-only join whose pk is contained in
        // the join key on both sides.
        let append_only_optimize = is_append_only && pk_contained_in_jk_l && pk_contained_in_jk_r;

        let join_key_data_types_l = state_join_key_indices_l
            .iter()
            .map(|idx| state_all_data_types_l[*idx].clone())
            .collect_vec();

        let join_key_data_types_r = state_join_key_indices_r
            .iter()
            .map(|idx| state_all_data_types_r[*idx].clone())
            .collect_vec();

        // Both sides must agree on the join key types.
        assert_eq!(join_key_data_types_l, join_key_data_types_r);

        let null_matched = K::Bitmap::from_bool_vec(null_safe);

        // A degree table is skipped for a side when the join type does not need it or when the
        // *other* side's pk is contained in its join key.
        let need_degree_table_l = need_left_degree(T) && !pk_contained_in_jk_r;
        let need_degree_table_r = need_right_degree(T) && !pk_contained_in_jk_l;

        let degree_state_l = need_degree_table_l.then(|| {
            TableInner::new(
                degree_pk_indices_l,
                degree_join_key_indices_l,
                degree_state_table_l,
            )
        });
        let degree_state_r = need_degree_table_r.then(|| {
            TableInner::new(
                degree_pk_indices_r,
                degree_join_key_indices_r,
                degree_state_table_r,
            )
        });

        // Input-to-output column mappings; a side that is not part of the output schema
        // (semi/anti joins) contributes zero columns.
        let (left_to_output, right_to_output) = {
            let (left_len, right_len) = if is_left_semi_or_anti(T) {
                (state_all_data_types_l.len(), 0usize)
            } else if is_right_semi_or_anti(T) {
                (0usize, state_all_data_types_r.len())
            } else {
                (state_all_data_types_l.len(), state_all_data_types_r.len())
            };
            JoinStreamChunkBuilder::get_i2o_mapping(&output_indices, left_len, right_len)
        };

        let l2o_indexed = MultiMap::from_iter(left_to_output.iter().copied());
        let r2o_indexed = MultiMap::from_iter(right_to_output.iter().copied());

        let left_input_len = input_l.schema().len();
        let right_input_len = input_r.schema().len();
        let mut l2inequality_index = vec![vec![]; left_input_len];
        let mut r2inequality_index = vec![vec![]; right_input_len];
        let mut l_state_clean_columns = vec![];
        let mut r_state_clean_columns = vec![];
        // Translate each inequality into per-side bookkeeping and the output columns on which
        // its watermark will be emitted (the output columns of the "larger" key).
        let inequality_pairs = inequality_pairs
            .into_iter()
            .enumerate()
            .map(
                |(
                    index,
                    (key_required_larger, key_required_smaller, clean_state, delta_expression),
                )| {
                    // NOTE(review): assumes the two columns live on opposite sides, so the
                    // comparison tells which side holds the larger key — confirm with the planner.
                    let output_indices = if key_required_larger < key_required_smaller {
                        // Larger key on the left, smaller key on the right.
                        if clean_state {
                            l_state_clean_columns.push((key_required_larger, index));
                        }
                        l2inequality_index[key_required_larger].push((index, false));
                        r2inequality_index[key_required_smaller - left_input_len]
                            .push((index, true));
                        l2o_indexed
                            .get_vec(&key_required_larger)
                            .cloned()
                            .unwrap_or_default()
                    } else {
                        // Larger key on the right, smaller key on the left.
                        if clean_state {
                            r_state_clean_columns
                                .push((key_required_larger - left_input_len, index));
                        }
                        l2inequality_index[key_required_smaller].push((index, true));
                        r2inequality_index[key_required_larger - left_input_len]
                            .push((index, false));
                        r2o_indexed
                            .get_vec(&(key_required_larger - left_input_len))
                            .cloned()
                            .unwrap_or_default()
                    };
                    (output_indices, delta_expression)
                },
            )
            .collect_vec();

        // Columns referenced by at least one inequality.
        let mut l_non_null_fields = l2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();
        let mut r_non_null_fields = r2inequality_index
            .iter()
            .positions(|inequalities| !inequalities.is_empty())
            .collect_vec();

        // With the append-only optimization neither inequality-based state cleaning nor the
        // non-null requirement is applied.
        if append_only_optimize {
            l_state_clean_columns.clear();
            r_state_clean_columns.clear();
            l_non_null_fields.clear();
            r_non_null_fields.clear();
        }

        let inequality_watermarks = vec![None; inequality_pairs.len()];
        let watermark_buffers = BTreeMap::new();

        Self {
            ctx: ctx.clone(),
            info,
            input_l: Some(input_l),
            input_r: Some(input_r),
            actual_output_data_types,
            side_l: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch.clone(),
                    join_key_data_types_l,
                    state_join_key_indices_l.clone(),
                    state_all_data_types_l.clone(),
                    state_table_l,
                    params_l.deduped_pk_indices,
                    degree_state_l,
                    null_matched.clone(),
                    pk_contained_in_jk_l,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "left",
                ),
                join_key_indices: state_join_key_indices_l,
                all_data_types: state_all_data_types_l,
                i2o_mapping: left_to_output,
                i2o_mapping_indexed: l2o_indexed,
                input2inequality_index: l2inequality_index,
                non_null_fields: l_non_null_fields,
                state_clean_columns: l_state_clean_columns,
                // Left columns start the concatenated row.
                start_pos: 0,
                need_degree_table: need_degree_table_l,
                _marker: PhantomData,
            },
            side_r: JoinSide {
                ht: JoinHashMap::new(
                    watermark_epoch,
                    join_key_data_types_r,
                    state_join_key_indices_r.clone(),
                    state_all_data_types_r.clone(),
                    state_table_r,
                    params_r.deduped_pk_indices,
                    degree_state_r,
                    null_matched,
                    pk_contained_in_jk_r,
                    None,
                    metrics.clone(),
                    ctx.id,
                    ctx.fragment_id,
                    "right",
                ),
                join_key_indices: state_join_key_indices_r,
                all_data_types: state_all_data_types_r,
                // Right columns follow the left columns in the concatenated row.
                start_pos: side_l_column_n,
                i2o_mapping: right_to_output,
                i2o_mapping_indexed: r2o_indexed,
                input2inequality_index: r2inequality_index,
                non_null_fields: r_non_null_fields,
                state_clean_columns: r_state_clean_columns,
                need_degree_table: need_degree_table_r,
                _marker: PhantomData,
            },
            cond,
            inequality_pairs,
            inequality_watermarks,
            append_only_optimize,
            metrics,
            chunk_size,
            cnt_rows_received: 0,
            watermark_buffers,
            high_join_amplification_threshold,
            entry_state_max_rows,
        }
    }
527
    /// Main loop of the executor: turns the two inputs into the joined output stream.
    ///
    /// The inputs are barrier-aligned. The first barrier is forwarded before both sides are
    /// initialized with its epoch. After that every aligned message is handled by kind:
    /// watermarks go through `handle_watermark`, chunks are joined against the opposite side,
    /// and barriers flush both sides before being forwarded.
    ///
    /// # Panics
    ///
    /// Panics if the inputs have already been taken.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn into_stream(mut self) {
        let input_l = self.input_l.take().unwrap();
        let input_r = self.input_r.take().unwrap();
        let aligned_stream = barrier_align(
            input_l.execute(),
            input_r.execute(),
            self.ctx.id,
            self.ctx.fragment_id,
            self.metrics.clone(),
            "Join",
        );
        pin_mut!(aligned_stream);

        let actor_id = self.ctx.id;

        let barrier = expect_first_barrier_from_aligned_stream(&mut aligned_stream).await?;
        let first_epoch = barrier.epoch;
        // Forward the first barrier downstream before initializing the state of both sides.
        yield Message::Barrier(barrier);
        self.side_l.init(first_epoch).await?;
        self.side_r.init(first_epoch).await?;

        let actor_id_str = self.ctx.id.to_string();
        let fragment_id_str = self.ctx.fragment_id.to_string();

        // Resolve the labelled metrics once, outside of the message loop.
        let join_actor_input_waiting_duration_ns = self
            .metrics
            .join_actor_input_waiting_duration_ns
            .with_guarded_label_values(&[&actor_id_str, &fragment_id_str]);
        let left_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);
        let right_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        let barrier_join_match_duration_ns = self
            .metrics
            .join_match_duration_ns
            .with_guarded_label_values(&[
                actor_id_str.as_str(),
                fragment_id_str.as_str(),
                "barrier",
            ]);

        let left_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "left"]);

        let right_join_cached_entry_count = self
            .metrics
            .join_cached_entry_count
            .with_guarded_label_values(&[actor_id_str.as_str(), fragment_id_str.as_str(), "right"]);

        // Start of the current wait for the next aligned input message.
        let mut start_time = Instant::now();

        while let Some(msg) = aligned_stream
            .next()
            .instrument_await("hash_join_barrier_align")
            .await
        {
            join_actor_input_waiting_duration_ns.inc_by(start_time.elapsed().as_nanos() as u64);
            match msg? {
                AlignedMessage::WatermarkLeft(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Left, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::WatermarkRight(watermark) => {
                    for watermark_to_emit in
                        self.handle_watermark(SideType::Right, watermark).await?
                    {
                        yield Message::Watermark(watermark_to_emit);
                    }
                }
                AlignedMessage::Left(chunk) => {
                    // `left_time` accumulates only the time spent producing chunks; the time
                    // spent suspended at `yield` is excluded by restarting the timer after it.
                    let mut left_time = Duration::from_nanos(0);
                    let mut left_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_left(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        left_time += left_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        left_start_time = Instant::now();
                    }
                    left_time += left_start_time.elapsed();
                    left_join_match_duration_ns.inc_by(left_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Right(chunk) => {
                    // Same time accounting as for the left side.
                    let mut right_time = Duration::from_nanos(0);
                    let mut right_start_time = Instant::now();
                    #[for_await]
                    for chunk in Self::eq_join_right(EqJoinArgs {
                        ctx: &self.ctx,
                        side_l: &mut self.side_l,
                        side_r: &mut self.side_r,
                        actual_output_data_types: &self.actual_output_data_types,
                        cond: &mut self.cond,
                        inequality_watermarks: &self.inequality_watermarks,
                        chunk,
                        append_only_optimize: self.append_only_optimize,
                        chunk_size: self.chunk_size,
                        cnt_rows_received: &mut self.cnt_rows_received,
                        high_join_amplification_threshold: self.high_join_amplification_threshold,
                        entry_state_max_rows: self.entry_state_max_rows,
                    }) {
                        right_time += right_start_time.elapsed();
                        yield Message::Chunk(chunk?);
                        right_start_time = Instant::now();
                    }
                    right_time += right_start_time.elapsed();
                    right_join_match_duration_ns.inc_by(right_time.as_nanos() as u64);
                    self.try_flush_data().await?;
                }
                AlignedMessage::Barrier(barrier) => {
                    let barrier_start_time = Instant::now();
                    // Flush both sides for this epoch before forwarding the barrier.
                    let (left_post_commit, right_post_commit) =
                        self.flush_data(barrier.epoch).await?;

                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
                    yield Message::Barrier(barrier);

                    // Post-commit work of both sides, given the vnode bitmap update (if any).
                    right_post_commit
                        .post_yield_barrier(update_vnode_bitmap.clone())
                        .await?;
                    // When the left side's post-commit reports `true`, the buffered watermarks
                    // and the selected inequality watermarks are discarded.
                    if left_post_commit
                        .post_yield_barrier(update_vnode_bitmap)
                        .await?
                        .unwrap_or(false)
                    {
                        self.watermark_buffers
                            .values_mut()
                            .for_each(|buffers| buffers.clear());
                        self.inequality_watermarks.fill(None);
                    }

                    // Report the number of cached entries of both sides.
                    for (join_cached_entry_count, ht) in [
                        (&left_join_cached_entry_count, &self.side_l.ht),
                        (&right_join_cached_entry_count, &self.side_r.ht),
                    ] {
                        join_cached_entry_count.set(ht.entry_count() as i64);
                    }

                    barrier_join_match_duration_ns
                        .inc_by(barrier_start_time.elapsed().as_nanos() as u64);
                }
            }
            start_time = Instant::now();
        }
    }
700
701 async fn flush_data(
702 &mut self,
703 epoch: EpochPair,
704 ) -> StreamExecutorResult<(
705 JoinHashMapPostCommit<'_, K, S, E>,
706 JoinHashMapPostCommit<'_, K, S, E>,
707 )> {
708 let left = self.side_l.ht.flush(epoch).await?;
711 let right = self.side_r.ht.flush(epoch).await?;
712 Ok((left, right))
713 }
714
715 async fn try_flush_data(&mut self) -> StreamExecutorResult<()> {
716 self.side_l.ht.try_flush().await?;
719 self.side_r.ht.try_flush().await?;
720 Ok(())
721 }
722
723 fn evict_cache(
725 side_update: &mut JoinSide<K, S, E>,
726 side_match: &mut JoinSide<K, S, E>,
727 cnt_rows_received: &mut u32,
728 ) {
729 *cnt_rows_received += 1;
730 if *cnt_rows_received == EVICT_EVERY_N_ROWS {
731 side_update.ht.evict();
732 side_match.ht.evict();
733 *cnt_rows_received = 0;
734 }
735 }
736
    /// Handles a watermark received from `side` and returns the watermarks to emit downstream.
    ///
    /// 1. If the watermark is on the first join key column of `side`, its value is passed to
    ///    the hash table of the opposite side via `update_watermark`.
    /// 2. For every join key position whose column equals the watermark column, the watermark
    ///    is buffered; once the buffer selects a watermark it is emitted on every output column
    ///    mapped from that join key on either side.
    /// 3. For every inequality the column takes part in, the watermark (shifted by the delta
    ///    expression when the column is the "smaller" key) is buffered; once selected it is
    ///    emitted on the inequality's output columns and recorded in `inequality_watermarks`.
    ///
    /// # Panics
    ///
    /// Panics if the join key is empty, if `watermark.col_idx` is out of range for the side, or
    /// if the delta expression evaluates to `NULL`.
    async fn handle_watermark(
        &mut self,
        side: SideTypePrimitive,
        watermark: Watermark,
    ) -> StreamExecutorResult<Vec<Watermark>> {
        let (side_update, side_match) = if side == SideType::Left {
            (&mut self.side_l, &mut self.side_r)
        } else {
            (&mut self.side_r, &mut self.side_l)
        };

        // Only a watermark on the first join key column is forwarded to the opposite hash table.
        // NOTE(review): presumably used there for state cleaning — confirm in `JoinHashMap`.
        if side_update.join_key_indices[0] == watermark.col_idx {
            side_match.ht.update_watermark(watermark.val.clone());
        }

        // Positions within the join key that refer to the watermark column.
        let wm_in_jk = side_update
            .join_key_indices
            .iter()
            .positions(|idx| *idx == watermark.col_idx);
        let mut watermarks_to_emit = vec![];
        for idx in wm_in_jk {
            // Buffers for join keys are keyed by the position within the join key.
            let buffers = self
                .watermark_buffers
                .entry(idx)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            if let Some(selected_watermark) = buffers.handle_watermark(side, watermark.clone()) {
                let empty_indices = vec![];
                // Output columns derived from this join key on both sides.
                let output_indices = side_update
                    .i2o_mapping_indexed
                    .get_vec(&side_update.join_key_indices[idx])
                    .unwrap_or(&empty_indices)
                    .iter()
                    .chain(
                        side_match
                            .i2o_mapping_indexed
                            .get_vec(&side_match.join_key_indices[idx])
                            .unwrap_or(&empty_indices),
                    );
                for output_idx in output_indices {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
            };
        }
        for (inequality_index, need_offset) in
            &side_update.input2inequality_index[watermark.col_idx]
        {
            // Buffers for inequalities are keyed after the join key positions.
            let buffers = self
                .watermark_buffers
                .entry(side_update.join_key_indices.len() + inequality_index)
                .or_insert_with(|| BufferedWatermarks::with_ids([SideType::Left, SideType::Right]));
            let mut input_watermark = watermark.clone();
            if *need_offset
                && let Some(delta_expression) = self.inequality_pairs[*inequality_index].1.as_ref()
            {
                // The lint is allowed because the evaluation error is handled manually below.
                #[allow(clippy::disallowed_methods)]
                let eval_result = delta_expression
                    .inner()
                    .eval_row(&OwnedRow::new(vec![Some(input_watermark.val)]))
                    .await;
                match eval_result {
                    Ok(value) => input_watermark.val = value.unwrap(),
                    Err(err) => {
                        // Out-of-range results are skipped silently; any other error is
                        // reported. In both cases this watermark is dropped for the inequality.
                        if !matches!(err, ExprError::NumericOutOfRange) {
                            self.ctx.on_compute_error(err, &self.info.identity);
                        }
                        continue;
                    }
                }
            };
            if let Some(selected_watermark) = buffers.handle_watermark(side, input_watermark) {
                for output_idx in &self.inequality_pairs[*inequality_index].0 {
                    watermarks_to_emit.push(selected_watermark.clone().with_idx(*output_idx));
                }
                // Remember the selected watermark of this inequality.
                self.inequality_watermarks[*inequality_index] = Some(selected_watermark);
            }
        }
        Ok(watermarks_to_emit)
    }
818
819 fn row_concat(
820 row_update: impl Row,
821 update_start_pos: usize,
822 row_matched: impl Row,
823 matched_start_pos: usize,
824 ) -> OwnedRow {
825 let mut new_row = vec![None; row_update.len() + row_matched.len()];
826
827 for (i, datum_ref) in row_update.iter().enumerate() {
828 new_row[i + update_start_pos] = datum_ref.to_owned_datum();
829 }
830 for (i, datum_ref) in row_matched.iter().enumerate() {
831 new_row[i + matched_start_pos] = datum_ref.to_owned_datum();
832 }
833 OwnedRow::new(new_row)
834 }
835
836 fn eq_join_left(
838 args: EqJoinArgs<'_, K, S, E>,
839 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
840 Self::eq_join_oneside::<{ SideType::Left }>(args)
841 }
842
843 fn eq_join_right(
845 args: EqJoinArgs<'_, K, S, E>,
846 ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
847 Self::eq_join_oneside::<{ SideType::Right }>(args)
848 }
849
    /// Joins one input chunk of side `SIDE` (the "update" side) against the state of the
    /// opposite ("match") side and yields the resulting output chunks.
    ///
    /// For every visible row: periodic cache eviction is triggered, the match side's cache is
    /// probed with the row's join key, the row is matched via `handle_match_rows`, and the
    /// number of matches is recorded (with a warning when it exceeds
    /// `high_join_amplification_threshold`). Rows still buffered in the chunk builder are
    /// yielded at the end.
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn eq_join_oneside<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S, E>) {
        let EqJoinArgs {
            ctx,
            side_l,
            side_r,
            actual_output_data_types,
            cond,
            inequality_watermarks,
            chunk,
            append_only_optimize,
            chunk_size,
            cnt_rows_received,
            high_join_amplification_threshold,
            entry_state_max_rows,
            ..
        } = args;

        let (side_update, side_match) = if SIDE == SideType::Left {
            (side_l, side_r)
        } else {
            (side_r, side_l)
        };

        // State-clean columns of the match side whose inequality already has a watermark,
        // paired with that watermark.
        let useful_state_clean_columns = side_match
            .state_clean_columns
            .iter()
            .filter_map(|(column_idx, inequality_index)| {
                inequality_watermarks[*inequality_index]
                    .as_ref()
                    .map(|watermark| (*column_idx, watermark))
            })
            .collect_vec();

        let mut hashjoin_chunk_builder =
            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
                chunk_size,
                actual_output_data_types.to_vec(),
                side_update.i2o_mapping.clone(),
                side_match.i2o_mapping.clone(),
            ));

        let join_matched_join_keys = ctx
            .streaming_metrics
            .join_matched_join_keys
            .with_guarded_label_values(&[
                &ctx.id.to_string(),
                &ctx.fragment_id.to_string(),
                &side_update.ht.table_id().to_string(),
            ]);

        // One hash key per row of the chunk, including invisible rows.
        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
            // Skip holes (invisible rows).
            let Some((op, row)) = r else {
                continue;
            };
            Self::evict_cache(side_update, side_match, cnt_rows_received);

            let cache_lookup_result = {
                // SAFETY: `non_null_fields` holds positions within a vector sized by this
                // side's input schema, so the indices are in bounds provided the chunk has the
                // input schema's width.
                let probe_non_null_requirement_satisfied = side_update
                    .non_null_fields
                    .iter()
                    .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() });
                // Null join key columns may only occur where nulls are allowed to match.
                let build_non_null_requirement_satisfied =
                    key.null_bitmap().is_subset(side_match.ht.null_matched());
                if probe_non_null_requirement_satisfied && build_non_null_requirement_satisfied {
                    side_match.ht.take_state_opt(key)
                } else {
                    // The row cannot match anything; skip the lookup entirely.
                    CacheResult::NeverMatch
                }
            };
            let mut total_matches = 0;

            // Invokes `handle_match_rows` for the given join operation on the current row.
            macro_rules! match_rows {
                ($op:ident) => {
                    Self::handle_match_rows::<SIDE, { JoinOp::$op }>(
                        cache_lookup_result,
                        row,
                        key,
                        &mut hashjoin_chunk_builder,
                        side_match,
                        side_update,
                        &useful_state_clean_columns,
                        cond,
                        append_only_optimize,
                        entry_state_max_rows,
                    )
                };
            }

            // Updates are handled as a delete followed by an insert.
            match op {
                Op::Insert | Op::UpdateInsert => {
                    #[for_await]
                    for chunk in match_rows!(Insert) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
                Op::Delete | Op::UpdateDelete => {
                    #[for_await]
                    for chunk in match_rows!(Delete) {
                        let chunk = chunk?;
                        total_matches += chunk.cardinality();
                        yield chunk;
                    }
                }
            };

            // `total_matches` is the cardinality of the chunks yielded while handling this row.
            join_matched_join_keys.observe(total_matches as _);
            if total_matches > high_join_amplification_threshold {
                let join_key_data_types = side_update.ht.join_key_data_types();
                let key = key.deserialize(join_key_data_types)?;
                tracing::warn!(target: "high_join_amplification",
                    matched_rows_len = total_matches,
                    update_table_id = side_update.ht.table_id(),
                    match_table_id = side_match.ht.table_id(),
                    join_key = ?key,
                    actor_id = ctx.id,
                    fragment_id = ctx.fragment_id,
                    "large rows matched for join key"
                );
            }
        }
        // Yield whatever is still buffered in the chunk builder.
        if let Some(chunk) = hashjoin_chunk_builder.take() {
            yield chunk;
        }
    }
981
    /// Matches one update-side `row` against all rows of the match side sharing its join `key`
    /// and yields the output chunks that become full along the way.
    ///
    /// Depending on `cached_lookup_result`:
    /// * `NeverMatch`: the row is only passed to `forward_if_not_matched`; no state of either
    ///   side is touched.
    /// * `Hit`: the cached rows are matched in place.
    /// * `Miss`: the matched rows are fetched through the match side's hash table and, while
    ///   the row bound `entry_state_max_rows` allows, inserted into a fresh cache entry.
    ///
    /// Afterwards the update row is forwarded according to its degree (the number of matched
    /// rows satisfying the join condition), the cache entry is put back, matched rows scheduled
    /// for cleaning are deleted, and the update row is inserted into / deleted from the update
    /// side's state. With the append-only optimization and a recorded matched row, that row is
    /// deleted from the match side and the update row is not stored.
    #[allow(clippy::too_many_arguments)]
    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
    async fn handle_match_rows<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
    >(
        cached_lookup_result: CacheResult<E>,
        row: RowRef<'a>,
        key: &'a K,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        side_match: &'a mut JoinSide<K, S, E>,
        side_update: &'a mut JoinSide<K, S, E>,
        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
        cond: &'a mut Option<NonStrictExpression>,
        append_only_optimize: bool,
        entry_state_max_rows: usize,
    ) {
        let cache_hit = matches!(cached_lookup_result, CacheResult::Hit(_));
        // Fresh cache entry, only filled on a cache miss.
        let mut entry_state: JoinEntryState<E> = JoinEntryState::default();
        let mut entry_state_count = 0;

        // Number of matched rows that satisfy the join condition.
        let mut degree = 0;
        let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
        let mut matched_rows_to_clean = vec![];

        // Invokes `handle_match_row` for one matched row with the shared per-row context.
        macro_rules! match_row {
            (
                $match_order_key_indices:expr,
                $degree_table:expr,
                $matched_row:expr,
                $matched_row_ref:expr,
                $from_cache:literal
            ) => {
                Self::handle_match_row::<SIDE, { JOIN_OP }, { $from_cache }>(
                    row,
                    $matched_row,
                    $matched_row_ref,
                    hashjoin_chunk_builder,
                    $match_order_key_indices,
                    $degree_table,
                    side_update.start_pos,
                    side_match.start_pos,
                    cond,
                    &mut degree,
                    useful_state_clean_columns,
                    append_only_optimize,
                    &mut append_only_matched_row,
                    &mut matched_rows_to_clean,
                )
            };
        }

        let entry_state = match cached_lookup_result {
            CacheResult::NeverMatch => {
                let op = match JOIN_OP {
                    JoinOp::Insert => Op::Insert,
                    JoinOp::Delete => Op::Delete,
                };
                if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                    yield chunk;
                }
                // Early return: neither side's state is read or modified for this row.
                return Ok(());
            }
            CacheResult::Hit(mut cached_rows) => {
                let (match_order_key_indices, match_degree_state) =
                    side_match.ht.get_degree_state_mut_ref();
                // Match against every cached row of the entry.
                for (matched_row_ref, matched_row) in
                    cached_rows.values_mut(&side_match.all_data_types)
                {
                    let matched_row = matched_row?;
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        match_degree_state,
                        matched_row,
                        Some(matched_row_ref),
                        true
                    )
                    .await
                    {
                        yield chunk;
                    }
                }

                cached_rows
            }
            CacheResult::Miss => {
                // Cache miss: fetch the matched rows through the match side's hash table.
                let (matched_rows, match_order_key_indices, degree_table) = side_match
                    .ht
                    .fetch_matched_rows_and_get_degree_table_ref(key)
                    .await?;

                #[for_await]
                for matched_row in matched_rows {
                    let (encoded_pk, matched_row) = matched_row?;

                    let mut matched_row_ref = None;

                    // Refill the cache entry while the row bound allows it.
                    if entry_state_count <= entry_state_max_rows {
                        let row_ref = entry_state
                            .insert(encoded_pk, E::encode(&matched_row), None)
                            .with_context(|| format!("row: {}", row.display(),))?;
                        matched_row_ref = Some(row_ref);
                        entry_state_count += 1;
                    }
                    if let Some(chunk) = match_row!(
                        match_order_key_indices,
                        degree_table,
                        matched_row,
                        matched_row_ref,
                        false
                    )
                    .await
                    {
                        yield chunk;
                    }
                }
                Box::new(entry_state)
            }
        };

        // Forward the update row itself according to whether anything matched.
        let op = match JOIN_OP {
            JoinOp::Insert => Op::Insert,
            JoinOp::Delete => Op::Delete,
        };
        if degree == 0 {
            if let Some(chunk) = hashjoin_chunk_builder.forward_if_not_matched(op, row) {
                yield chunk;
            }
        } else if let Some(chunk) = hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
        {
            yield chunk;
        }

        // Put the entry back into the match side's cache: always after a hit, and after a miss
        // only when the refill stayed within the row bound.
        if cache_hit || entry_state_count <= entry_state_max_rows {
            side_match.ht.update_state(key, entry_state);
        }

        // Delete the matched rows that were scheduled for cleaning.
        for matched_row in matched_rows_to_clean {
            side_match.ht.delete_handle_degree(key, matched_row)?;
        }

        // Append-only optimization: delete the recorded matched row from the match side and
        // do not store the update row.
        if append_only_optimize && let Some(row) = append_only_matched_row {
            assert_matches!(JOIN_OP, JoinOp::Insert);
            side_match.ht.delete_handle_degree(key, row)?;
            return Ok(());
        }

        // Apply the operation to the update side's own state, recording the row's degree.
        match JOIN_OP {
            JoinOp::Insert => {
                side_update
                    .ht
                    .insert_handle_degree(key, JoinRow::new(row, degree))?;
            }
            JoinOp::Delete => {
                side_update
                    .ht
                    .delete_handle_degree(key, JoinRow::new(row, degree))?;
            }
        }
    }
1159
    /// Handles a single candidate `matched_row` (from the matching side) for `update_row`.
    ///
    /// The non-equi join condition is evaluated through `Self::check_join_condition`.
    ///
    /// * If it is satisfied, `update_row_degree` is incremented, a joined chunk may be taken
    ///   from `hashjoin_chunk_builder`, and, when `match_degree_table` is present, the degree
    ///   of `matched_row` is updated (and mirrored onto the cached row if one is available).
    /// * If it is not satisfied, `matched_row` is compared against every watermark in
    ///   `useful_state_clean_columns` to decide whether it should be queued for cleaning.
    ///
    /// Afterwards `matched_row` is handed over either to `append_only_matched_row` (when
    /// `append_only_optimize` is set) or to `matched_rows_to_clean` (when cleaning is needed).
    ///
    /// Returns the chunk produced by the builder for this pair, if any.
    #[allow(clippy::too_many_arguments)]
    #[inline]
    async fn handle_match_row<
        'a,
        const SIDE: SideTypePrimitive,
        const JOIN_OP: JoinOpPrimitive,
        const MATCHED_ROWS_FROM_CACHE: bool,
    >(
        update_row: RowRef<'a>,
        mut matched_row: JoinRow<OwnedRow>,
        mut matched_row_cache_ref: Option<&mut E::EncodedRow>,
        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
        match_order_key_indices: &[usize],
        match_degree_table: &mut Option<TableInner<S>>,
        side_update_start_pos: usize,
        side_match_start_pos: usize,
        cond: &Option<NonStrictExpression>,
        update_row_degree: &mut u64,
        useful_state_clean_columns: &[(usize, &'a Watermark)],
        append_only_optimize: bool,
        append_only_matched_row: &mut Option<JoinRow<OwnedRow>>,
        matched_rows_to_clean: &mut Vec<JoinRow<OwnedRow>>,
    ) -> Option<StreamChunk> {
        let mut need_state_clean = false;
        let mut chunk_opt = None;

        let join_condition_satisfied = Self::check_join_condition(
            update_row,
            side_update_start_pos,
            &matched_row.row,
            side_match_start_pos,
            cond,
        )
        .await;

        if join_condition_satisfied {
            // One more row on the matching side joins with the update row.
            *update_row_degree += 1;
            // For inserts, the joined chunk is built BEFORE the matched row's degree is touched.
            if matches!(JOIN_OP, JoinOp::Insert)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
            // Only when a degree table exists for the matching side is the degree maintained.
            if let Some(degree_table) = match_degree_table {
                update_degree::<S, { JOIN_OP }>(
                    match_order_key_indices,
                    degree_table,
                    &mut matched_row,
                );
                if MATCHED_ROWS_FROM_CACHE || matched_row_cache_ref.is_some() {
                    // Keep the cached copy of the matched row in step with the degree table.
                    // NOTE(review): the `unwrap` relies on callers passing `Some` whenever
                    // `MATCHED_ROWS_FROM_CACHE` is true — confirm against the call sites.
                    match JOIN_OP {
                        JoinOp::Insert => matched_row_cache_ref.as_mut().unwrap().increase_degree(),
                        JoinOp::Delete => matched_row_cache_ref.as_mut().unwrap().decrease_degree(),
                    }
                }
            }

            // For deletes, the joined chunk is built AFTER the matched row's degree is updated.
            if matches!(JOIN_OP, JoinOp::Delete)
                && !forward_exactly_once(T, SIDE)
                && let Some(chunk) =
                    hashjoin_chunk_builder.with_match::<JOIN_OP>(&update_row, &matched_row)
            {
                chunk_opt = Some(chunk);
            }
        } else {
            // Condition failed: the row needs cleaning if any watched column holds a non-null
            // value strictly below its watermark.
            for (column_idx, watermark) in useful_state_clean_columns {
                if matched_row.row.datum_at(*column_idx).is_some_and(|scalar| {
                    scalar
                        .default_cmp(&watermark.val.as_scalar_ref_impl())
                        .is_lt()
                }) {
                    need_state_clean = true;
                    break;
                }
            }
        }
        if append_only_optimize {
            // The append-only path is only valid for inserts and expects at most one matched
            // row per update row; both are asserted here.
            assert_matches!(JOIN_OP, JoinOp::Insert);
            assert!(append_only_matched_row.is_none());
            *append_only_matched_row = Some(matched_row);
        } else if need_state_clean {
            // `else if` keeps `matched_row` from being moved twice.
            matched_rows_to_clean.push(matched_row);
        }

        chunk_opt
    }
1268
1269 #[inline]
1274 async fn check_join_condition(
1275 row: impl Row,
1276 side_update_start_pos: usize,
1277 matched_row: impl Row,
1278 side_match_start_pos: usize,
1279 join_condition: &Option<NonStrictExpression>,
1280 ) -> bool {
1281 if let Some(join_condition) = join_condition {
1282 let new_row = Self::row_concat(
1283 row,
1284 side_update_start_pos,
1285 matched_row,
1286 side_match_start_pos,
1287 );
1288 join_condition
1289 .eval_row_infallible(&new_row)
1290 .await
1291 .map(|s| *s.as_bool())
1292 .unwrap_or(false)
1293 } else {
1294 true
1295 }
1296 }
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use std::sync::atomic::AtomicU64;
1302
1303 use pretty_assertions::assert_eq;
1304 use risingwave_common::array::*;
1305 use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, TableId};
1306 use risingwave_common::hash::{Key64, Key128};
1307 use risingwave_common::util::epoch::test_epoch;
1308 use risingwave_common::util::sort_util::OrderType;
1309 use risingwave_storage::memory::MemoryStateStore;
1310
1311 use super::*;
1312 use crate::common::table::test_utils::gen_pbtable;
1313 use crate::executor::MemoryEncoding;
1314 use crate::executor::test_utils::expr::build_from_pretty;
1315 use crate::executor::test_utils::{MessageSender, MockSource, StreamExecutorTestExt};
1316
1317 async fn create_in_memory_state_table(
1318 mem_state: MemoryStateStore,
1319 data_types: &[DataType],
1320 order_types: &[OrderType],
1321 pk_indices: &[usize],
1322 table_id: u32,
1323 ) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
1324 let column_descs = data_types
1325 .iter()
1326 .enumerate()
1327 .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
1328 .collect_vec();
1329 let state_table = StateTable::from_table_catalog(
1330 &gen_pbtable(
1331 TableId::new(table_id),
1332 column_descs,
1333 order_types.to_vec(),
1334 pk_indices.to_vec(),
1335 0,
1336 ),
1337 mem_state.clone(),
1338 None,
1339 )
1340 .await;
1341
1342 let mut degree_table_column_descs = vec![];
1344 pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
1345 degree_table_column_descs.push(ColumnDesc::unnamed(
1346 ColumnId::new(pk_id as i32),
1347 data_types[*idx].clone(),
1348 ))
1349 });
1350 degree_table_column_descs.push(ColumnDesc::unnamed(
1351 ColumnId::new(pk_indices.len() as i32),
1352 DataType::Int64,
1353 ));
1354 let degree_state_table = StateTable::from_table_catalog(
1355 &gen_pbtable(
1356 TableId::new(table_id + 1),
1357 degree_table_column_descs,
1358 order_types.to_vec(),
1359 pk_indices.to_vec(),
1360 0,
1361 ),
1362 mem_state,
1363 None,
1364 )
1365 .await;
1366 (state_table, degree_state_table)
1367 }
1368
1369 fn create_cond(condition_text: Option<String>) -> NonStrictExpression {
1370 build_from_pretty(
1371 condition_text
1372 .as_deref()
1373 .unwrap_or("(less_than:boolean $1:int8 $3:int8)"),
1374 )
1375 }
1376
1377 async fn create_executor<const T: JoinTypePrimitive>(
1378 with_condition: bool,
1379 null_safe: bool,
1380 condition_text: Option<String>,
1381 inequality_pairs: Vec<(usize, usize, bool, Option<NonStrictExpression>)>,
1382 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1383 let schema = Schema {
1384 fields: vec![
1385 Field::unnamed(DataType::Int64), Field::unnamed(DataType::Int64),
1387 ],
1388 };
1389 let (tx_l, source_l) = MockSource::channel();
1390 let source_l = source_l.into_executor(schema.clone(), vec![1]);
1391 let (tx_r, source_r) = MockSource::channel();
1392 let source_r = source_r.into_executor(schema, vec![1]);
1393 let params_l = JoinParams::new(vec![0], vec![1]);
1394 let params_r = JoinParams::new(vec![0], vec![1]);
1395 let cond = with_condition.then(|| create_cond(condition_text));
1396
1397 let mem_state = MemoryStateStore::new();
1398
1399 let (state_l, degree_state_l) = create_in_memory_state_table(
1400 mem_state.clone(),
1401 &[DataType::Int64, DataType::Int64],
1402 &[OrderType::ascending(), OrderType::ascending()],
1403 &[0, 1],
1404 0,
1405 )
1406 .await;
1407
1408 let (state_r, degree_state_r) = create_in_memory_state_table(
1409 mem_state,
1410 &[DataType::Int64, DataType::Int64],
1411 &[OrderType::ascending(), OrderType::ascending()],
1412 &[0, 1],
1413 2,
1414 )
1415 .await;
1416
1417 let schema = match T {
1418 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1419 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1420 _ => [source_l.schema().fields(), source_r.schema().fields()]
1421 .concat()
1422 .into_iter()
1423 .collect(),
1424 };
1425 let schema_len = schema.len();
1426 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1427
1428 let executor = HashJoinExecutor::<Key64, MemoryStateStore, T, MemoryEncoding>::new(
1429 ActorContext::for_test(123),
1430 info,
1431 source_l,
1432 source_r,
1433 params_l,
1434 params_r,
1435 vec![null_safe],
1436 (0..schema_len).collect_vec(),
1437 cond,
1438 inequality_pairs,
1439 state_l,
1440 degree_state_l,
1441 state_r,
1442 degree_state_r,
1443 Arc::new(AtomicU64::new(0)),
1444 false,
1445 Arc::new(StreamingMetrics::unused()),
1446 1024,
1447 2048,
1448 );
1449 (tx_l, tx_r, executor.boxed().execute())
1450 }
1451
1452 async fn create_classical_executor<const T: JoinTypePrimitive>(
1453 with_condition: bool,
1454 null_safe: bool,
1455 condition_text: Option<String>,
1456 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1457 create_executor::<T>(with_condition, null_safe, condition_text, vec![]).await
1458 }
1459
1460 async fn create_append_only_executor<const T: JoinTypePrimitive>(
1461 with_condition: bool,
1462 ) -> (MessageSender, MessageSender, BoxedMessageStream) {
1463 let schema = Schema {
1464 fields: vec![
1465 Field::unnamed(DataType::Int64),
1466 Field::unnamed(DataType::Int64),
1467 Field::unnamed(DataType::Int64),
1468 ],
1469 };
1470 let (tx_l, source_l) = MockSource::channel();
1471 let source_l = source_l.into_executor(schema.clone(), vec![0]);
1472 let (tx_r, source_r) = MockSource::channel();
1473 let source_r = source_r.into_executor(schema, vec![0]);
1474 let params_l = JoinParams::new(vec![0, 1], vec![]);
1475 let params_r = JoinParams::new(vec![0, 1], vec![]);
1476 let cond = with_condition.then(|| create_cond(None));
1477
1478 let mem_state = MemoryStateStore::new();
1479
1480 let (state_l, degree_state_l) = create_in_memory_state_table(
1481 mem_state.clone(),
1482 &[DataType::Int64, DataType::Int64, DataType::Int64],
1483 &[
1484 OrderType::ascending(),
1485 OrderType::ascending(),
1486 OrderType::ascending(),
1487 ],
1488 &[0, 1, 0],
1489 0,
1490 )
1491 .await;
1492
1493 let (state_r, degree_state_r) = create_in_memory_state_table(
1494 mem_state,
1495 &[DataType::Int64, DataType::Int64, DataType::Int64],
1496 &[
1497 OrderType::ascending(),
1498 OrderType::ascending(),
1499 OrderType::ascending(),
1500 ],
1501 &[0, 1, 1],
1502 1,
1503 )
1504 .await;
1505
1506 let schema = match T {
1507 JoinType::LeftSemi | JoinType::LeftAnti => source_l.schema().clone(),
1508 JoinType::RightSemi | JoinType::RightAnti => source_r.schema().clone(),
1509 _ => [source_l.schema().fields(), source_r.schema().fields()]
1510 .concat()
1511 .into_iter()
1512 .collect(),
1513 };
1514 let schema_len = schema.len();
1515 let info = ExecutorInfo::new(schema, vec![1], "HashJoinExecutor".to_owned(), 0);
1516
1517 let executor = HashJoinExecutor::<Key128, MemoryStateStore, T, MemoryEncoding>::new(
1518 ActorContext::for_test(123),
1519 info,
1520 source_l,
1521 source_r,
1522 params_l,
1523 params_r,
1524 vec![false],
1525 (0..schema_len).collect_vec(),
1526 cond,
1527 vec![],
1528 state_l,
1529 degree_state_l,
1530 state_r,
1531 degree_state_r,
1532 Arc::new(AtomicU64::new(0)),
1533 true,
1534 Arc::new(StreamingMetrics::unused()),
1535 1024,
1536 2048,
1537 );
1538 (tx_l, tx_r, executor.boxed().execute())
1539 }
1540
1541 #[tokio::test]
1542 async fn test_interval_join() -> StreamExecutorResult<()> {
1543 let chunk_l1 = StreamChunk::from_pretty(
1544 " I I
1545 + 1 4
1546 + 2 3
1547 + 2 5
1548 + 3 6",
1549 );
1550 let chunk_l2 = StreamChunk::from_pretty(
1551 " I I
1552 + 3 8
1553 - 3 8",
1554 );
1555 let chunk_r1 = StreamChunk::from_pretty(
1556 " I I
1557 + 2 6
1558 + 4 8
1559 + 6 9",
1560 );
1561 let chunk_r2 = StreamChunk::from_pretty(
1562 " I I
1563 + 2 3
1564 + 6 11",
1565 );
1566 let (mut tx_l, mut tx_r, mut hash_join) = create_executor::<{ JoinType::Inner }>(
1567 true,
1568 false,
1569 Some(String::from("(and:boolean (greater_than:boolean $1:int8 (subtract:int8 $3:int8 2:int8)) (greater_than:boolean $3:int8 (subtract:int8 $1:int8 2:int8)))")),
1570 vec![(1, 3, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)"))), (3, 1, true, Some(build_from_pretty("(subtract:int8 $0:int8 2:int8)")))],
1571 )
1572 .await;
1573
1574 tx_l.push_barrier(test_epoch(1), false);
1576 tx_r.push_barrier(test_epoch(1), false);
1577 hash_join.next_unwrap_ready_barrier()?;
1578
1579 tx_l.push_chunk(chunk_l1);
1581 hash_join.next_unwrap_pending();
1582
1583 tx_l.push_barrier(test_epoch(2), false);
1585 tx_r.push_barrier(test_epoch(2), false);
1586 hash_join.next_unwrap_ready_barrier()?;
1587
1588 tx_l.push_chunk(chunk_l2);
1590 hash_join.next_unwrap_pending();
1591
1592 tx_l.push_watermark(1, DataType::Int64, ScalarImpl::Int64(10));
1593 hash_join.next_unwrap_pending();
1594
1595 tx_r.push_watermark(1, DataType::Int64, ScalarImpl::Int64(6));
1596 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1597 assert_eq!(
1598 output_watermark,
1599 Watermark::new(1, DataType::Int64, ScalarImpl::Int64(4))
1600 );
1601 let output_watermark = hash_join.next_unwrap_ready_watermark()?;
1602 assert_eq!(
1603 output_watermark,
1604 Watermark::new(3, DataType::Int64, ScalarImpl::Int64(6))
1605 );
1606
1607 tx_r.push_chunk(chunk_r1);
1609 let chunk = hash_join.next_unwrap_ready_chunk()?;
1610 assert_eq!(
1612 chunk,
1613 StreamChunk::from_pretty(
1614 " I I I I
1615 + 2 5 2 6"
1616 )
1617 );
1618
1619 tx_r.push_chunk(chunk_r2);
1621 hash_join.next_unwrap_pending();
1624
1625 Ok(())
1626 }
1627
1628 #[tokio::test]
1629 async fn test_streaming_hash_inner_join() -> StreamExecutorResult<()> {
1630 let chunk_l1 = StreamChunk::from_pretty(
1631 " I I
1632 + 1 4
1633 + 2 5
1634 + 3 6",
1635 );
1636 let chunk_l2 = StreamChunk::from_pretty(
1637 " I I
1638 + 3 8
1639 - 3 8",
1640 );
1641 let chunk_r1 = StreamChunk::from_pretty(
1642 " I I
1643 + 2 7
1644 + 4 8
1645 + 6 9",
1646 );
1647 let chunk_r2 = StreamChunk::from_pretty(
1648 " I I
1649 + 3 10
1650 + 6 11",
1651 );
1652 let (mut tx_l, mut tx_r, mut hash_join) =
1653 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
1654
1655 tx_l.push_barrier(test_epoch(1), false);
1657 tx_r.push_barrier(test_epoch(1), false);
1658 hash_join.next_unwrap_ready_barrier()?;
1659
1660 tx_l.push_chunk(chunk_l1);
1662 hash_join.next_unwrap_pending();
1663
1664 tx_l.push_barrier(test_epoch(2), false);
1666 tx_r.push_barrier(test_epoch(2), false);
1667 hash_join.next_unwrap_ready_barrier()?;
1668
1669 tx_l.push_chunk(chunk_l2);
1671 hash_join.next_unwrap_pending();
1672
1673 tx_r.push_chunk(chunk_r1);
1675 let chunk = hash_join.next_unwrap_ready_chunk()?;
1676 assert_eq!(
1677 chunk,
1678 StreamChunk::from_pretty(
1679 " I I I I
1680 + 2 5 2 7"
1681 )
1682 );
1683
1684 tx_r.push_chunk(chunk_r2);
1686 let chunk = hash_join.next_unwrap_ready_chunk()?;
1687 assert_eq!(
1688 chunk,
1689 StreamChunk::from_pretty(
1690 " I I I I
1691 + 3 6 3 10"
1692 )
1693 );
1694
1695 Ok(())
1696 }
1697
1698 #[tokio::test]
1699 async fn test_streaming_null_safe_hash_inner_join() -> StreamExecutorResult<()> {
1700 let chunk_l1 = StreamChunk::from_pretty(
1701 " I I
1702 + 1 4
1703 + 2 5
1704 + . 6",
1705 );
1706 let chunk_l2 = StreamChunk::from_pretty(
1707 " I I
1708 + . 8
1709 - . 8",
1710 );
1711 let chunk_r1 = StreamChunk::from_pretty(
1712 " I I
1713 + 2 7
1714 + 4 8
1715 + 6 9",
1716 );
1717 let chunk_r2 = StreamChunk::from_pretty(
1718 " I I
1719 + . 10
1720 + 6 11",
1721 );
1722 let (mut tx_l, mut tx_r, mut hash_join) =
1723 create_classical_executor::<{ JoinType::Inner }>(false, true, None).await;
1724
1725 tx_l.push_barrier(test_epoch(1), false);
1727 tx_r.push_barrier(test_epoch(1), false);
1728 hash_join.next_unwrap_ready_barrier()?;
1729
1730 tx_l.push_chunk(chunk_l1);
1732 hash_join.next_unwrap_pending();
1733
1734 tx_l.push_barrier(test_epoch(2), false);
1736 tx_r.push_barrier(test_epoch(2), false);
1737 hash_join.next_unwrap_ready_barrier()?;
1738
1739 tx_l.push_chunk(chunk_l2);
1741 hash_join.next_unwrap_pending();
1742
1743 tx_r.push_chunk(chunk_r1);
1745 let chunk = hash_join.next_unwrap_ready_chunk()?;
1746 assert_eq!(
1747 chunk,
1748 StreamChunk::from_pretty(
1749 " I I I I
1750 + 2 5 2 7"
1751 )
1752 );
1753
1754 tx_r.push_chunk(chunk_r2);
1756 let chunk = hash_join.next_unwrap_ready_chunk()?;
1757 assert_eq!(
1758 chunk,
1759 StreamChunk::from_pretty(
1760 " I I I I
1761 + . 6 . 10"
1762 )
1763 );
1764
1765 Ok(())
1766 }
1767
1768 #[tokio::test]
1769 async fn test_streaming_hash_left_semi_join() -> StreamExecutorResult<()> {
1770 let chunk_l1 = StreamChunk::from_pretty(
1771 " I I
1772 + 1 4
1773 + 2 5
1774 + 3 6",
1775 );
1776 let chunk_l2 = StreamChunk::from_pretty(
1777 " I I
1778 + 3 8
1779 - 3 8",
1780 );
1781 let chunk_r1 = StreamChunk::from_pretty(
1782 " I I
1783 + 2 7
1784 + 4 8
1785 + 6 9",
1786 );
1787 let chunk_r2 = StreamChunk::from_pretty(
1788 " I I
1789 + 3 10
1790 + 6 11",
1791 );
1792 let chunk_l3 = StreamChunk::from_pretty(
1793 " I I
1794 + 6 10",
1795 );
1796 let chunk_r3 = StreamChunk::from_pretty(
1797 " I I
1798 - 6 11",
1799 );
1800 let chunk_r4 = StreamChunk::from_pretty(
1801 " I I
1802 - 6 9",
1803 );
1804 let (mut tx_l, mut tx_r, mut hash_join) =
1805 create_classical_executor::<{ JoinType::LeftSemi }>(false, false, None).await;
1806
1807 tx_l.push_barrier(test_epoch(1), false);
1809 tx_r.push_barrier(test_epoch(1), false);
1810 hash_join.next_unwrap_ready_barrier()?;
1811
1812 tx_l.push_chunk(chunk_l1);
1814 hash_join.next_unwrap_pending();
1815
1816 tx_l.push_barrier(test_epoch(2), false);
1818 tx_r.push_barrier(test_epoch(2), false);
1819 hash_join.next_unwrap_ready_barrier()?;
1820
1821 tx_l.push_chunk(chunk_l2);
1823 hash_join.next_unwrap_pending();
1824
1825 tx_r.push_chunk(chunk_r1);
1827 let chunk = hash_join.next_unwrap_ready_chunk()?;
1828 assert_eq!(
1829 chunk,
1830 StreamChunk::from_pretty(
1831 " I I
1832 + 2 5"
1833 )
1834 );
1835
1836 tx_r.push_chunk(chunk_r2);
1838 let chunk = hash_join.next_unwrap_ready_chunk()?;
1839 assert_eq!(
1840 chunk,
1841 StreamChunk::from_pretty(
1842 " I I
1843 + 3 6"
1844 )
1845 );
1846
1847 tx_l.push_chunk(chunk_l3);
1849 let chunk = hash_join.next_unwrap_ready_chunk()?;
1850 assert_eq!(
1851 chunk,
1852 StreamChunk::from_pretty(
1853 " I I
1854 + 6 10"
1855 )
1856 );
1857
1858 tx_r.push_chunk(chunk_r3);
1861 hash_join.next_unwrap_pending();
1862
1863 tx_r.push_chunk(chunk_r4);
1866 let chunk = hash_join.next_unwrap_ready_chunk()?;
1867 assert_eq!(
1868 chunk,
1869 StreamChunk::from_pretty(
1870 " I I
1871 - 6 10"
1872 )
1873 );
1874
1875 Ok(())
1876 }
1877
1878 #[tokio::test]
1879 async fn test_streaming_null_safe_hash_left_semi_join() -> StreamExecutorResult<()> {
1880 let chunk_l1 = StreamChunk::from_pretty(
1881 " I I
1882 + 1 4
1883 + 2 5
1884 + . 6",
1885 );
1886 let chunk_l2 = StreamChunk::from_pretty(
1887 " I I
1888 + . 8
1889 - . 8",
1890 );
1891 let chunk_r1 = StreamChunk::from_pretty(
1892 " I I
1893 + 2 7
1894 + 4 8
1895 + 6 9",
1896 );
1897 let chunk_r2 = StreamChunk::from_pretty(
1898 " I I
1899 + . 10
1900 + 6 11",
1901 );
1902 let chunk_l3 = StreamChunk::from_pretty(
1903 " I I
1904 + 6 10",
1905 );
1906 let chunk_r3 = StreamChunk::from_pretty(
1907 " I I
1908 - 6 11",
1909 );
1910 let chunk_r4 = StreamChunk::from_pretty(
1911 " I I
1912 - 6 9",
1913 );
1914 let (mut tx_l, mut tx_r, mut hash_join) =
1915 create_classical_executor::<{ JoinType::LeftSemi }>(false, true, None).await;
1916
1917 tx_l.push_barrier(test_epoch(1), false);
1919 tx_r.push_barrier(test_epoch(1), false);
1920 hash_join.next_unwrap_ready_barrier()?;
1921
1922 tx_l.push_chunk(chunk_l1);
1924 hash_join.next_unwrap_pending();
1925
1926 tx_l.push_barrier(test_epoch(2), false);
1928 tx_r.push_barrier(test_epoch(2), false);
1929 hash_join.next_unwrap_ready_barrier()?;
1930
1931 tx_l.push_chunk(chunk_l2);
1933 hash_join.next_unwrap_pending();
1934
1935 tx_r.push_chunk(chunk_r1);
1937 let chunk = hash_join.next_unwrap_ready_chunk()?;
1938 assert_eq!(
1939 chunk,
1940 StreamChunk::from_pretty(
1941 " I I
1942 + 2 5"
1943 )
1944 );
1945
1946 tx_r.push_chunk(chunk_r2);
1948 let chunk = hash_join.next_unwrap_ready_chunk()?;
1949 assert_eq!(
1950 chunk,
1951 StreamChunk::from_pretty(
1952 " I I
1953 + . 6"
1954 )
1955 );
1956
1957 tx_l.push_chunk(chunk_l3);
1959 let chunk = hash_join.next_unwrap_ready_chunk()?;
1960 assert_eq!(
1961 chunk,
1962 StreamChunk::from_pretty(
1963 " I I
1964 + 6 10"
1965 )
1966 );
1967
1968 tx_r.push_chunk(chunk_r3);
1971 hash_join.next_unwrap_pending();
1972
1973 tx_r.push_chunk(chunk_r4);
1976 let chunk = hash_join.next_unwrap_ready_chunk()?;
1977 assert_eq!(
1978 chunk,
1979 StreamChunk::from_pretty(
1980 " I I
1981 - 6 10"
1982 )
1983 );
1984
1985 Ok(())
1986 }
1987
1988 #[tokio::test]
1989 async fn test_streaming_hash_inner_join_append_only() -> StreamExecutorResult<()> {
1990 let chunk_l1 = StreamChunk::from_pretty(
1991 " I I I
1992 + 1 4 1
1993 + 2 5 2
1994 + 3 6 3",
1995 );
1996 let chunk_l2 = StreamChunk::from_pretty(
1997 " I I I
1998 + 4 9 4
1999 + 5 10 5",
2000 );
2001 let chunk_r1 = StreamChunk::from_pretty(
2002 " I I I
2003 + 2 5 1
2004 + 4 9 2
2005 + 6 9 3",
2006 );
2007 let chunk_r2 = StreamChunk::from_pretty(
2008 " I I I
2009 + 1 4 4
2010 + 3 6 5",
2011 );
2012
2013 let (mut tx_l, mut tx_r, mut hash_join) =
2014 create_append_only_executor::<{ JoinType::Inner }>(false).await;
2015
2016 tx_l.push_barrier(test_epoch(1), false);
2018 tx_r.push_barrier(test_epoch(1), false);
2019 hash_join.next_unwrap_ready_barrier()?;
2020
2021 tx_l.push_chunk(chunk_l1);
2023 hash_join.next_unwrap_pending();
2024
2025 tx_l.push_barrier(test_epoch(2), false);
2027 tx_r.push_barrier(test_epoch(2), false);
2028 hash_join.next_unwrap_ready_barrier()?;
2029
2030 tx_l.push_chunk(chunk_l2);
2032 hash_join.next_unwrap_pending();
2033
2034 tx_r.push_chunk(chunk_r1);
2036 let chunk = hash_join.next_unwrap_ready_chunk()?;
2037 assert_eq!(
2038 chunk,
2039 StreamChunk::from_pretty(
2040 " I I I I I I
2041 + 2 5 2 2 5 1
2042 + 4 9 4 4 9 2"
2043 )
2044 );
2045
2046 tx_r.push_chunk(chunk_r2);
2048 let chunk = hash_join.next_unwrap_ready_chunk()?;
2049 assert_eq!(
2050 chunk,
2051 StreamChunk::from_pretty(
2052 " I I I I I I
2053 + 1 4 1 1 4 4
2054 + 3 6 3 3 6 5"
2055 )
2056 );
2057
2058 Ok(())
2059 }
2060
2061 #[tokio::test]
2062 async fn test_streaming_hash_left_semi_join_append_only() -> StreamExecutorResult<()> {
2063 let chunk_l1 = StreamChunk::from_pretty(
2064 " I I I
2065 + 1 4 1
2066 + 2 5 2
2067 + 3 6 3",
2068 );
2069 let chunk_l2 = StreamChunk::from_pretty(
2070 " I I I
2071 + 4 9 4
2072 + 5 10 5",
2073 );
2074 let chunk_r1 = StreamChunk::from_pretty(
2075 " I I I
2076 + 2 5 1
2077 + 4 9 2
2078 + 6 9 3",
2079 );
2080 let chunk_r2 = StreamChunk::from_pretty(
2081 " I I I
2082 + 1 4 4
2083 + 3 6 5",
2084 );
2085
2086 let (mut tx_l, mut tx_r, mut hash_join) =
2087 create_append_only_executor::<{ JoinType::LeftSemi }>(false).await;
2088
2089 tx_l.push_barrier(test_epoch(1), false);
2091 tx_r.push_barrier(test_epoch(1), false);
2092 hash_join.next_unwrap_ready_barrier()?;
2093
2094 tx_l.push_chunk(chunk_l1);
2096 hash_join.next_unwrap_pending();
2097
2098 tx_l.push_barrier(test_epoch(2), false);
2100 tx_r.push_barrier(test_epoch(2), false);
2101 hash_join.next_unwrap_ready_barrier()?;
2102
2103 tx_l.push_chunk(chunk_l2);
2105 hash_join.next_unwrap_pending();
2106
2107 tx_r.push_chunk(chunk_r1);
2109 let chunk = hash_join.next_unwrap_ready_chunk()?;
2110 assert_eq!(
2111 chunk,
2112 StreamChunk::from_pretty(
2113 " I I I
2114 + 2 5 2
2115 + 4 9 4"
2116 )
2117 );
2118
2119 tx_r.push_chunk(chunk_r2);
2121 let chunk = hash_join.next_unwrap_ready_chunk()?;
2122 assert_eq!(
2123 chunk,
2124 StreamChunk::from_pretty(
2125 " I I I
2126 + 1 4 1
2127 + 3 6 3"
2128 )
2129 );
2130
2131 Ok(())
2132 }
2133
2134 #[tokio::test]
2135 async fn test_streaming_hash_right_semi_join_append_only() -> StreamExecutorResult<()> {
2136 let chunk_l1 = StreamChunk::from_pretty(
2137 " I I I
2138 + 1 4 1
2139 + 2 5 2
2140 + 3 6 3",
2141 );
2142 let chunk_l2 = StreamChunk::from_pretty(
2143 " I I I
2144 + 4 9 4
2145 + 5 10 5",
2146 );
2147 let chunk_r1 = StreamChunk::from_pretty(
2148 " I I I
2149 + 2 5 1
2150 + 4 9 2
2151 + 6 9 3",
2152 );
2153 let chunk_r2 = StreamChunk::from_pretty(
2154 " I I I
2155 + 1 4 4
2156 + 3 6 5",
2157 );
2158
2159 let (mut tx_l, mut tx_r, mut hash_join) =
2160 create_append_only_executor::<{ JoinType::RightSemi }>(false).await;
2161
2162 tx_l.push_barrier(test_epoch(1), false);
2164 tx_r.push_barrier(test_epoch(1), false);
2165 hash_join.next_unwrap_ready_barrier()?;
2166
2167 tx_l.push_chunk(chunk_l1);
2169 hash_join.next_unwrap_pending();
2170
2171 tx_l.push_barrier(test_epoch(2), false);
2173 tx_r.push_barrier(test_epoch(2), false);
2174 hash_join.next_unwrap_ready_barrier()?;
2175
2176 tx_l.push_chunk(chunk_l2);
2178 hash_join.next_unwrap_pending();
2179
2180 tx_r.push_chunk(chunk_r1);
2182 let chunk = hash_join.next_unwrap_ready_chunk()?;
2183 assert_eq!(
2184 chunk,
2185 StreamChunk::from_pretty(
2186 " I I I
2187 + 2 5 1
2188 + 4 9 2"
2189 )
2190 );
2191
2192 tx_r.push_chunk(chunk_r2);
2194 let chunk = hash_join.next_unwrap_ready_chunk()?;
2195 assert_eq!(
2196 chunk,
2197 StreamChunk::from_pretty(
2198 " I I I
2199 + 1 4 4
2200 + 3 6 5"
2201 )
2202 );
2203
2204 Ok(())
2205 }
2206
2207 #[tokio::test]
2208 async fn test_streaming_hash_right_semi_join() -> StreamExecutorResult<()> {
2209 let chunk_r1 = StreamChunk::from_pretty(
2210 " I I
2211 + 1 4
2212 + 2 5
2213 + 3 6",
2214 );
2215 let chunk_r2 = StreamChunk::from_pretty(
2216 " I I
2217 + 3 8
2218 - 3 8",
2219 );
2220 let chunk_l1 = StreamChunk::from_pretty(
2221 " I I
2222 + 2 7
2223 + 4 8
2224 + 6 9",
2225 );
2226 let chunk_l2 = StreamChunk::from_pretty(
2227 " I I
2228 + 3 10
2229 + 6 11",
2230 );
2231 let chunk_r3 = StreamChunk::from_pretty(
2232 " I I
2233 + 6 10",
2234 );
2235 let chunk_l3 = StreamChunk::from_pretty(
2236 " I I
2237 - 6 11",
2238 );
2239 let chunk_l4 = StreamChunk::from_pretty(
2240 " I I
2241 - 6 9",
2242 );
2243 let (mut tx_l, mut tx_r, mut hash_join) =
2244 create_classical_executor::<{ JoinType::RightSemi }>(false, false, None).await;
2245
2246 tx_l.push_barrier(test_epoch(1), false);
2248 tx_r.push_barrier(test_epoch(1), false);
2249 hash_join.next_unwrap_ready_barrier()?;
2250
2251 tx_r.push_chunk(chunk_r1);
2253 hash_join.next_unwrap_pending();
2254
2255 tx_l.push_barrier(test_epoch(2), false);
2257 tx_r.push_barrier(test_epoch(2), false);
2258 hash_join.next_unwrap_ready_barrier()?;
2259
2260 tx_r.push_chunk(chunk_r2);
2262 hash_join.next_unwrap_pending();
2263
2264 tx_l.push_chunk(chunk_l1);
2266 let chunk = hash_join.next_unwrap_ready_chunk()?;
2267 assert_eq!(
2268 chunk,
2269 StreamChunk::from_pretty(
2270 " I I
2271 + 2 5"
2272 )
2273 );
2274
2275 tx_l.push_chunk(chunk_l2);
2277 let chunk = hash_join.next_unwrap_ready_chunk()?;
2278 assert_eq!(
2279 chunk,
2280 StreamChunk::from_pretty(
2281 " I I
2282 + 3 6"
2283 )
2284 );
2285
2286 tx_r.push_chunk(chunk_r3);
2288 let chunk = hash_join.next_unwrap_ready_chunk()?;
2289 assert_eq!(
2290 chunk,
2291 StreamChunk::from_pretty(
2292 " I I
2293 + 6 10"
2294 )
2295 );
2296
2297 tx_l.push_chunk(chunk_l3);
2300 hash_join.next_unwrap_pending();
2301
2302 tx_l.push_chunk(chunk_l4);
2305 let chunk = hash_join.next_unwrap_ready_chunk()?;
2306 assert_eq!(
2307 chunk,
2308 StreamChunk::from_pretty(
2309 " I I
2310 - 6 10"
2311 )
2312 );
2313
2314 Ok(())
2315 }
2316
2317 #[tokio::test]
2318 async fn test_streaming_hash_left_anti_join() -> StreamExecutorResult<()> {
2319 let chunk_l1 = StreamChunk::from_pretty(
2320 " I I
2321 + 1 4
2322 + 2 5
2323 + 3 6",
2324 );
2325 let chunk_l2 = StreamChunk::from_pretty(
2326 " I I
2327 + 3 8
2328 - 3 8",
2329 );
2330 let chunk_r1 = StreamChunk::from_pretty(
2331 " I I
2332 + 2 7
2333 + 4 8
2334 + 6 9",
2335 );
2336 let chunk_r2 = StreamChunk::from_pretty(
2337 " I I
2338 + 3 10
2339 + 6 11
2340 + 1 2
2341 + 1 3",
2342 );
2343 let chunk_l3 = StreamChunk::from_pretty(
2344 " I I
2345 + 9 10",
2346 );
2347 let chunk_r3 = StreamChunk::from_pretty(
2348 " I I
2349 - 1 2",
2350 );
2351 let chunk_r4 = StreamChunk::from_pretty(
2352 " I I
2353 - 1 3",
2354 );
2355 let (mut tx_l, mut tx_r, mut hash_join) =
2356 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2357
2358 tx_l.push_barrier(test_epoch(1), false);
2360 tx_r.push_barrier(test_epoch(1), false);
2361 hash_join.next_unwrap_ready_barrier()?;
2362
2363 tx_l.push_chunk(chunk_l1);
2365 let chunk = hash_join.next_unwrap_ready_chunk()?;
2366 assert_eq!(
2367 chunk,
2368 StreamChunk::from_pretty(
2369 " I I
2370 + 1 4
2371 + 2 5
2372 + 3 6",
2373 )
2374 );
2375
2376 tx_l.push_barrier(test_epoch(2), false);
2378 tx_r.push_barrier(test_epoch(2), false);
2379 hash_join.next_unwrap_ready_barrier()?;
2380
2381 tx_l.push_chunk(chunk_l2);
2383 let chunk = hash_join.next_unwrap_ready_chunk()?;
2384 assert_eq!(
2385 chunk,
2386 StreamChunk::from_pretty(
2387 " I I
2388 + 3 8
2389 - 3 8",
2390 )
2391 );
2392
2393 tx_r.push_chunk(chunk_r1);
2395 let chunk = hash_join.next_unwrap_ready_chunk()?;
2396 assert_eq!(
2397 chunk,
2398 StreamChunk::from_pretty(
2399 " I I
2400 - 2 5"
2401 )
2402 );
2403
2404 tx_r.push_chunk(chunk_r2);
2406 let chunk = hash_join.next_unwrap_ready_chunk()?;
2407 assert_eq!(
2408 chunk,
2409 StreamChunk::from_pretty(
2410 " I I
2411 - 3 6
2412 - 1 4"
2413 )
2414 );
2415
2416 tx_l.push_chunk(chunk_l3);
2418 let chunk = hash_join.next_unwrap_ready_chunk()?;
2419 assert_eq!(
2420 chunk,
2421 StreamChunk::from_pretty(
2422 " I I
2423 + 9 10"
2424 )
2425 );
2426
2427 tx_r.push_chunk(chunk_r3);
2430 hash_join.next_unwrap_pending();
2431
2432 tx_r.push_chunk(chunk_r4);
2435 let chunk = hash_join.next_unwrap_ready_chunk()?;
2436 assert_eq!(
2437 chunk,
2438 StreamChunk::from_pretty(
2439 " I I
2440 + 1 4"
2441 )
2442 );
2443
2444 Ok(())
2445 }
2446
2447 #[tokio::test]
2448 async fn test_streaming_hash_right_anti_join() -> StreamExecutorResult<()> {
2449 let chunk_r1 = StreamChunk::from_pretty(
2450 " I I
2451 + 1 4
2452 + 2 5
2453 + 3 6",
2454 );
2455 let chunk_r2 = StreamChunk::from_pretty(
2456 " I I
2457 + 3 8
2458 - 3 8",
2459 );
2460 let chunk_l1 = StreamChunk::from_pretty(
2461 " I I
2462 + 2 7
2463 + 4 8
2464 + 6 9",
2465 );
2466 let chunk_l2 = StreamChunk::from_pretty(
2467 " I I
2468 + 3 10
2469 + 6 11
2470 + 1 2
2471 + 1 3",
2472 );
2473 let chunk_r3 = StreamChunk::from_pretty(
2474 " I I
2475 + 9 10",
2476 );
2477 let chunk_l3 = StreamChunk::from_pretty(
2478 " I I
2479 - 1 2",
2480 );
2481 let chunk_l4 = StreamChunk::from_pretty(
2482 " I I
2483 - 1 3",
2484 );
2485 let (mut tx_r, mut tx_l, mut hash_join) =
2486 create_classical_executor::<{ JoinType::LeftAnti }>(false, false, None).await;
2487
2488 tx_r.push_barrier(test_epoch(1), false);
2490 tx_l.push_barrier(test_epoch(1), false);
2491 hash_join.next_unwrap_ready_barrier()?;
2492
2493 tx_r.push_chunk(chunk_r1);
2495 let chunk = hash_join.next_unwrap_ready_chunk()?;
2496 assert_eq!(
2497 chunk,
2498 StreamChunk::from_pretty(
2499 " I I
2500 + 1 4
2501 + 2 5
2502 + 3 6",
2503 )
2504 );
2505
2506 tx_r.push_barrier(test_epoch(2), false);
2508 tx_l.push_barrier(test_epoch(2), false);
2509 hash_join.next_unwrap_ready_barrier()?;
2510
2511 tx_r.push_chunk(chunk_r2);
2513 let chunk = hash_join.next_unwrap_ready_chunk()?;
2514 assert_eq!(
2515 chunk,
2516 StreamChunk::from_pretty(
2517 " I I
2518 + 3 8
2519 - 3 8",
2520 )
2521 );
2522
2523 tx_l.push_chunk(chunk_l1);
2525 let chunk = hash_join.next_unwrap_ready_chunk()?;
2526 assert_eq!(
2527 chunk,
2528 StreamChunk::from_pretty(
2529 " I I
2530 - 2 5"
2531 )
2532 );
2533
2534 tx_l.push_chunk(chunk_l2);
2536 let chunk = hash_join.next_unwrap_ready_chunk()?;
2537 assert_eq!(
2538 chunk,
2539 StreamChunk::from_pretty(
2540 " I I
2541 - 3 6
2542 - 1 4"
2543 )
2544 );
2545
2546 tx_r.push_chunk(chunk_r3);
2548 let chunk = hash_join.next_unwrap_ready_chunk()?;
2549 assert_eq!(
2550 chunk,
2551 StreamChunk::from_pretty(
2552 " I I
2553 + 9 10"
2554 )
2555 );
2556
2557 tx_l.push_chunk(chunk_l3);
2560 hash_join.next_unwrap_pending();
2561
2562 tx_l.push_chunk(chunk_l4);
2565 let chunk = hash_join.next_unwrap_ready_chunk()?;
2566 assert_eq!(
2567 chunk,
2568 StreamChunk::from_pretty(
2569 " I I
2570 + 1 4"
2571 )
2572 );
2573
2574 Ok(())
2575 }
2576
2577 #[tokio::test]
2578 async fn test_streaming_hash_inner_join_with_barrier() -> StreamExecutorResult<()> {
2579 let chunk_l1 = StreamChunk::from_pretty(
2580 " I I
2581 + 1 4
2582 + 2 5
2583 + 3 6",
2584 );
2585 let chunk_l2 = StreamChunk::from_pretty(
2586 " I I
2587 + 6 8
2588 + 3 8",
2589 );
2590 let chunk_r1 = StreamChunk::from_pretty(
2591 " I I
2592 + 2 7
2593 + 4 8
2594 + 6 9",
2595 );
2596 let chunk_r2 = StreamChunk::from_pretty(
2597 " I I
2598 + 3 10
2599 + 6 11",
2600 );
2601 let (mut tx_l, mut tx_r, mut hash_join) =
2602 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2603
2604 tx_l.push_barrier(test_epoch(1), false);
2606 tx_r.push_barrier(test_epoch(1), false);
2607 hash_join.next_unwrap_ready_barrier()?;
2608
2609 tx_l.push_chunk(chunk_l1);
2611 hash_join.next_unwrap_pending();
2612
2613 tx_l.push_barrier(test_epoch(2), false);
2615
2616 tx_l.push_chunk(chunk_l2);
2618
2619 tx_r.push_chunk(chunk_r1);
2621
2622 let chunk = hash_join.next_unwrap_ready_chunk()?;
2624 assert_eq!(
2625 chunk,
2626 StreamChunk::from_pretty(
2627 " I I I I
2628 + 2 5 2 7"
2629 )
2630 );
2631
2632 tx_r.push_barrier(test_epoch(2), false);
2634
2635 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2637 assert!(matches!(
2638 hash_join.next_unwrap_ready_barrier()?,
2639 Barrier {
2640 epoch,
2641 mutation: None,
2642 ..
2643 } if epoch == expected_epoch
2644 ));
2645
2646 let chunk = hash_join.next_unwrap_ready_chunk()?;
2648 assert_eq!(
2649 chunk,
2650 StreamChunk::from_pretty(
2651 " I I I I
2652 + 6 8 6 9"
2653 )
2654 );
2655
2656 tx_r.push_chunk(chunk_r2);
2658 let chunk = hash_join.next_unwrap_ready_chunk()?;
2659 assert_eq!(
2660 chunk,
2661 StreamChunk::from_pretty(
2662 " I I I I
2663 + 3 6 3 10
2664 + 3 8 3 10
2665 + 6 8 6 11"
2666 )
2667 );
2668
2669 Ok(())
2670 }
2671
2672 #[tokio::test]
2673 async fn test_streaming_hash_inner_join_with_null_and_barrier() -> StreamExecutorResult<()> {
2674 let chunk_l1 = StreamChunk::from_pretty(
2675 " I I
2676 + 1 4
2677 + 2 .
2678 + 3 .",
2679 );
2680 let chunk_l2 = StreamChunk::from_pretty(
2681 " I I
2682 + 6 .
2683 + 3 8",
2684 );
2685 let chunk_r1 = StreamChunk::from_pretty(
2686 " I I
2687 + 2 7
2688 + 4 8
2689 + 6 9",
2690 );
2691 let chunk_r2 = StreamChunk::from_pretty(
2692 " I I
2693 + 3 10
2694 + 6 11",
2695 );
2696 let (mut tx_l, mut tx_r, mut hash_join) =
2697 create_classical_executor::<{ JoinType::Inner }>(false, false, None).await;
2698
2699 tx_l.push_barrier(test_epoch(1), false);
2701 tx_r.push_barrier(test_epoch(1), false);
2702 hash_join.next_unwrap_ready_barrier()?;
2703
2704 tx_l.push_chunk(chunk_l1);
2706 hash_join.next_unwrap_pending();
2707
2708 tx_l.push_barrier(test_epoch(2), false);
2710
2711 tx_l.push_chunk(chunk_l2);
2713
2714 tx_r.push_chunk(chunk_r1);
2716
2717 let chunk = hash_join.next_unwrap_ready_chunk()?;
2719 assert_eq!(
2720 chunk,
2721 StreamChunk::from_pretty(
2722 " I I I I
2723 + 2 . 2 7"
2724 )
2725 );
2726
2727 tx_r.push_barrier(test_epoch(2), false);
2729
2730 let expected_epoch = EpochPair::new_test_epoch(test_epoch(2));
2732 assert!(matches!(
2733 hash_join.next_unwrap_ready_barrier()?,
2734 Barrier {
2735 epoch,
2736 mutation: None,
2737 ..
2738 } if epoch == expected_epoch
2739 ));
2740
2741 let chunk = hash_join.next_unwrap_ready_chunk()?;
2743 assert_eq!(
2744 chunk,
2745 StreamChunk::from_pretty(
2746 " I I I I
2747 + 6 . 6 9"
2748 )
2749 );
2750
2751 tx_r.push_chunk(chunk_r2);
2753 let chunk = hash_join.next_unwrap_ready_chunk()?;
2754 assert_eq!(
2755 chunk,
2756 StreamChunk::from_pretty(
2757 " I I I I
2758 + 3 8 3 10
2759 + 3 . 3 10
2760 + 6 . 6 11"
2761 )
2762 );
2763
2764 Ok(())
2765 }
2766
2767 #[tokio::test]
2768 async fn test_streaming_hash_left_join() -> StreamExecutorResult<()> {
2769 let chunk_l1 = StreamChunk::from_pretty(
2770 " I I
2771 + 1 4
2772 + 2 5
2773 + 3 6",
2774 );
2775 let chunk_l2 = StreamChunk::from_pretty(
2776 " I I
2777 + 3 8
2778 - 3 8",
2779 );
2780 let chunk_r1 = StreamChunk::from_pretty(
2781 " I I
2782 + 2 7
2783 + 4 8
2784 + 6 9",
2785 );
2786 let chunk_r2 = StreamChunk::from_pretty(
2787 " I I
2788 + 3 10
2789 + 6 11",
2790 );
2791 let (mut tx_l, mut tx_r, mut hash_join) =
2792 create_classical_executor::<{ JoinType::LeftOuter }>(false, false, None).await;
2793
2794 tx_l.push_barrier(test_epoch(1), false);
2796 tx_r.push_barrier(test_epoch(1), false);
2797 hash_join.next_unwrap_ready_barrier()?;
2798
2799 tx_l.push_chunk(chunk_l1);
2801 let chunk = hash_join.next_unwrap_ready_chunk()?;
2802 assert_eq!(
2803 chunk,
2804 StreamChunk::from_pretty(
2805 " I I I I
2806 + 1 4 . .
2807 + 2 5 . .
2808 + 3 6 . ."
2809 )
2810 );
2811
2812 tx_l.push_chunk(chunk_l2);
2814 let chunk = hash_join.next_unwrap_ready_chunk()?;
2815 assert_eq!(
2816 chunk,
2817 StreamChunk::from_pretty(
2818 " I I I I
2819 + 3 8 . .
2820 - 3 8 . ."
2821 )
2822 );
2823
2824 tx_r.push_chunk(chunk_r1);
2826 let chunk = hash_join.next_unwrap_ready_chunk()?;
2827 assert_eq!(
2828 chunk,
2829 StreamChunk::from_pretty(
2830 " I I I I
2831 U- 2 5 . .
2832 U+ 2 5 2 7"
2833 )
2834 );
2835
2836 tx_r.push_chunk(chunk_r2);
2838 let chunk = hash_join.next_unwrap_ready_chunk()?;
2839 assert_eq!(
2840 chunk,
2841 StreamChunk::from_pretty(
2842 " I I I I
2843 U- 3 6 . .
2844 U+ 3 6 3 10"
2845 )
2846 );
2847
2848 Ok(())
2849 }
2850
2851 #[tokio::test]
2852 async fn test_streaming_null_safe_hash_left_join() -> StreamExecutorResult<()> {
2853 let chunk_l1 = StreamChunk::from_pretty(
2854 " I I
2855 + 1 4
2856 + 2 5
2857 + . 6",
2858 );
2859 let chunk_l2 = StreamChunk::from_pretty(
2860 " I I
2861 + . 8
2862 - . 8",
2863 );
2864 let chunk_r1 = StreamChunk::from_pretty(
2865 " I I
2866 + 2 7
2867 + 4 8
2868 + 6 9",
2869 );
2870 let chunk_r2 = StreamChunk::from_pretty(
2871 " I I
2872 + . 10
2873 + 6 11",
2874 );
2875 let (mut tx_l, mut tx_r, mut hash_join) =
2876 create_classical_executor::<{ JoinType::LeftOuter }>(false, true, None).await;
2877
2878 tx_l.push_barrier(test_epoch(1), false);
2880 tx_r.push_barrier(test_epoch(1), false);
2881 hash_join.next_unwrap_ready_barrier()?;
2882
2883 tx_l.push_chunk(chunk_l1);
2885 let chunk = hash_join.next_unwrap_ready_chunk()?;
2886 assert_eq!(
2887 chunk,
2888 StreamChunk::from_pretty(
2889 " I I I I
2890 + 1 4 . .
2891 + 2 5 . .
2892 + . 6 . ."
2893 )
2894 );
2895
2896 tx_l.push_chunk(chunk_l2);
2898 let chunk = hash_join.next_unwrap_ready_chunk()?;
2899 assert_eq!(
2900 chunk,
2901 StreamChunk::from_pretty(
2902 " I I I I
2903 + . 8 . .
2904 - . 8 . ."
2905 )
2906 );
2907
2908 tx_r.push_chunk(chunk_r1);
2910 let chunk = hash_join.next_unwrap_ready_chunk()?;
2911 assert_eq!(
2912 chunk,
2913 StreamChunk::from_pretty(
2914 " I I I I
2915 U- 2 5 . .
2916 U+ 2 5 2 7"
2917 )
2918 );
2919
2920 tx_r.push_chunk(chunk_r2);
2922 let chunk = hash_join.next_unwrap_ready_chunk()?;
2923 assert_eq!(
2924 chunk,
2925 StreamChunk::from_pretty(
2926 " I I I I
2927 U- . 6 . .
2928 U+ . 6 . 10"
2929 )
2930 );
2931
2932 Ok(())
2933 }
2934
2935 #[tokio::test]
2936 async fn test_streaming_hash_right_join() -> StreamExecutorResult<()> {
2937 let chunk_l1 = StreamChunk::from_pretty(
2938 " I I
2939 + 1 4
2940 + 2 5
2941 + 3 6",
2942 );
2943 let chunk_l2 = StreamChunk::from_pretty(
2944 " I I
2945 + 3 8
2946 - 3 8",
2947 );
2948 let chunk_r1 = StreamChunk::from_pretty(
2949 " I I
2950 + 2 7
2951 + 4 8
2952 + 6 9",
2953 );
2954 let chunk_r2 = StreamChunk::from_pretty(
2955 " I I
2956 + 5 10
2957 - 5 10",
2958 );
2959 let (mut tx_l, mut tx_r, mut hash_join) =
2960 create_classical_executor::<{ JoinType::RightOuter }>(false, false, None).await;
2961
2962 tx_l.push_barrier(test_epoch(1), false);
2964 tx_r.push_barrier(test_epoch(1), false);
2965 hash_join.next_unwrap_ready_barrier()?;
2966
2967 tx_l.push_chunk(chunk_l1);
2969 hash_join.next_unwrap_pending();
2970
2971 tx_l.push_chunk(chunk_l2);
2973 hash_join.next_unwrap_pending();
2974
2975 tx_r.push_chunk(chunk_r1);
2977 let chunk = hash_join.next_unwrap_ready_chunk()?;
2978 assert_eq!(
2979 chunk,
2980 StreamChunk::from_pretty(
2981 " I I I I
2982 + 2 5 2 7
2983 + . . 4 8
2984 + . . 6 9"
2985 )
2986 );
2987
2988 tx_r.push_chunk(chunk_r2);
2990 let chunk = hash_join.next_unwrap_ready_chunk()?;
2991 assert_eq!(
2992 chunk,
2993 StreamChunk::from_pretty(
2994 " I I I I
2995 + . . 5 10
2996 - . . 5 10"
2997 )
2998 );
2999
3000 Ok(())
3001 }
3002
3003 #[tokio::test]
3004 async fn test_streaming_hash_left_join_append_only() -> StreamExecutorResult<()> {
3005 let chunk_l1 = StreamChunk::from_pretty(
3006 " I I I
3007 + 1 4 1
3008 + 2 5 2
3009 + 3 6 3",
3010 );
3011 let chunk_l2 = StreamChunk::from_pretty(
3012 " I I I
3013 + 4 9 4
3014 + 5 10 5",
3015 );
3016 let chunk_r1 = StreamChunk::from_pretty(
3017 " I I I
3018 + 2 5 1
3019 + 4 9 2
3020 + 6 9 3",
3021 );
3022 let chunk_r2 = StreamChunk::from_pretty(
3023 " I I I
3024 + 1 4 4
3025 + 3 6 5",
3026 );
3027
3028 let (mut tx_l, mut tx_r, mut hash_join) =
3029 create_append_only_executor::<{ JoinType::LeftOuter }>(false).await;
3030
3031 tx_l.push_barrier(test_epoch(1), false);
3033 tx_r.push_barrier(test_epoch(1), false);
3034 hash_join.next_unwrap_ready_barrier()?;
3035
3036 tx_l.push_chunk(chunk_l1);
3038 let chunk = hash_join.next_unwrap_ready_chunk()?;
3039 assert_eq!(
3040 chunk,
3041 StreamChunk::from_pretty(
3042 " I I I I I I
3043 + 1 4 1 . . .
3044 + 2 5 2 . . .
3045 + 3 6 3 . . ."
3046 )
3047 );
3048
3049 tx_l.push_chunk(chunk_l2);
3051 let chunk = hash_join.next_unwrap_ready_chunk()?;
3052 assert_eq!(
3053 chunk,
3054 StreamChunk::from_pretty(
3055 " I I I I I I
3056 + 4 9 4 . . .
3057 + 5 10 5 . . ."
3058 )
3059 );
3060
3061 tx_r.push_chunk(chunk_r1);
3063 let chunk = hash_join.next_unwrap_ready_chunk()?;
3064 assert_eq!(
3065 chunk,
3066 StreamChunk::from_pretty(
3067 " I I I I I I
3068 U- 2 5 2 . . .
3069 U+ 2 5 2 2 5 1
3070 U- 4 9 4 . . .
3071 U+ 4 9 4 4 9 2"
3072 )
3073 );
3074
3075 tx_r.push_chunk(chunk_r2);
3077 let chunk = hash_join.next_unwrap_ready_chunk()?;
3078 assert_eq!(
3079 chunk,
3080 StreamChunk::from_pretty(
3081 " I I I I I I
3082 U- 1 4 1 . . .
3083 U+ 1 4 1 1 4 4
3084 U- 3 6 3 . . .
3085 U+ 3 6 3 3 6 5"
3086 )
3087 );
3088
3089 Ok(())
3090 }
3091
3092 #[tokio::test]
3093 async fn test_streaming_hash_right_join_append_only() -> StreamExecutorResult<()> {
3094 let chunk_l1 = StreamChunk::from_pretty(
3095 " I I I
3096 + 1 4 1
3097 + 2 5 2
3098 + 3 6 3",
3099 );
3100 let chunk_l2 = StreamChunk::from_pretty(
3101 " I I I
3102 + 4 9 4
3103 + 5 10 5",
3104 );
3105 let chunk_r1 = StreamChunk::from_pretty(
3106 " I I I
3107 + 2 5 1
3108 + 4 9 2
3109 + 6 9 3",
3110 );
3111 let chunk_r2 = StreamChunk::from_pretty(
3112 " I I I
3113 + 1 4 4
3114 + 3 6 5
3115 + 7 7 6",
3116 );
3117
3118 let (mut tx_l, mut tx_r, mut hash_join) =
3119 create_append_only_executor::<{ JoinType::RightOuter }>(false).await;
3120
3121 tx_l.push_barrier(test_epoch(1), false);
3123 tx_r.push_barrier(test_epoch(1), false);
3124 hash_join.next_unwrap_ready_barrier()?;
3125
3126 tx_l.push_chunk(chunk_l1);
3128 hash_join.next_unwrap_pending();
3129
3130 tx_l.push_chunk(chunk_l2);
3132 hash_join.next_unwrap_pending();
3133
3134 tx_r.push_chunk(chunk_r1);
3136 let chunk = hash_join.next_unwrap_ready_chunk()?;
3137 assert_eq!(
3138 chunk,
3139 StreamChunk::from_pretty(
3140 " I I I I I I
3141 + 2 5 2 2 5 1
3142 + 4 9 4 4 9 2
3143 + . . . 6 9 3"
3144 )
3145 );
3146
3147 tx_r.push_chunk(chunk_r2);
3149 let chunk = hash_join.next_unwrap_ready_chunk()?;
3150 assert_eq!(
3151 chunk,
3152 StreamChunk::from_pretty(
3153 " I I I I I I
3154 + 1 4 1 1 4 4
3155 + 3 6 3 3 6 5
3156 + . . . 7 7 6"
3157 )
3158 );
3159
3160 Ok(())
3161 }
3162
3163 #[tokio::test]
3164 async fn test_streaming_hash_full_outer_join() -> StreamExecutorResult<()> {
3165 let chunk_l1 = StreamChunk::from_pretty(
3166 " I I
3167 + 1 4
3168 + 2 5
3169 + 3 6",
3170 );
3171 let chunk_l2 = StreamChunk::from_pretty(
3172 " I I
3173 + 3 8
3174 - 3 8",
3175 );
3176 let chunk_r1 = StreamChunk::from_pretty(
3177 " I I
3178 + 2 7
3179 + 4 8
3180 + 6 9",
3181 );
3182 let chunk_r2 = StreamChunk::from_pretty(
3183 " I I
3184 + 5 10
3185 - 5 10",
3186 );
3187 let (mut tx_l, mut tx_r, mut hash_join) =
3188 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3189
3190 tx_l.push_barrier(test_epoch(1), false);
3192 tx_r.push_barrier(test_epoch(1), false);
3193 hash_join.next_unwrap_ready_barrier()?;
3194
3195 tx_l.push_chunk(chunk_l1);
3197 let chunk = hash_join.next_unwrap_ready_chunk()?;
3198 assert_eq!(
3199 chunk,
3200 StreamChunk::from_pretty(
3201 " I I I I
3202 + 1 4 . .
3203 + 2 5 . .
3204 + 3 6 . ."
3205 )
3206 );
3207
3208 tx_l.push_chunk(chunk_l2);
3210 let chunk = hash_join.next_unwrap_ready_chunk()?;
3211 assert_eq!(
3212 chunk,
3213 StreamChunk::from_pretty(
3214 " I I I I
3215 + 3 8 . .
3216 - 3 8 . ."
3217 )
3218 );
3219
3220 tx_r.push_chunk(chunk_r1);
3222 let chunk = hash_join.next_unwrap_ready_chunk()?;
3223 assert_eq!(
3224 chunk,
3225 StreamChunk::from_pretty(
3226 " I I I I
3227 U- 2 5 . .
3228 U+ 2 5 2 7
3229 + . . 4 8
3230 + . . 6 9"
3231 )
3232 );
3233
3234 tx_r.push_chunk(chunk_r2);
3236 let chunk = hash_join.next_unwrap_ready_chunk()?;
3237 assert_eq!(
3238 chunk,
3239 StreamChunk::from_pretty(
3240 " I I I I
3241 + . . 5 10
3242 - . . 5 10"
3243 )
3244 );
3245
3246 Ok(())
3247 }
3248
3249 #[tokio::test]
3250 async fn test_streaming_hash_full_outer_join_update() -> StreamExecutorResult<()> {
3251 let (mut tx_l, mut tx_r, mut hash_join) =
3252 create_classical_executor::<{ JoinType::FullOuter }>(false, false, None).await;
3253
3254 tx_l.push_barrier(test_epoch(1), false);
3256 tx_r.push_barrier(test_epoch(1), false);
3257 hash_join.next_unwrap_ready_barrier()?;
3258
3259 tx_l.push_chunk(StreamChunk::from_pretty(
3260 " I I
3261 + 1 1
3262 ",
3263 ));
3264 let chunk = hash_join.next_unwrap_ready_chunk()?;
3265 assert_eq!(
3266 chunk,
3267 StreamChunk::from_pretty(
3268 " I I I I
3269 + 1 1 . ."
3270 )
3271 );
3272
3273 tx_r.push_chunk(StreamChunk::from_pretty(
3274 " I I
3275 + 1 1
3276 ",
3277 ));
3278 let chunk = hash_join.next_unwrap_ready_chunk()?;
3279
3280 assert_eq!(
3281 chunk,
3282 StreamChunk::from_pretty(
3283 " I I I I
3284 U- 1 1 . .
3285 U+ 1 1 1 1"
3286 )
3287 );
3288
3289 tx_l.push_chunk(StreamChunk::from_pretty(
3290 " I I
3291 - 1 1
3292 + 1 2
3293 ",
3294 ));
3295 let chunk = hash_join.next_unwrap_ready_chunk()?;
3296 let chunk = chunk.compact();
3297 assert_eq!(
3298 chunk,
3299 StreamChunk::from_pretty(
3300 " I I I I
3301 - 1 1 1 1
3302 + 1 2 1 1
3303 "
3304 )
3305 );
3306
3307 Ok(())
3308 }
3309
3310 #[tokio::test]
3311 async fn test_streaming_hash_full_outer_join_with_nonequi_condition() -> StreamExecutorResult<()>
3312 {
3313 let chunk_l1 = StreamChunk::from_pretty(
3314 " I I
3315 + 1 4
3316 + 2 5
3317 + 3 6
3318 + 3 7",
3319 );
3320 let chunk_l2 = StreamChunk::from_pretty(
3321 " I I
3322 + 3 8
3323 - 3 8
3324 - 1 4", );
3326 let chunk_r1 = StreamChunk::from_pretty(
3327 " I I
3328 + 2 6
3329 + 4 8
3330 + 3 4",
3331 );
3332 let chunk_r2 = StreamChunk::from_pretty(
3333 " I I
3334 + 5 10
3335 - 5 10
3336 + 1 2",
3337 );
3338 let (mut tx_l, mut tx_r, mut hash_join) =
3339 create_classical_executor::<{ JoinType::FullOuter }>(true, false, None).await;
3340
3341 tx_l.push_barrier(test_epoch(1), false);
3343 tx_r.push_barrier(test_epoch(1), false);
3344 hash_join.next_unwrap_ready_barrier()?;
3345
3346 tx_l.push_chunk(chunk_l1);
3348 let chunk = hash_join.next_unwrap_ready_chunk()?;
3349 assert_eq!(
3350 chunk,
3351 StreamChunk::from_pretty(
3352 " I I I I
3353 + 1 4 . .
3354 + 2 5 . .
3355 + 3 6 . .
3356 + 3 7 . ."
3357 )
3358 );
3359
3360 tx_l.push_chunk(chunk_l2);
3362 let chunk = hash_join.next_unwrap_ready_chunk()?;
3363 assert_eq!(
3364 chunk,
3365 StreamChunk::from_pretty(
3366 " I I I I
3367 + 3 8 . .
3368 - 3 8 . .
3369 - 1 4 . ."
3370 )
3371 );
3372
3373 tx_r.push_chunk(chunk_r1);
3375 let chunk = hash_join.next_unwrap_ready_chunk()?;
3376 assert_eq!(
3377 chunk,
3378 StreamChunk::from_pretty(
3379 " I I I I
3380 U- 2 5 . .
3381 U+ 2 5 2 6
3382 + . . 4 8
3383 + . . 3 4" )
3387 );
3388
3389 tx_r.push_chunk(chunk_r2);
3391 let chunk = hash_join.next_unwrap_ready_chunk()?;
3392 assert_eq!(
3393 chunk,
3394 StreamChunk::from_pretty(
3395 " I I I I
3396 + . . 5 10
3397 - . . 5 10
3398 + . . 1 2" )
3401 );
3402
3403 Ok(())
3404 }
3405
3406 #[tokio::test]
3407 async fn test_streaming_hash_inner_join_with_nonequi_condition() -> StreamExecutorResult<()> {
3408 let chunk_l1 = StreamChunk::from_pretty(
3409 " I I
3410 + 1 4
3411 + 2 10
3412 + 3 6",
3413 );
3414 let chunk_l2 = StreamChunk::from_pretty(
3415 " I I
3416 + 3 8
3417 - 3 8",
3418 );
3419 let chunk_r1 = StreamChunk::from_pretty(
3420 " I I
3421 + 2 7
3422 + 4 8
3423 + 6 9",
3424 );
3425 let chunk_r2 = StreamChunk::from_pretty(
3426 " I I
3427 + 3 10
3428 + 6 11",
3429 );
3430 let (mut tx_l, mut tx_r, mut hash_join) =
3431 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3432
3433 tx_l.push_barrier(test_epoch(1), false);
3435 tx_r.push_barrier(test_epoch(1), false);
3436 hash_join.next_unwrap_ready_barrier()?;
3437
3438 tx_l.push_chunk(chunk_l1);
3440 hash_join.next_unwrap_pending();
3441
3442 tx_l.push_chunk(chunk_l2);
3444 hash_join.next_unwrap_pending();
3445
3446 tx_r.push_chunk(chunk_r1);
3448 hash_join.next_unwrap_pending();
3449
3450 tx_r.push_chunk(chunk_r2);
3452 let chunk = hash_join.next_unwrap_ready_chunk()?;
3453 assert_eq!(
3454 chunk,
3455 StreamChunk::from_pretty(
3456 " I I I I
3457 + 3 6 3 10"
3458 )
3459 );
3460
3461 Ok(())
3462 }
3463
3464 #[tokio::test]
3465 async fn test_streaming_hash_join_watermark() -> StreamExecutorResult<()> {
3466 let (mut tx_l, mut tx_r, mut hash_join) =
3467 create_classical_executor::<{ JoinType::Inner }>(true, false, None).await;
3468
3469 tx_l.push_barrier(test_epoch(1), false);
3471 tx_r.push_barrier(test_epoch(1), false);
3472 hash_join.next_unwrap_ready_barrier()?;
3473
3474 tx_l.push_int64_watermark(0, 100);
3475
3476 tx_l.push_int64_watermark(0, 200);
3477
3478 tx_l.push_barrier(test_epoch(2), false);
3479 tx_r.push_barrier(test_epoch(2), false);
3480 hash_join.next_unwrap_ready_barrier()?;
3481
3482 tx_r.push_int64_watermark(0, 50);
3483
3484 let w1 = hash_join.next().await.unwrap().unwrap();
3485 let w1 = w1.as_watermark().unwrap();
3486
3487 let w2 = hash_join.next().await.unwrap().unwrap();
3488 let w2 = w2.as_watermark().unwrap();
3489
3490 tx_r.push_int64_watermark(0, 100);
3491
3492 let w3 = hash_join.next().await.unwrap().unwrap();
3493 let w3 = w3.as_watermark().unwrap();
3494
3495 let w4 = hash_join.next().await.unwrap().unwrap();
3496 let w4 = w4.as_watermark().unwrap();
3497
3498 assert_eq!(
3499 w1,
3500 &Watermark {
3501 col_idx: 2,
3502 data_type: DataType::Int64,
3503 val: ScalarImpl::Int64(50)
3504 }
3505 );
3506
3507 assert_eq!(
3508 w2,
3509 &Watermark {
3510 col_idx: 0,
3511 data_type: DataType::Int64,
3512 val: ScalarImpl::Int64(50)
3513 }
3514 );
3515
3516 assert_eq!(
3517 w3,
3518 &Watermark {
3519 col_idx: 2,
3520 data_type: DataType::Int64,
3521 val: ScalarImpl::Int64(100)
3522 }
3523 );
3524
3525 assert_eq!(
3526 w4,
3527 &Watermark {
3528 col_idx: 0,
3529 data_type: DataType::Int64,
3530 val: ScalarImpl::Int64(100)
3531 }
3532 );
3533
3534 Ok(())
3535 }
3536}